diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-08 15:04:51 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-08 15:04:51 +0000 |
| commit | 1f98f2d4626151dbbeb3ef5b88186b3533e67262 (patch) | |
| tree | a7703ab1bcf75a1dc36901f0363c76d3fc7225ef /qpid/cpp | |
| parent | df7cc99c47c1917c736691a71c0b50eb08ebfa9f (diff) | |
| download | qpid-python-1f98f2d4626151dbbeb3ef5b88186b3533e67262.tar.gz | |
QPID-3603: Add ErrorListener to Bridge to handle session errors.
Will be used in HA code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 26 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.h | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 36 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.h | 21 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 86 |
6 files changed, 175 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 53fe38a504..1f6aae7111 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -90,8 +90,7 @@ void Bridge::create(Connection& c) FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); SessionHandler& sessionHandler = c.getChannel(channel); - sessionHandler.setDetachedCallback( - boost::bind(&Bridge::sessionDetached, shared_from_this())); + sessionHandler.setErrorListener(shared_from_this()); if (args.i_srcIsLocal) { if (args.i_dynamic) throw Exception("Dynamic routing not supported for push routes"); @@ -377,8 +376,29 @@ const string& Bridge::getLocalTag() const { return link->getBroker()->getFederationTag(); } -void Bridge::sessionDetached() { + +// SessionHandler::ErrorListener methods. +void Bridge::connectionException( + framing::connection::CloseCode code, const std::string& msg) +{ + if (errorListener) errorListener->connectionException(code, msg); +} + +void Bridge::channelException( + framing::session::DetachCode code, const std::string& msg) +{ + if (errorListener) errorListener->channelException(code, msg); +} + +void Bridge::executionException( + framing::execution::ErrorCode code, const std::string& msg) +{ + if (errorListener) errorListener->executionException(code, msg); +} + +void Bridge::detach() { detached = true; + if (errorListener) errorListener->detach(); } std::string Bridge::createName(const std::string& linkName, diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 2cf07d3a94..b980abc900 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -29,6 +29,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/SessionHandler.h" #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h" #include "qmf/org/apache/qpid/broker/Bridge.h" @@ -43,14 +44,14 @@ class Connection; class ConnectionState; class Link; class LinkRegistry; -class SessionHandler; class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge, + public SessionHandler::ErrorListener, public boost::enable_shared_from_this<Bridge> { -public: + public: typedef boost::shared_ptr<Bridge> shared_ptr; typedef boost::function<void(Bridge*)> CancellationListener; typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback; @@ -104,10 +105,14 @@ public: const std::string& dest, const std::string& key); -private: - // Callback when the bridge's session is detached. - void sessionDetached(); + // SessionHandler::ErrorListener methods. + void connectionException(framing::connection::CloseCode code, const std::string& msg); + void channelException(framing::session::DetachCode, const std::string& msg); + void executionException(framing::execution::ErrorCode, const std::string& msg); + void detach(); + void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; } + private: struct PushHandler : framing::FrameHandler { PushHandler(Connection* c) { conn = c; } void handle(framing::AMQFrame& frame); @@ -138,6 +143,7 @@ private: void cancel(Connection& c); void closed(); friend class Link; // to call create, cancel, closed() + boost::shared_ptr<ErrorListener> errorListener; }; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index b605ca71e5..467d422721 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -267,13 +267,20 @@ void Link::setUrl(const Url& u) { namespace { - /** invoked when session used to subscribe to remote's amq.failover exchange detaches */ - void sessionDetached(Link *link) { - QPID_LOG(notice, "detached from 'amq.failover' for link: " << link->getName()); +class DetachedCallback : public SessionHandler::ErrorListener { + public: + DetachedCallback(const Link& link) : name(link.getName()) {} + void connectionException(framing::connection::CloseCode, const std::string&) {} + void channelException(framing::session::DetachCode, const std::string&) {} + void executionException(framing::execution::ErrorCode, const std::string&) {} + void detach() { + QPID_LOG(notice, "detached from 'amq.failover' for link: " << name); } + private: + const std::string name; +}; } - void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; @@ -301,7 +308,8 @@ void Link::opened() { failoverChannel = nextChannel(); SessionHandler& sessionHandler = connection->getChannel(failoverChannel); - sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) ); + sessionHandler.setErrorListener( + boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this))); failoverSession = queueName; sessionHandler.attachAs(failoverSession); diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index b58c7c01c5..23fa2ee0ca 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -35,23 +35,39 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch) : amqp_0_10::SessionHandler(&c.getOutput(), ch), connection(c), proxy(out), - clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) + clusterOrderProxy(c.getClusterOrderOutput() ? + new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) {} SessionHandler::~SessionHandler() {} -void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) { +void SessionHandler::connectionException( + framing::connection::CloseCode code, const std::string& msg) +{ // NOTE: must tell the error listener _before_ calling connection.close() - if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg); + if (connection.getErrorListener()) + connection.getErrorListener()->connectionError(msg); + if (errorListener) + errorListener->connectionException(code, msg); connection.close(code, msg); } -void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) { - if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +void SessionHandler::channelException( + framing::session::DetachCode code, const std::string& msg) +{ + if (connection.getErrorListener()) + connection.getErrorListener()->sessionError(getChannel(), msg); + if (errorListener) + errorListener->channelException(code, msg); } -void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) { - if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +void SessionHandler::executionException( + framing::execution::ErrorCode code, const std::string& msg) +{ + if (connection.getErrorListener()) + connection.getErrorListener()->sessionError(getChannel(), msg); + if (errorListener) + errorListener->executionException(code, msg); } ConnectionState& SessionHandler::getConnection() { return connection; } @@ -64,7 +80,7 @@ void SessionHandler::handleDetach() { if (session.get()) connection.getBroker().getSessionManager().detach(session); assert(!session.get()); - if (detachedCallback) detachedCallback(); + if (errorListener) errorListener->detach(); connection.closeChannel(channel.get()); } @@ -118,8 +134,4 @@ void SessionHandler::attached(const std::string& name) } } -void SessionHandler::setDetachedCallback(boost::function<void()> cb) { - detachedCallback = cb; -} - }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 4e2cfaa963..ab87cf41a4 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -25,7 +25,7 @@ #include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/broker/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" -#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { class SessionState; @@ -43,6 +43,21 @@ class SessionState; */ class SessionHandler : public amqp_0_10::SessionHandler { public: + class ErrorListener { + public: + virtual ~ErrorListener() {} + virtual void connectionException( + framing::connection::CloseCode code, const std::string& msg) = 0; + virtual void channelException( + framing::session::DetachCode, const std::string& msg) = 0; + virtual void executionException( + framing::execution::ErrorCode, const std::string& msg) = 0; + /** Called when it is safe to delete the ErrorListener. */ + virtual void detach() = 0; + }; + + /** + *@param e must not be deleted until ErrorListener::detach has been called */ SessionHandler(Connection&, framing::ChannelId); ~SessionHandler(); @@ -71,7 +86,7 @@ class SessionHandler : public amqp_0_10::SessionHandler { void attached(const std::string& name);//used by 'pushing' inter-broker bridges void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges - void setDetachedCallback(boost::function<void()> cb); + void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; } protected: virtual void setState(const std::string& sessionName, bool force); @@ -94,7 +109,7 @@ class SessionHandler : public amqp_0_10::SessionHandler { framing::AMQP_ClientProxy proxy; std::auto_ptr<SessionState> session; std::auto_ptr<SetChannelProxy> clusterOrderProxy; - boost::function<void ()> detachedCallback; + boost::shared_ptr<ErrorListener> errorListener; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp new file mode 100644 index 0000000000..bf17d27ca3 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Backup.h" +#include "ConnectionExcluder.h" +#include "HaBroker.h" +#include "Primary.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/FieldTable.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace ha { + +Primary* Primary::instance = 0; + +Primary::Primary(HaBroker& b) : + haBroker(b), logPrefix(b), + expected(b.getSettings().expectedBackups), + unready(0), activated(false) +{ + instance = this; // Let queue replicators find us. + if (expected == 0) // No need for ready check + activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor. + else { + // Set up the primary-ready check: ready when all queues have + // expected number of backups. Note clients are excluded at this point + // so dont't have to worry about changes to the set of queues. + HaBroker::QueueNames names = haBroker.getActiveBackups(); + for (HaBroker::QueueNames::const_iterator i = names.begin(); i != names.end(); ++i) + { + queues[*i] = 0; + ++unready; + QPID_LOG(debug, logPrefix << "Need backup of " << *i + << ", " << unready << " unready queues"); + } + QPID_LOG(critical, "FIXME Primary " << queues.size() << " queues"); + if (queues.empty()) + activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor. + else { + QPID_LOG(debug, logPrefix << "Waiting for " << expected + << " backups on " << queues.size() << " queues"); + // Allow backups to connect. + haBroker.getExcluder()->setBackupAllowed(true); + } + } +} + +void Primary::addReplica(const std::string& q) { + sys::Mutex::ScopedLock l(lock); + if (!activated) { + QueueCounts::iterator i = queues.find(q); + if (i != queues.end()) { + ++i->second; + if (i->second == expected) --unready; + QPID_LOG(debug, logPrefix << i->second << " backups caught up on " << q + << ", " << unready << " unready queues"); + if (unready == 0) activate(l); + } + } +} + +void Primary::activate(sys::Mutex::ScopedLock&) { + activated = true; + haBroker.activate(); +} + +}} // namespace qpid::ha |
