diff options
author | Alan Conway <aconway@apache.org> | 2008-10-09 19:36:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-09 19:36:51 +0000 |
commit | d6901e52ab3ee9c40eddc4ad3b4787127c36d874 (patch) | |
tree | 85b9ba2e0d0922be150480392ec1b706a6df5cd0 | |
parent | 016ae5acebab0eaf6dd70f5d4d653fdfee93925d (diff) | |
download | qpid-python-d6901e52ab3ee9c40eddc4ad3b4787127c36d874.tar.gz |
Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703237 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionAccess.h | 41 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 29 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverListener.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverListener.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 59 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 49 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 68 |
16 files changed, 245 insertions, 162 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 149c1d91e6..86660b89ac 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -390,6 +390,7 @@ libqpidclient_la_SOURCES = \ qpid/client/SessionBase_0_10.cpp \ qpid/client/SessionBase_0_10.h \ qpid/client/SessionBase_0_10Access.h \ + qpid/client/ConnectionAccess.h \ qpid/client/SessionImpl.cpp \ qpid/client/StateManager.cpp \ qpid/client/SubscriptionManager.cpp diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 4b92b4968f..ac75b5c5ff 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -147,7 +147,7 @@ void SessionState::senderRecordKnownCompleted() { void SessionState::senderConfirmed(const SessionPoint& confirmed) { if (confirmed > sender.sendPoint) - throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed commands not yet sent.")); + throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed < " << confirmed << " but only sent < " << sender.sendPoint)); QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed); ReplayList::iterator i = sender.replayList.begin(); while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) { @@ -169,7 +169,7 @@ void SessionState::senderCompleted(const SequenceSet& commands) { QPID_LOG(debug, getId() << ": sender marked completed: " << commands); sender.incomplete -= commands; // Completion implies confirmation but we don't handle out-of-order - // confirmation, so confirm only the first contiguous range of commands. + // confirmation, so confirm up to the end of the first contiguous range of commands. senderConfirmed(SessionPoint(commands.rangesBegin()->end())); } diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index b8b4f9ccf3..27706fab8c 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -44,7 +44,7 @@ using namespace qpid::sys; namespace qpid { namespace client { -Connection::Connection() : channelIdCounter(0), version(framing::highestProtocolVersion) {} +Connection::Connection() : version(framing::highestProtocolVersion) {} Connection::~Connection(){ } @@ -106,26 +106,19 @@ void Connection::open(const ConnectionSettings& settings) impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings)); impl->open(); - max_frame_size = impl->getNegotiatedSettings().maxFrameSize; } -Session Connection::newSession(const std::string& name) { +Session Connection::newSession(const std::string& name, uint32_t timeout) { if (!isOpen()) throw Exception(QPID_MSG("Connection has not yet been opened")); - shared_ptr<SessionImpl> simpl( - new SessionImpl(name, impl, ++channelIdCounter, max_frame_size)); - impl->addSession(simpl); - simpl->open(0); Session s; - SessionBase_0_10Access(s).set(simpl); + SessionBase_0_10Access(s).set(impl->newSession(name, timeout)); return s; } void Connection::resume(Session& session) { if (!isOpen()) throw Exception(QPID_MSG("Connection is not open.")); - - session.impl->setChannel(++channelIdCounter); impl->addSession(session.impl); session.impl->resume(impl); } @@ -134,4 +127,8 @@ void Connection::close() { impl->close(); } +std::vector<Url> Connection::getKnownBrokers() { + return isOpen() ? impl->getKnownBrokers() : std::vector<Url>(); +} + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index a1575dd524..a5ea40ff38 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -32,6 +32,7 @@ class Url; namespace client { class ConnectionSettings; +class ConnectionImpl; /** * Represents a connection to an AMQP broker. All communication is @@ -42,9 +43,7 @@ class ConnectionSettings; */ class Connection { - framing::ChannelId channelIdCounter; framing::ProtocolVersion version; - uint16_t max_frame_size; protected: boost::shared_ptr<ConnectionImpl> impl; @@ -55,6 +54,7 @@ class Connection * @see open() */ Connection(); + ~Connection(); /** @@ -157,7 +157,7 @@ class Connection * If the name is empty (the default) then a unique name will be * chosen using a Universally-unique identifier (UUID) algorithm. */ - Session newSession(const std::string& name=std::string()); + Session newSession(const std::string& name=std::string(), uint32_t timeoutSeconds = 0); /** * Resume a suspended session. A session may be resumed @@ -167,7 +167,8 @@ class Connection bool isOpen() const; - + std::vector<Url> getKnownBrokers(); + friend class ConnectionAccess; ///<@internal friend class SessionBase_0_10; ///<@internal }; diff --git a/cpp/src/qpid/client/ConnectionAccess.h b/cpp/src/qpid/client/ConnectionAccess.h new file mode 100644 index 0000000000..b662fd5d8b --- /dev/null +++ b/cpp/src/qpid/client/ConnectionAccess.h @@ -0,0 +1,41 @@ +#ifndef QPID_CLIENT_CONNECTIONACCESS_H +#define QPID_CLIENT_CONNECTIONACCESS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/Connection.h" + +/**@file @internal Internal use only */ + +namespace qpid { +namespace client { + + + +struct ConnectionAccess { + static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; } + static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; } +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_CONNECTIONACCESS_H*/ diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index d9ac65c1b3..910c908ee2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -22,8 +22,10 @@ #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" +#include "FailoverListener.h" #include "qpid/log/Statement.h" +#include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" @@ -40,7 +42,9 @@ using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - version(v) + failover(new FailoverListener()), + version(v), + nextChannel(1) { QPID_LOG(debug, "ConnectionImpl created for " << version); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); @@ -56,12 +60,14 @@ ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - if (connector) connector->close(); + failover.reset(); + if (connector) connector->close(); } -void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) +void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) { Mutex::ScopedLock l(lock); + session->setChannel(channel ? channel : nextChannel++); boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; if (s.lock()) throw SessionBusyException(); s = session; @@ -102,7 +108,9 @@ void ConnectionImpl::open() connector->setShutdownHandler(this); connector->connect(host, port); connector->init(); - handler.waitForOpen(); + handler.waitForOpen(); + + if (failover.get()) failover->start(shared_from_this()); } void ConnectionImpl::idleIn() @@ -162,3 +170,16 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() return handler; } +std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { + // FIXME aconway 2008-10-08: initialize failover list from openOk or settings + return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>(); +} + +boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) { + boost::shared_ptr<SessionImpl> simpl(new SessionImpl(name, shared_from_this())); + addSession(simpl, channel); + simpl->open(timeout); + return simpl; +} + +void ConnectionImpl::stopFailoverListener() { failover.reset(); } diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index aca26b963d..22450a7ddf 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -24,6 +24,7 @@ #include "Bounds.h" #include "ConnectionHandler.h" + #include "qpid/framing/FrameHandler.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" @@ -41,6 +42,7 @@ namespace client { class Connector; class ConnectionSettings; class SessionImpl; +class FailoverListener; class ConnectionImpl : public Bounds, public framing::FrameHandler, @@ -54,7 +56,9 @@ class ConnectionImpl : public Bounds, SessionMap sessions; ConnectionHandler handler; boost::scoped_ptr<Connector> connector; + boost::scoped_ptr<FailoverListener> failover; framing::ProtocolVersion version; + uint16_t nextChannel; sys::Mutex lock; template <class F> void closeInternal(const F&); @@ -72,13 +76,18 @@ class ConnectionImpl : public Bounds, void open(); bool isOpen() const; - void addSession(const boost::shared_ptr<SessionImpl>&); + boost::shared_ptr<SessionImpl> newSession(const std::string& name, uint32_t timeout, uint16_t channel=0); + void addSession(const boost::shared_ptr<SessionImpl>&, uint16_t channel=0); void close(); void handle(framing::AMQFrame& frame); void erase(uint16_t channel); - + void stopFailoverListener(); + const ConnectionSettings& getNegotiatedSettings(); + + std::vector<Url> getKnownBrokers(); + }; }} diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp index 95c137d922..3254686d9c 100644 --- a/cpp/src/qpid/client/FailoverListener.cpp +++ b/cpp/src/qpid/client/FailoverListener.cpp @@ -19,27 +19,50 @@ * */ #include "FailoverListener.h" +#include "SessionBase_0_10Access.h" +#include "qpid/client/SubscriptionManager.h" namespace qpid { namespace client { static const std::string AMQ_FAILOVER("amq.failover"); -FailoverListener::FailoverListener(Connection c) - : connection(c), session(c.newSession()), subscriptions(session) -{ +static Session makeSession(boost::shared_ptr<SessionImpl> si) { + // Hold only a weak pointer to the ConnectionImpl so a + // FailoverListener in a ConnectionImpl won't createa a shared_ptr + // cycle. + // + si->setWeakPtr(true); + Session s; + SessionBase_0_10Access(s).set(si); + return s; +} + +FailoverListener::FailoverListener() {} + +void FailoverListener::start(const boost::shared_ptr<ConnectionImpl>& c) { + Session session = makeSession(c->newSession(std::string(), 0)); + if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) { + session.close(); + return; + } + subscriptions.reset(new SubscriptionManager(session)); std::string qname=AMQ_FAILOVER + "." + session.getId().getName(); - if (session.exchangeQuery(arg::exchange=AMQ_FAILOVER).getType().empty()) - return; // Failover exchange not implemented. session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true); session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER); - subscriptions.subscribe(*this, qname, FlowControl::unlimited()); - thread = sys::Thread(subscriptions); + subscriptions->subscribe(*this, qname, FlowControl::unlimited()); + thread = sys::Thread(*subscriptions); } -FailoverListener::~FailoverListener() { - subscriptions.stop(); +void FailoverListener::stop() { + if (subscriptions.get()) subscriptions->stop(); if (thread.id()) thread.join(); + if (subscriptions.get()) subscriptions->getSession().close(); + thread=sys::Thread(); + subscriptions.reset(); +} +FailoverListener::~FailoverListener() { + stop(); } void FailoverListener::received(Message& msg) { diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h index 2c06947300..39bea90bb3 100644 --- a/cpp/src/qpid/client/FailoverListener.h +++ b/cpp/src/qpid/client/FailoverListener.h @@ -22,10 +22,7 @@ * */ -#include "qpid/client/Connection.h" -#include "qpid/client/Session.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/SubscriptionManager.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Thread.h" @@ -34,22 +31,24 @@ namespace qpid { namespace client { +class SubscriptionManager; + /** * @internal Listen for failover updates from the amq.failover exchange. */ -class FailoverListener : public MessageListener -{ +class FailoverListener : public MessageListener { public: - FailoverListener(Connection); + FailoverListener(); ~FailoverListener(); + void start(const boost::shared_ptr<ConnectionImpl>&); + void stop(); + std::vector<Url> getKnownBrokers() const; void received(Message& msg); private: mutable sys::Mutex lock; - Connection connection; - Session session; - SubscriptionManager subscriptions; + std::auto_ptr<SubscriptionManager> subscriptions; sys::Thread thread; std::vector<Url> knowBrokers; }; diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 2d64492bf7..49dd97e324 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -51,21 +51,19 @@ typedef sys::Monitor::ScopedUnlock UnLock; typedef sys::ScopedLock<sys::Semaphore> Acquire; -SessionImpl::SessionImpl(const std::string& name, - shared_ptr<ConnectionImpl> conn, - uint16_t ch, uint64_t _maxFrameSize) +SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> conn) : state(INACTIVE), detachedLifetime(0), - maxFrameSize(_maxFrameSize), + maxFrameSize(conn->getNegotiatedSettings().maxFrameSize), id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name), - connection(conn), - ioHandler(*this), - channel(ch), - proxy(ioHandler), + connectionShared(conn), + connectionWeak(conn), + weakPtr(false), + proxy(out), nextIn(0), nextOut(0) { - channel.next = connection.get(); + channel.next = connectionShared.get(); } SessionImpl::~SessionImpl() { @@ -78,7 +76,8 @@ SessionImpl::~SessionImpl() { state.waitWaiters(); } } - connection->erase(channel); + boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock(); + if (c) c->erase(channel); } @@ -119,6 +118,8 @@ void SessionImpl::close() //user thread void SessionImpl::resume(shared_ptr<ConnectionImpl>) // user thread { + // weakPtr sessions should not be resumed. + if (weakPtr) return; throw NotImplementedException("Resume not yet implemented by client!"); } @@ -251,7 +252,6 @@ void SessionImpl::setExceptionLH(const sys::ExceptionHolder& ex) { // Call with */ void SessionImpl::connectionClosed(uint16_t code, const std::string& text) { setException(createConnectionException(code, text)); - // FIXME aconway 2008-10-07: Should closing a connection detach or close its sessions? handleClosed(); } @@ -259,9 +259,7 @@ void SessionImpl::connectionClosed(uint16_t code, const std::string& text) { * Called by ConnectionImpl to notify active sessions when connection * is disconnected */ -void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) -{ - // FIXME aconway 2008-10-07: distinguish disconnect from clean close. +void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) { connectionClosed(_code, _text); } @@ -426,14 +424,11 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread void SessionImpl::handleOut(AMQFrame& frame) // user thread { - connection->expand(frame.encodedSize(), true); - channel.handle(frame); -} - -void SessionImpl::proxyOut(AMQFrame& frame) // network thread -{ - connection->expand(frame.encodedSize(), false); - channel.handle(frame); + boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock(); + if (c) { + c->expand(frame.encodedSize(), true); + channel.handle(frame); + } } void SessionImpl::deliver(AMQFrame& frame) // network thread @@ -602,11 +597,11 @@ void SessionImpl::exception(uint16_t errorCode, const std::string& description, const framing::FieldTable& /*errorInfo*/) { - QPID_LOG(warning, "Exception received from peer: " << errorCode << ":" << description - << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); - Lock l(state); setExceptionLH(createSessionException(errorCode, description)); + QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() + << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); + if (detachedLifetime) setTimeout(0); } @@ -648,8 +643,6 @@ void SessionImpl::assertOpen() const void SessionImpl::handleClosed() { - // FIXME aconway 2008-06-12: needs to be set to the correct exception type. - // demux.close(exceptionHolder.empty() ? new ClosedException() : exceptionHolder); results.close(); } @@ -662,4 +655,16 @@ uint32_t SessionImpl::setTimeout(uint32_t seconds) { return detachedLifetime; } +uint32_t SessionImpl::getTimeout() const { + return detachedLifetime; +} + +void SessionImpl::setWeakPtr(bool weak) { + weakPtr = weak; + if (weakPtr) + connectionShared.reset(); // Only keep weak pointer + else + connectionShared = connectionWeak.lock(); +} + }} diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 49a65ed568..54ace77254 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -28,7 +28,8 @@ #include "qpid/SessionId.h" #include "qpid/SessionState.h" -#include "qpid/shared_ptr.h" +#include "boost/shared_ptr.hpp" +#include "boost/weak_ptr.hpp" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/SequenceNumber.h" @@ -63,7 +64,7 @@ class SessionImpl : public framing::FrameHandler::InOutHandler, private framing::AMQP_ClientOperations::ExecutionHandler { public: - SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); + SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>); ~SessionImpl(); @@ -106,6 +107,11 @@ public: /** Get timeout in seconds. */ uint32_t getTimeout() const; + /** Make this session use a weak_ptr to the ConnectionImpl. + * Used for sessions created by the ConnectionImpl itself. + */ + void setWeakPtr(bool weak=true); + private: enum State { INACTIVE, @@ -131,7 +137,6 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); - void proxyOut(framing::AMQFrame& frame); void deliver(framing::AMQFrame& frame); Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); @@ -175,8 +180,11 @@ private: const uint64_t maxFrameSize; const SessionId id; - shared_ptr<ConnectionImpl> connection; - framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler; + shared_ptr<ConnectionImpl> connection(); + shared_ptr<ConnectionImpl> connectionShared; + boost::weak_ptr<ConnectionImpl> connectionWeak; + bool weakPtr; + framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 3a463d9038..c50e67effa 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -198,10 +198,10 @@ class SubscriptionManager : public sys::Runnable * Default is to acknowledge every message automatically. */ void setAckPolicy(const AckPolicy& autoAck); - /** - * - */ + AckPolicy& getAckPolicy(); + + Session getSession() const { return session; } }; /** AutoCancel cancels a subscription in its destructor */ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e64692bc91..f4d75b7b6b 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -123,6 +123,8 @@ Cluster::~Cluster() { void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Lock l(lock); + // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map? + // esp shadow connections? See race comment in getConnection. assert(!c->isCatchUp()); connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); } @@ -204,15 +206,18 @@ void Cluster::leave(Lock&) { } boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { - if (connectionId.getMember() == memberId) - return boost::intrusive_ptr<Connection>(connectionId.getPointer()); ConnectionMap::iterator i = connections.find(connectionId); - if (i == connections.end()) { // New shadow connection. - assert(connectionId.getMember() != memberId); - std::ostringstream mgmtId; - mgmtId << name.str() << ":" << connectionId; - ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); - i = connections.insert(value).first; + if (i == connections.end()) { + if (connectionId.getMember() == memberId) { // Closed local connection + QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId); + return boost::intrusive_ptr<Connection>(); + } + else { // New shadow connection + std::ostringstream mgmtId; + mgmtId << name.str() << ":" << connectionId; + ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); + i = connections.insert(value).first; + } } return i->second; } @@ -261,15 +266,17 @@ void Cluster::process(const Event& e, Lock& l) { } } else { // e.isConnection() - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); - if (e.getType() == DATA) { - QPID_LOG(trace, *this << " PROC: " << e); - connection->deliverBuffer(buf); - } - else { // control - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " PROC: " << e << " " << frame); - connection->delivered(frame); + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + if (connection) { // Ignore if no connection. + if (e.getType() == DATA) { + QPID_LOG(trace, *this << " PROC: " << e); + connection->deliverBuffer(buf); + } + else { // control + while (frame.decode(buf)) { + QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + connection->delivered(frame); + } } } } @@ -333,7 +340,7 @@ void Cluster::configChange ( Mutex::ScopedLock l(lock); QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined); + map.configChange(current, nCurrent, left, nLeft, joined, nJoined); if (state == LEFT) return; if (!map.isAlive(memberId)) { leave(l); return; } @@ -350,7 +357,7 @@ void Cluster::configChange ( QPID_LOG(debug, *this << " send dump-request " << myUrl); } } - else if (state >= READY && changed) + else if (state >= READY) memberUpdate(l); } @@ -408,8 +415,8 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { - if (map.ready(id, Url(url))) - memberUpdate(l); + map.ready(id, Url(url)); + memberUpdate(l); } void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index ed339b2f85..2b079a22bc 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -23,6 +23,7 @@ #include "ClusterMap.h" #include "Connection.h" #include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" @@ -41,14 +42,6 @@ #include <boost/bind.hpp> namespace qpid { - -namespace client { -struct ConnectionAccess { - static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; } - static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; } -}; -} // namespace client - namespace cluster { using broker::Broker; @@ -169,10 +162,15 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection); shadowConnection = catchUpConnection(); + broker::Connection& bc = dumpConnection->getBrokerConnection(); // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size, // authentication etc. See ConnectionSettings. shadowConnection.open(dumpeeUrl, bc.getUserId()); + + // Stop the failover listener as its session will conflict with re-creating-sessions + client::ConnectionAccess::getImpl(shadowConnection)->stopFailoverListener(); + dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( dumpConnection->getId().getMember(), @@ -184,26 +182,21 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn void DumpClient::dumpSession(broker::SessionHandler& sh) { QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); - broker::SessionState* s = sh.getSession(); - if (!s) return; // no session. + broker::SessionState* ss = sh.getSession(); + if (!ss) return; // no session. - // Re-create the session. + // Create a client session to dump session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); - size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize; - boost::shared_ptr<client::SessionImpl> simpl( - new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size)); - cimpl->addSession(simpl); - simpl->open(sh.getSession()->getTimeout()); + boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); client::SessionBase_0_10Access(shadowSession).set(simpl); AMQP_AllProxy::ClusterConnection proxy(simpl->out); // Re-create session state on remote connection. - broker::SessionState* ss = sh.getSession(); // For reasons unknown, boost::bind does not work here with boost 1.33. ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); - // FIXME aconway 2008-09-19: remaining session state. + // FIXME aconway 2008-09-19: update remaining session state. // Reset command-sequence state. proxy.sessionState( @@ -216,7 +209,7 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { ss->receiverGetIncomplete() ); - // FIXME aconway 2008-09-23: session replay list. + // FIXME aconway 2008-09-23: update session replay list. QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); } diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index cc1af255e4..f53b48ec1e 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -58,9 +58,7 @@ void OutputInterceptor::activateOutput() { // Called in write thread when the IO layer has no more data to write. // We do nothing in the write thread, we run doOutput only on delivery // of doOutput requests. -bool OutputInterceptor::doOutput() { - return false; -} +bool OutputInterceptor::doOutput() { return false; } // Delivery of doOutput allows us to run the real connection doOutput() // which stocks up the write buffers with data. diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 5cfcbc262d..5b9657c2c7 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -22,6 +22,7 @@ #include "BrokerFixture.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionAccess.h" #include "qpid/client/Session.h" #include "qpid/client/FailoverListener.h" #include "qpid/cluster/Cluster.h" @@ -176,47 +177,14 @@ template <class C> set<uint16_t> makeSet(const C& c) { return s; } -std::set<uint16_t> portsFromFailoverArray(const framing::Array& urlArray) { - std::set<uint16_t> ports; - for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) { - Url url((*i)->get<std::string>()); - BOOST_REQUIRE(url.size() > 0); - BOOST_REQUIRE(url[0].get<TcpAddress>()); - ports.insert(url[0].get<TcpAddress>()->port); - } - return ports; -} - -std::set<uint16_t> portsFromFailoverMessage(const Message& m) { - framing::Array urlArray; - m.getHeaders().getArray("amq.failover", urlArray); - return portsFromFailoverArray(urlArray); -} - -QPID_AUTO_TEST_CASE(FailoverExchange) { - ClusterFixture cluster(2); - Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); - c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover"); - - Message m; - BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m)); - - cluster.add(); - BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m)); -} - -std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t n) { - // Wait till there are n ports in the list. - vector<Url> kb = fl.getKnownBrokers(); - for (size_t retry=1000; kb.size() != n && retry != 0; --retry) { +template <class T> std::set<uint16_t> knownBrokerPorts(T& source, size_t n) { + vector<Url> urls = source.getKnownBrokers(); + for (size_t retry=1000; urls.size() != n && retry != 0; --retry) { ::usleep(1000); - kb = fl.getKnownBrokers(); + urls = source.getKnownBrokers(); } set<uint16_t> s; - for (vector<Url>::const_iterator i = kb.begin(); i != kb.end(); ++i) { + for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) { BOOST_MESSAGE("Failover URL: " << *i); BOOST_CHECK(i->size() >= 1); BOOST_CHECK((*i)[0].get<TcpAddress>()); @@ -226,17 +194,29 @@ std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t } QPID_AUTO_TEST_CASE(testFailoverListener) { - ClusterFixture cluster(1); + ClusterFixture cluster(2); Client c0(cluster[0], "c0"); - FailoverListener fl(c0.connection); + FailoverListener fl; + fl.start(ConnectionAccess::getImpl(c0.connection)); + set<uint16_t> set0=makeSet(cluster); + BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2)); + cluster.add(); + BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(fl, 3)); + cluster.kill(2); + BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2)); +} + +QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); set<uint16_t> set0=makeSet(cluster); - BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1)); + BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2)); cluster.add(); - BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2)); - cluster.kill(1); - BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1)); + BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(c0.connection, 3)); + cluster.kill(2); + BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2)); } QPID_AUTO_TEST_CASE(DumpConsumers) { |