summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-08 15:04:51 +0000
committerAlan Conway <aconway@apache.org>2012-05-08 15:04:51 +0000
commit1f98f2d4626151dbbeb3ef5b88186b3533e67262 (patch)
treea7703ab1bcf75a1dc36901f0363c76d3fc7225ef /qpid/cpp
parentdf7cc99c47c1917c736691a71c0b50eb08ebfa9f (diff)
downloadqpid-python-1f98f2d4626151dbbeb3ef5b88186b3533e67262.tar.gz
QPID-3603: Add ErrorListener to Bridge to handle session errors.
Will be used in HA code. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h16
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp36
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h21
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp86
6 files changed, 175 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 53fe38a504..1f6aae7111 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -90,8 +90,7 @@ void Bridge::create(Connection& c)
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
SessionHandler& sessionHandler = c.getChannel(channel);
- sessionHandler.setDetachedCallback(
- boost::bind(&Bridge::sessionDetached, shared_from_this()));
+ sessionHandler.setErrorListener(shared_from_this());
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
@@ -377,8 +376,29 @@ const string& Bridge::getLocalTag() const
{
return link->getBroker()->getFederationTag();
}
-void Bridge::sessionDetached() {
+
+// SessionHandler::ErrorListener methods.
+void Bridge::connectionException(
+ framing::connection::CloseCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->connectionException(code, msg);
+}
+
+void Bridge::channelException(
+ framing::session::DetachCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->channelException(code, msg);
+}
+
+void Bridge::executionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->executionException(code, msg);
+}
+
+void Bridge::detach() {
detached = true;
+ if (errorListener) errorListener->detach();
}
std::string Bridge::createName(const std::string& linkName,
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 2cf07d3a94..b980abc900 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -29,6 +29,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/SessionHandler.h"
#include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
#include "qmf/org/apache/qpid/broker/Bridge.h"
@@ -43,14 +44,14 @@ class Connection;
class ConnectionState;
class Link;
class LinkRegistry;
-class SessionHandler;
class Bridge : public PersistableConfig,
public management::Manageable,
public Exchange::DynamicBridge,
+ public SessionHandler::ErrorListener,
public boost::enable_shared_from_this<Bridge>
{
-public:
+ public:
typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
@@ -104,10 +105,14 @@ public:
const std::string& dest,
const std::string& key);
-private:
- // Callback when the bridge's session is detached.
- void sessionDetached();
+ // SessionHandler::ErrorListener methods.
+ void connectionException(framing::connection::CloseCode code, const std::string& msg);
+ void channelException(framing::session::DetachCode, const std::string& msg);
+ void executionException(framing::execution::ErrorCode, const std::string& msg);
+ void detach();
+ void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
+ private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
void handle(framing::AMQFrame& frame);
@@ -138,6 +143,7 @@ private:
void cancel(Connection& c);
void closed();
friend class Link; // to call create, cancel, closed()
+ boost::shared_ptr<ErrorListener> errorListener;
};
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index b605ca71e5..467d422721 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -267,13 +267,20 @@ void Link::setUrl(const Url& u) {
namespace {
- /** invoked when session used to subscribe to remote's amq.failover exchange detaches */
- void sessionDetached(Link *link) {
- QPID_LOG(notice, "detached from 'amq.failover' for link: " << link->getName());
+class DetachedCallback : public SessionHandler::ErrorListener {
+ public:
+ DetachedCallback(const Link& link) : name(link.getName()) {}
+ void connectionException(framing::connection::CloseCode, const std::string&) {}
+ void channelException(framing::session::DetachCode, const std::string&) {}
+ void executionException(framing::execution::ErrorCode, const std::string&) {}
+ void detach() {
+ QPID_LOG(notice, "detached from 'amq.failover' for link: " << name);
}
+ private:
+ const std::string name;
+};
}
-
void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
@@ -301,7 +308,8 @@ void Link::opened() {
failoverChannel = nextChannel();
SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
+ sessionHandler.setErrorListener(
+ boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
failoverSession = queueName;
sessionHandler.attachAs(failoverSession);
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index b58c7c01c5..23fa2ee0ca 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -35,23 +35,39 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
proxy(out),
- clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
+ clusterOrderProxy(c.getClusterOrderOutput() ?
+ new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
SessionHandler::~SessionHandler() {}
-void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+void SessionHandler::connectionException(
+ framing::connection::CloseCode code, const std::string& msg)
+{
// NOTE: must tell the error listener _before_ calling connection.close()
- if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
+ if (connection.getErrorListener())
+ connection.getErrorListener()->connectionError(msg);
+ if (errorListener)
+ errorListener->connectionException(code, msg);
connection.close(code, msg);
}
-void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
- if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+void SessionHandler::channelException(
+ framing::session::DetachCode code, const std::string& msg)
+{
+ if (connection.getErrorListener())
+ connection.getErrorListener()->sessionError(getChannel(), msg);
+ if (errorListener)
+ errorListener->channelException(code, msg);
}
-void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
- if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+void SessionHandler::executionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (connection.getErrorListener())
+ connection.getErrorListener()->sessionError(getChannel(), msg);
+ if (errorListener)
+ errorListener->executionException(code, msg);
}
ConnectionState& SessionHandler::getConnection() { return connection; }
@@ -64,7 +80,7 @@ void SessionHandler::handleDetach() {
if (session.get())
connection.getBroker().getSessionManager().detach(session);
assert(!session.get());
- if (detachedCallback) detachedCallback();
+ if (errorListener) errorListener->detach();
connection.closeChannel(channel.get());
}
@@ -118,8 +134,4 @@ void SessionHandler::attached(const std::string& name)
}
}
-void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
- detachedCallback = cb;
-}
-
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 4e2cfaa963..ab87cf41a4 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -25,7 +25,7 @@
#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
-#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
class SessionState;
@@ -43,6 +43,21 @@ class SessionState;
*/
class SessionHandler : public amqp_0_10::SessionHandler {
public:
+ class ErrorListener {
+ public:
+ virtual ~ErrorListener() {}
+ virtual void connectionException(
+ framing::connection::CloseCode code, const std::string& msg) = 0;
+ virtual void channelException(
+ framing::session::DetachCode, const std::string& msg) = 0;
+ virtual void executionException(
+ framing::execution::ErrorCode, const std::string& msg) = 0;
+ /** Called when it is safe to delete the ErrorListener. */
+ virtual void detach() = 0;
+ };
+
+ /**
+ *@param e must not be deleted until ErrorListener::detach has been called */
SessionHandler(Connection&, framing::ChannelId);
~SessionHandler();
@@ -71,7 +86,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
void attached(const std::string& name);//used by 'pushing' inter-broker bridges
void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
- void setDetachedCallback(boost::function<void()> cb);
+ void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
protected:
virtual void setState(const std::string& sessionName, bool force);
@@ -94,7 +109,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
std::auto_ptr<SetChannelProxy> clusterOrderProxy;
- boost::function<void ()> detachedCallback;
+ boost::shared_ptr<ErrorListener> errorListener;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
new file mode 100644
index 0000000000..bf17d27ca3
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "Primary.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/FieldTable.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace ha {
+
+Primary* Primary::instance = 0;
+
+Primary::Primary(HaBroker& b) :
+ haBroker(b), logPrefix(b),
+ expected(b.getSettings().expectedBackups),
+ unready(0), activated(false)
+{
+ instance = this; // Let queue replicators find us.
+ if (expected == 0) // No need for ready check
+ activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor.
+ else {
+ // Set up the primary-ready check: ready when all queues have
+ // expected number of backups. Note clients are excluded at this point
+ // so dont't have to worry about changes to the set of queues.
+ HaBroker::QueueNames names = haBroker.getActiveBackups();
+ for (HaBroker::QueueNames::const_iterator i = names.begin(); i != names.end(); ++i)
+ {
+ queues[*i] = 0;
+ ++unready;
+ QPID_LOG(debug, logPrefix << "Need backup of " << *i
+ << ", " << unready << " unready queues");
+ }
+ QPID_LOG(critical, "FIXME Primary " << queues.size() << " queues");
+ if (queues.empty())
+ activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor.
+ else {
+ QPID_LOG(debug, logPrefix << "Waiting for " << expected
+ << " backups on " << queues.size() << " queues");
+ // Allow backups to connect.
+ haBroker.getExcluder()->setBackupAllowed(true);
+ }
+ }
+}
+
+void Primary::addReplica(const std::string& q) {
+ sys::Mutex::ScopedLock l(lock);
+ if (!activated) {
+ QueueCounts::iterator i = queues.find(q);
+ if (i != queues.end()) {
+ ++i->second;
+ if (i->second == expected) --unready;
+ QPID_LOG(debug, logPrefix << i->second << " backups caught up on " << q
+ << ", " << unready << " unready queues");
+ if (unready == 0) activate(l);
+ }
+ }
+}
+
+void Primary::activate(sys::Mutex::ScopedLock&) {
+ activated = true;
+ haBroker.activate();
+}
+
+}} // namespace qpid::ha