summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-09 19:36:51 +0000
committerAlan Conway <aconway@apache.org>2008-10-09 19:36:51 +0000
commitd6901e52ab3ee9c40eddc4ad3b4787127c36d874 (patch)
tree85b9ba2e0d0922be150480392ec1b706a6df5cd0
parent016ae5acebab0eaf6dd70f5d4d653fdfee93925d (diff)
downloadqpid-python-d6901e52ab3ee9c40eddc4ad3b4787127c36d874.tar.gz
Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703237 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/SessionState.cpp4
-rw-r--r--cpp/src/qpid/client/Connection.cpp17
-rw-r--r--cpp/src/qpid/client/Connection.h9
-rw-r--r--cpp/src/qpid/client/ConnectionAccess.h41
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp29
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h13
-rw-r--r--cpp/src/qpid/client/FailoverListener.cpp41
-rw-r--r--cpp/src/qpid/client/FailoverListener.h17
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp59
-rw-r--r--cpp/src/qpid/client/SessionImpl.h18
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h6
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp49
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp31
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp4
-rw-r--r--cpp/src/tests/cluster_test.cpp68
16 files changed, 245 insertions, 162 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 149c1d91e6..86660b89ac 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -390,6 +390,7 @@ libqpidclient_la_SOURCES = \
qpid/client/SessionBase_0_10.cpp \
qpid/client/SessionBase_0_10.h \
qpid/client/SessionBase_0_10Access.h \
+ qpid/client/ConnectionAccess.h \
qpid/client/SessionImpl.cpp \
qpid/client/StateManager.cpp \
qpid/client/SubscriptionManager.cpp
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 4b92b4968f..ac75b5c5ff 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -147,7 +147,7 @@ void SessionState::senderRecordKnownCompleted() {
void SessionState::senderConfirmed(const SessionPoint& confirmed) {
if (confirmed > sender.sendPoint)
- throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed commands not yet sent."));
+ throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed < " << confirmed << " but only sent < " << sender.sendPoint));
QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed);
ReplayList::iterator i = sender.replayList.begin();
while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {
@@ -169,7 +169,7 @@ void SessionState::senderCompleted(const SequenceSet& commands) {
QPID_LOG(debug, getId() << ": sender marked completed: " << commands);
sender.incomplete -= commands;
// Completion implies confirmation but we don't handle out-of-order
- // confirmation, so confirm only the first contiguous range of commands.
+ // confirmation, so confirm up to the end of the first contiguous range of commands.
senderConfirmed(SessionPoint(commands.rangesBegin()->end()));
}
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index b8b4f9ccf3..27706fab8c 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -44,7 +44,7 @@ using namespace qpid::sys;
namespace qpid {
namespace client {
-Connection::Connection() : channelIdCounter(0), version(framing::highestProtocolVersion) {}
+Connection::Connection() : version(framing::highestProtocolVersion) {}
Connection::~Connection(){ }
@@ -106,26 +106,19 @@ void Connection::open(const ConnectionSettings& settings)
impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
impl->open();
- max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
}
-Session Connection::newSession(const std::string& name) {
+Session Connection::newSession(const std::string& name, uint32_t timeout) {
if (!isOpen())
throw Exception(QPID_MSG("Connection has not yet been opened"));
- shared_ptr<SessionImpl> simpl(
- new SessionImpl(name, impl, ++channelIdCounter, max_frame_size));
- impl->addSession(simpl);
- simpl->open(0);
Session s;
- SessionBase_0_10Access(s).set(simpl);
+ SessionBase_0_10Access(s).set(impl->newSession(name, timeout));
return s;
}
void Connection::resume(Session& session) {
if (!isOpen())
throw Exception(QPID_MSG("Connection is not open."));
-
- session.impl->setChannel(++channelIdCounter);
impl->addSession(session.impl);
session.impl->resume(impl);
}
@@ -134,4 +127,8 @@ void Connection::close() {
impl->close();
}
+std::vector<Url> Connection::getKnownBrokers() {
+ return isOpen() ? impl->getKnownBrokers() : std::vector<Url>();
+}
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index a1575dd524..a5ea40ff38 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -32,6 +32,7 @@ class Url;
namespace client {
class ConnectionSettings;
+class ConnectionImpl;
/**
* Represents a connection to an AMQP broker. All communication is
@@ -42,9 +43,7 @@ class ConnectionSettings;
*/
class Connection
{
- framing::ChannelId channelIdCounter;
framing::ProtocolVersion version;
- uint16_t max_frame_size;
protected:
boost::shared_ptr<ConnectionImpl> impl;
@@ -55,6 +54,7 @@ class Connection
* @see open()
*/
Connection();
+
~Connection();
/**
@@ -157,7 +157,7 @@ class Connection
* If the name is empty (the default) then a unique name will be
* chosen using a Universally-unique identifier (UUID) algorithm.
*/
- Session newSession(const std::string& name=std::string());
+ Session newSession(const std::string& name=std::string(), uint32_t timeoutSeconds = 0);
/**
* Resume a suspended session. A session may be resumed
@@ -167,7 +167,8 @@ class Connection
bool isOpen() const;
-
+ std::vector<Url> getKnownBrokers();
+
friend class ConnectionAccess; ///<@internal
friend class SessionBase_0_10; ///<@internal
};
diff --git a/cpp/src/qpid/client/ConnectionAccess.h b/cpp/src/qpid/client/ConnectionAccess.h
new file mode 100644
index 0000000000..b662fd5d8b
--- /dev/null
+++ b/cpp/src/qpid/client/ConnectionAccess.h
@@ -0,0 +1,41 @@
+#ifndef QPID_CLIENT_CONNECTIONACCESS_H
+#define QPID_CLIENT_CONNECTIONACCESS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Connection.h"
+
+/**@file @internal Internal use only */
+
+namespace qpid {
+namespace client {
+
+
+
+struct ConnectionAccess {
+ static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; }
+ static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; }
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_CONNECTIONACCESS_H*/
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index d9ac65c1b3..910c908ee2 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -22,8 +22,10 @@
#include "Connector.h"
#include "ConnectionSettings.h"
#include "SessionImpl.h"
+#include "FailoverListener.h"
#include "qpid/log/Statement.h"
+#include "qpid/Url.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
@@ -40,7 +42,9 @@ using namespace qpid::framing::connection;//for connection error codes
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
- version(v)
+ failover(new FailoverListener()),
+ version(v),
+ nextChannel(1)
{
QPID_LOG(debug, "ConnectionImpl created for " << version);
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
@@ -56,12 +60,14 @@ ConnectionImpl::~ConnectionImpl() {
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- if (connector) connector->close();
+ failover.reset();
+ if (connector) connector->close();
}
-void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
+void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
{
Mutex::ScopedLock l(lock);
+ session->setChannel(channel ? channel : nextChannel++);
boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
if (s.lock()) throw SessionBusyException();
s = session;
@@ -102,7 +108,9 @@ void ConnectionImpl::open()
connector->setShutdownHandler(this);
connector->connect(host, port);
connector->init();
- handler.waitForOpen();
+ handler.waitForOpen();
+
+ if (failover.get()) failover->start(shared_from_this());
}
void ConnectionImpl::idleIn()
@@ -162,3 +170,16 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
return handler;
}
+std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
+ // FIXME aconway 2008-10-08: initialize failover list from openOk or settings
+ return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>();
+}
+
+boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {
+ boost::shared_ptr<SessionImpl> simpl(new SessionImpl(name, shared_from_this()));
+ addSession(simpl, channel);
+ simpl->open(timeout);
+ return simpl;
+}
+
+void ConnectionImpl::stopFailoverListener() { failover.reset(); }
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index aca26b963d..22450a7ddf 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -24,6 +24,7 @@
#include "Bounds.h"
#include "ConnectionHandler.h"
+
#include "qpid/framing/FrameHandler.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
@@ -41,6 +42,7 @@ namespace client {
class Connector;
class ConnectionSettings;
class SessionImpl;
+class FailoverListener;
class ConnectionImpl : public Bounds,
public framing::FrameHandler,
@@ -54,7 +56,9 @@ class ConnectionImpl : public Bounds,
SessionMap sessions;
ConnectionHandler handler;
boost::scoped_ptr<Connector> connector;
+ boost::scoped_ptr<FailoverListener> failover;
framing::ProtocolVersion version;
+ uint16_t nextChannel;
sys::Mutex lock;
template <class F> void closeInternal(const F&);
@@ -72,13 +76,18 @@ class ConnectionImpl : public Bounds,
void open();
bool isOpen() const;
- void addSession(const boost::shared_ptr<SessionImpl>&);
+ boost::shared_ptr<SessionImpl> newSession(const std::string& name, uint32_t timeout, uint16_t channel=0);
+ void addSession(const boost::shared_ptr<SessionImpl>&, uint16_t channel=0);
void close();
void handle(framing::AMQFrame& frame);
void erase(uint16_t channel);
-
+ void stopFailoverListener();
+
const ConnectionSettings& getNegotiatedSettings();
+
+ std::vector<Url> getKnownBrokers();
+
};
}}
diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp
index 95c137d922..3254686d9c 100644
--- a/cpp/src/qpid/client/FailoverListener.cpp
+++ b/cpp/src/qpid/client/FailoverListener.cpp
@@ -19,27 +19,50 @@
*
*/
#include "FailoverListener.h"
+#include "SessionBase_0_10Access.h"
+#include "qpid/client/SubscriptionManager.h"
namespace qpid {
namespace client {
static const std::string AMQ_FAILOVER("amq.failover");
-FailoverListener::FailoverListener(Connection c)
- : connection(c), session(c.newSession()), subscriptions(session)
-{
+static Session makeSession(boost::shared_ptr<SessionImpl> si) {
+ // Hold only a weak pointer to the ConnectionImpl so a
+ // FailoverListener in a ConnectionImpl won't createa a shared_ptr
+ // cycle.
+ //
+ si->setWeakPtr(true);
+ Session s;
+ SessionBase_0_10Access(s).set(si);
+ return s;
+}
+
+FailoverListener::FailoverListener() {}
+
+void FailoverListener::start(const boost::shared_ptr<ConnectionImpl>& c) {
+ Session session = makeSession(c->newSession(std::string(), 0));
+ if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) {
+ session.close();
+ return;
+ }
+ subscriptions.reset(new SubscriptionManager(session));
std::string qname=AMQ_FAILOVER + "." + session.getId().getName();
- if (session.exchangeQuery(arg::exchange=AMQ_FAILOVER).getType().empty())
- return; // Failover exchange not implemented.
session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
- subscriptions.subscribe(*this, qname, FlowControl::unlimited());
- thread = sys::Thread(subscriptions);
+ subscriptions->subscribe(*this, qname, FlowControl::unlimited());
+ thread = sys::Thread(*subscriptions);
}
-FailoverListener::~FailoverListener() {
- subscriptions.stop();
+void FailoverListener::stop() {
+ if (subscriptions.get()) subscriptions->stop();
if (thread.id()) thread.join();
+ if (subscriptions.get()) subscriptions->getSession().close();
+ thread=sys::Thread();
+ subscriptions.reset();
+}
+FailoverListener::~FailoverListener() {
+ stop();
}
void FailoverListener::received(Message& msg) {
diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h
index 2c06947300..39bea90bb3 100644
--- a/cpp/src/qpid/client/FailoverListener.h
+++ b/cpp/src/qpid/client/FailoverListener.h
@@ -22,10 +22,7 @@
*
*/
-#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/client/SubscriptionManager.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Thread.h"
@@ -34,22 +31,24 @@
namespace qpid {
namespace client {
+class SubscriptionManager;
+
/**
* @internal Listen for failover updates from the amq.failover exchange.
*/
-class FailoverListener : public MessageListener
-{
+class FailoverListener : public MessageListener {
public:
- FailoverListener(Connection);
+ FailoverListener();
~FailoverListener();
+ void start(const boost::shared_ptr<ConnectionImpl>&);
+ void stop();
+
std::vector<Url> getKnownBrokers() const;
void received(Message& msg);
private:
mutable sys::Mutex lock;
- Connection connection;
- Session session;
- SubscriptionManager subscriptions;
+ std::auto_ptr<SubscriptionManager> subscriptions;
sys::Thread thread;
std::vector<Url> knowBrokers;
};
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 2d64492bf7..49dd97e324 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -51,21 +51,19 @@ typedef sys::Monitor::ScopedUnlock UnLock;
typedef sys::ScopedLock<sys::Semaphore> Acquire;
-SessionImpl::SessionImpl(const std::string& name,
- shared_ptr<ConnectionImpl> conn,
- uint16_t ch, uint64_t _maxFrameSize)
+SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> conn)
: state(INACTIVE),
detachedLifetime(0),
- maxFrameSize(_maxFrameSize),
+ maxFrameSize(conn->getNegotiatedSettings().maxFrameSize),
id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
- connection(conn),
- ioHandler(*this),
- channel(ch),
- proxy(ioHandler),
+ connectionShared(conn),
+ connectionWeak(conn),
+ weakPtr(false),
+ proxy(out),
nextIn(0),
nextOut(0)
{
- channel.next = connection.get();
+ channel.next = connectionShared.get();
}
SessionImpl::~SessionImpl() {
@@ -78,7 +76,8 @@ SessionImpl::~SessionImpl() {
state.waitWaiters();
}
}
- connection->erase(channel);
+ boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
+ if (c) c->erase(channel);
}
@@ -119,6 +118,8 @@ void SessionImpl::close() //user thread
void SessionImpl::resume(shared_ptr<ConnectionImpl>) // user thread
{
+ // weakPtr sessions should not be resumed.
+ if (weakPtr) return;
throw NotImplementedException("Resume not yet implemented by client!");
}
@@ -251,7 +252,6 @@ void SessionImpl::setExceptionLH(const sys::ExceptionHolder& ex) { // Call with
*/
void SessionImpl::connectionClosed(uint16_t code, const std::string& text) {
setException(createConnectionException(code, text));
- // FIXME aconway 2008-10-07: Should closing a connection detach or close its sessions?
handleClosed();
}
@@ -259,9 +259,7 @@ void SessionImpl::connectionClosed(uint16_t code, const std::string& text) {
* Called by ConnectionImpl to notify active sessions when connection
* is disconnected
*/
-void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text)
-{
- // FIXME aconway 2008-10-07: distinguish disconnect from clean close.
+void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) {
connectionClosed(_code, _text);
}
@@ -426,14 +424,11 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread
void SessionImpl::handleOut(AMQFrame& frame) // user thread
{
- connection->expand(frame.encodedSize(), true);
- channel.handle(frame);
-}
-
-void SessionImpl::proxyOut(AMQFrame& frame) // network thread
-{
- connection->expand(frame.encodedSize(), false);
- channel.handle(frame);
+ boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
+ if (c) {
+ c->expand(frame.encodedSize(), true);
+ channel.handle(frame);
+ }
}
void SessionImpl::deliver(AMQFrame& frame) // network thread
@@ -602,11 +597,11 @@ void SessionImpl::exception(uint16_t errorCode,
const std::string& description,
const framing::FieldTable& /*errorInfo*/)
{
- QPID_LOG(warning, "Exception received from peer: " << errorCode << ":" << description
- << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
-
Lock l(state);
setExceptionLH(createSessionException(errorCode, description));
+ QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what()
+ << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
+
if (detachedLifetime)
setTimeout(0);
}
@@ -648,8 +643,6 @@ void SessionImpl::assertOpen() const
void SessionImpl::handleClosed()
{
- // FIXME aconway 2008-06-12: needs to be set to the correct exception type.
- //
demux.close(exceptionHolder.empty() ? new ClosedException() : exceptionHolder);
results.close();
}
@@ -662,4 +655,16 @@ uint32_t SessionImpl::setTimeout(uint32_t seconds) {
return detachedLifetime;
}
+uint32_t SessionImpl::getTimeout() const {
+ return detachedLifetime;
+}
+
+void SessionImpl::setWeakPtr(bool weak) {
+ weakPtr = weak;
+ if (weakPtr)
+ connectionShared.reset(); // Only keep weak pointer
+ else
+ connectionShared = connectionWeak.lock();
+}
+
}}
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 49a65ed568..54ace77254 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -28,7 +28,8 @@
#include "qpid/SessionId.h"
#include "qpid/SessionState.h"
-#include "qpid/shared_ptr.h"
+#include "boost/shared_ptr.hpp"
+#include "boost/weak_ptr.hpp"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/SequenceNumber.h"
@@ -63,7 +64,7 @@ class SessionImpl : public framing::FrameHandler::InOutHandler,
private framing::AMQP_ClientOperations::ExecutionHandler
{
public:
- SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
+ SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>);
~SessionImpl();
@@ -106,6 +107,11 @@ public:
/** Get timeout in seconds. */
uint32_t getTimeout() const;
+ /** Make this session use a weak_ptr to the ConnectionImpl.
+ * Used for sessions created by the ConnectionImpl itself.
+ */
+ void setWeakPtr(bool weak=true);
+
private:
enum State {
INACTIVE,
@@ -131,7 +137,6 @@ private:
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
- void proxyOut(framing::AMQFrame& frame);
void deliver(framing::AMQFrame& frame);
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
@@ -175,8 +180,11 @@ private:
const uint64_t maxFrameSize;
const SessionId id;
- shared_ptr<ConnectionImpl> connection;
- framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
+ shared_ptr<ConnectionImpl> connection();
+ shared_ptr<ConnectionImpl> connectionShared;
+ boost::weak_ptr<ConnectionImpl> connectionWeak;
+ bool weakPtr;
+
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 3a463d9038..c50e67effa 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -198,10 +198,10 @@ class SubscriptionManager : public sys::Runnable
* Default is to acknowledge every message automatically.
*/
void setAckPolicy(const AckPolicy& autoAck);
- /**
- *
- */
+
AckPolicy& getAckPolicy();
+
+ Session getSession() const { return session; }
};
/** AutoCancel cancels a subscription in its destructor */
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e64692bc91..f4d75b7b6b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -123,6 +123,8 @@ Cluster::~Cluster() {
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Lock l(lock);
+ // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map?
+ // esp shadow connections? See race comment in getConnection.
assert(!c->isCatchUp());
connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
}
@@ -204,15 +206,18 @@ void Cluster::leave(Lock&) {
}
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) {
- if (connectionId.getMember() == memberId)
- return boost::intrusive_ptr<Connection>(connectionId.getPointer());
ConnectionMap::iterator i = connections.find(connectionId);
- if (i == connections.end()) { // New shadow connection.
- assert(connectionId.getMember() != memberId);
- std::ostringstream mgmtId;
- mgmtId << name.str() << ":" << connectionId;
- ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
- i = connections.insert(value).first;
+ if (i == connections.end()) {
+ if (connectionId.getMember() == memberId) { // Closed local connection
+ QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId);
+ return boost::intrusive_ptr<Connection>();
+ }
+ else { // New shadow connection
+ std::ostringstream mgmtId;
+ mgmtId << name.str() << ":" << connectionId;
+ ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
+ i = connections.insert(value).first;
+ }
}
return i->second;
}
@@ -261,15 +266,17 @@ void Cluster::process(const Event& e, Lock& l) {
}
}
else { // e.isConnection()
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
- if (e.getType() == DATA) {
- QPID_LOG(trace, *this << " PROC: " << e);
- connection->deliverBuffer(buf);
- }
- else { // control
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
- connection->delivered(frame);
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ if (connection) { // Ignore if no connection.
+ if (e.getType() == DATA) {
+ QPID_LOG(trace, *this << " PROC: " << e);
+ connection->deliverBuffer(buf);
+ }
+ else { // control
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ connection->delivered(frame);
+ }
}
}
}
@@ -333,7 +340,7 @@ void Cluster::configChange (
Mutex::ScopedLock l(lock);
QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
- bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
+ map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
if (state == LEFT) return;
if (!map.isAlive(memberId)) { leave(l); return; }
@@ -350,7 +357,7 @@ void Cluster::configChange (
QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
- else if (state >= READY && changed)
+ else if (state >= READY)
memberUpdate(l);
}
@@ -408,8 +415,8 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
- if (map.ready(id, Url(url)))
- memberUpdate(l);
+ map.ready(id, Url(url));
+ memberUpdate(l);
}
void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index ed339b2f85..2b079a22bc 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -23,6 +23,7 @@
#include "ClusterMap.h"
#include "Connection.h"
#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/ConnectionAccess.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
@@ -41,14 +42,6 @@
#include <boost/bind.hpp>
namespace qpid {
-
-namespace client {
-struct ConnectionAccess {
- static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; }
- static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; }
-};
-} // namespace client
-
namespace cluster {
using broker::Broker;
@@ -169,10 +162,15 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi
void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection);
shadowConnection = catchUpConnection();
+
broker::Connection& bc = dumpConnection->getBrokerConnection();
// FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size,
// authentication etc. See ConnectionSettings.
shadowConnection.open(dumpeeUrl, bc.getUserId());
+
+ // Stop the failover listener as its session will conflict with re-creating-sessions
+ client::ConnectionAccess::getImpl(shadowConnection)->stopFailoverListener();
+
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
dumpConnection->getId().getMember(),
@@ -184,26 +182,21 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn
void DumpClient::dumpSession(broker::SessionHandler& sh) {
QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
<< sh.getSession()->getId());
- broker::SessionState* s = sh.getSession();
- if (!s) return; // no session.
+ broker::SessionState* ss = sh.getSession();
+ if (!ss) return; // no session.
- // Re-create the session.
+ // Create a client session to dump session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
- size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize;
- boost::shared_ptr<client::SessionImpl> simpl(
- new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size));
- cimpl->addSession(simpl);
- simpl->open(sh.getSession()->getTimeout());
+ boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
client::SessionBase_0_10Access(shadowSession).set(simpl);
AMQP_AllProxy::ClusterConnection proxy(simpl->out);
// Re-create session state on remote connection.
- broker::SessionState* ss = sh.getSession();
// For reasons unknown, boost::bind does not work here with boost 1.33.
ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
- // FIXME aconway 2008-09-19: remaining session state.
+ // FIXME aconway 2008-09-19: update remaining session state.
// Reset command-sequence state.
proxy.sessionState(
@@ -216,7 +209,7 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
ss->receiverGetIncomplete()
);
- // FIXME aconway 2008-09-23: session replay list.
+ // FIXME aconway 2008-09-23: update session replay list.
QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
}
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index cc1af255e4..f53b48ec1e 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -58,9 +58,7 @@ void OutputInterceptor::activateOutput() {
// Called in write thread when the IO layer has no more data to write.
// We do nothing in the write thread, we run doOutput only on delivery
// of doOutput requests.
-bool OutputInterceptor::doOutput() {
- return false;
-}
+bool OutputInterceptor::doOutput() { return false; }
// Delivery of doOutput allows us to run the real connection doOutput()
// which stocks up the write buffers with data.
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 5cfcbc262d..5b9657c2c7 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -22,6 +22,7 @@
#include "BrokerFixture.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/Session.h"
#include "qpid/client/FailoverListener.h"
#include "qpid/cluster/Cluster.h"
@@ -176,47 +177,14 @@ template <class C> set<uint16_t> makeSet(const C& c) {
return s;
}
-std::set<uint16_t> portsFromFailoverArray(const framing::Array& urlArray) {
- std::set<uint16_t> ports;
- for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) {
- Url url((*i)->get<std::string>());
- BOOST_REQUIRE(url.size() > 0);
- BOOST_REQUIRE(url[0].get<TcpAddress>());
- ports.insert(url[0].get<TcpAddress>()->port);
- }
- return ports;
-}
-
-std::set<uint16_t> portsFromFailoverMessage(const Message& m) {
- framing::Array urlArray;
- m.getHeaders().getArray("amq.failover", urlArray);
- return portsFromFailoverArray(urlArray);
-}
-
-QPID_AUTO_TEST_CASE(FailoverExchange) {
- ClusterFixture cluster(2);
- Client c0(cluster[0], "c0");
- c0.session.queueDeclare("q");
- c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover");
-
- Message m;
- BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m));
-
- cluster.add();
- BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m));
-}
-
-std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t n) {
- // Wait till there are n ports in the list.
- vector<Url> kb = fl.getKnownBrokers();
- for (size_t retry=1000; kb.size() != n && retry != 0; --retry) {
+template <class T> std::set<uint16_t> knownBrokerPorts(T& source, size_t n) {
+ vector<Url> urls = source.getKnownBrokers();
+ for (size_t retry=1000; urls.size() != n && retry != 0; --retry) {
::usleep(1000);
- kb = fl.getKnownBrokers();
+ urls = source.getKnownBrokers();
}
set<uint16_t> s;
- for (vector<Url>::const_iterator i = kb.begin(); i != kb.end(); ++i) {
+ for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
BOOST_MESSAGE("Failover URL: " << *i);
BOOST_CHECK(i->size() >= 1);
BOOST_CHECK((*i)[0].get<TcpAddress>());
@@ -226,17 +194,29 @@ std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t
}
QPID_AUTO_TEST_CASE(testFailoverListener) {
- ClusterFixture cluster(1);
+ ClusterFixture cluster(2);
Client c0(cluster[0], "c0");
- FailoverListener fl(c0.connection);
+ FailoverListener fl;
+ fl.start(ConnectionAccess::getImpl(c0.connection));
+ set<uint16_t> set0=makeSet(cluster);
+ BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
+ cluster.add();
+ BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(fl, 3));
+ cluster.kill(2);
+ BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
+}
+
+QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
set<uint16_t> set0=makeSet(cluster);
- BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+ BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
cluster.add();
- BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2));
- cluster.kill(1);
- BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+ BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(c0.connection, 3));
+ cluster.kill(2);
+ BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
}
QPID_AUTO_TEST_CASE(DumpConsumers) {