summaryrefslogtreecommitdiff
path: root/cpp/lib/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker')
-rw-r--r--cpp/lib/broker/Broker.h1
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp167
-rw-r--r--cpp/lib/broker/BrokerAdapter.h96
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp95
-rw-r--r--cpp/lib/broker/BrokerChannel.h26
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp3
-rw-r--r--cpp/lib/broker/BrokerMessage.h13
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h44
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp4
-rw-r--r--cpp/lib/broker/BrokerQueue.h10
-rw-r--r--cpp/lib/broker/Connection.cpp43
-rw-r--r--cpp/lib/broker/Connection.h61
-rw-r--r--cpp/lib/broker/HandlerImpl.h71
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp33
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h13
15 files changed, 400 insertions, 280 deletions
diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h
index 27d2fec006..e2ca88d4d0 100644
--- a/cpp/lib/broker/Broker.h
+++ b/cpp/lib/broker/Broker.h
@@ -30,7 +30,6 @@
#include <MessageStore.h>
#include <AutoDelete.h>
#include <ExchangeRegistry.h>
-#include <BrokerChannel.h>
#include <ConnectionToken.h>
#include <DirectExchange.h>
#include <OutputHandler.h>
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 8b081874fc..ec80241c66 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -18,11 +18,10 @@
#include <boost/format.hpp>
#include "BrokerAdapter.h"
+#include "BrokerChannel.h"
#include "Connection.h"
-#include "Exception.h"
#include "AMQMethodBody.h"
#include "Exception.h"
-#include "MessageHandlerImpl.h"
namespace qpid {
namespace broker {
@@ -33,18 +32,37 @@ using namespace qpid::framing;
typedef std::vector<Queue::shared_ptr> QueueVector;
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk(
- const MethodContext& context , const FieldTable& /*clientProperties*/,
+
+BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+ CoreRefs(ch, c, b),
+ connection(c),
+ basicHandler(*this),
+ channelHandler(*this),
+ connectionHandler(*this),
+ exchangeHandler(*this),
+ messageHandler(*this),
+ queueHandler(*this),
+ txHandler(*this)
+{}
+
+
+ProtocolVersion BrokerAdapter::getVersion() const {
+ return connection.getVersion();
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext&, const FieldTable& /*clientProperties*/,
const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/){
- connection.client->getConnection().tune(
- context, 100, connection.getFrameMax(), connection.getHeartbeat());
+ const string& /*response*/, const string& /*locale*/)
+{
+ client.tune(
+ 100, connection.getFrameMax(), connection.getHeartbeat());
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk(
+void BrokerAdapter::ConnectionHandlerImpl::secureOk(
const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
+void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
const MethodContext&, u_int16_t /*channelmax*/,
u_int32_t framemax, u_int16_t heartbeat)
{
@@ -52,50 +70,55 @@ void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
connection.setHeartbeat(heartbeat);
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ConnectionHandlerImpl::open(
+ const MethodContext& context, const string& /*virtualHost*/,
+ const string& /*capabilities*/, bool /*insist*/)
+{
string knownhosts;
- connection.client->getConnection().openOk(context, knownhosts);
+ client.openOk(
+ knownhosts, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close(
+void BrokerAdapter::ConnectionHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getConnection().closeOk(context);
+ client.closeOk(context.getRequestId());
connection.getOutput().close();
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
+void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
connection.getOutput().close();
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open(
+void BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- connection.client->getChannel().openOk(context, std::string()/* ID */);
+ client.openOk(
+ std::string()/* ID */, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close(
+void BrokerAdapter::ChannelHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/,
const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getChannel().closeOk(context);
+ client.closeOk(context.getRequestId());
// FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -116,27 +139,30 @@ void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodCont
}
}
if(!nowait){
- connection.client->getExchange().declareOk(context);
+ client.declareOk(context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(context);
+ if(!nowait) client.deleteOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = connection.getQueue(name, channel.getId());
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
+ broker.getQueues().declare(
+ name, durable,
+ autoDelete ? connection.getTimeout() : 0,
+ exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -161,20 +187,22 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext
% queue->getName());
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ client.declareOk(
+ queueName, queue->getMessageCount(), queue->getConsumerCount(),
+ context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
+void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(context);
+ if(!nowait) client.bindOk(context.getRequestId());
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -182,7 +210,7 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& c
}
void
-BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
+BrokerAdapter::QueueHandlerImpl::unbind(
const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
@@ -198,18 +226,18 @@ BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
exchange->unbind(queue, routingKey, &arguments);
- connection.client->getQueue().unbindOk(context);
+ client.unbindOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(context, count);
+ if(!nowait) client.purgeOk( count, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
@@ -228,20 +256,21 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(context, count);
+ if(!nowait)
+ client.deleteOk(count, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(context);
+ client.qosOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
+void BrokerAdapter::BasicHandlerImpl::consume(
const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -257,19 +286,19 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
+ if(!nowait) client.consumeOk(newTag, context.getRequestId());
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
+ if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
+void BrokerAdapter::BasicHandlerImpl::publish(
const MethodContext& context, u_int16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate)
@@ -287,16 +316,16 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(context, clusterId);
+ client.getEmpty(clusterId, context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
try{
channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -304,31 +333,31 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
channel.recover(requeue);
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
- connection.client->getTx().selectOk(context);
+ client.selectOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
- connection.client->getTx().commitOk(context);
+ client.commitOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
- connection.client->getTx().rollbackOk(context);
+ client.rollbackOk(context.getRequestId());
channel.recover(false);
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
+BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
{
//no specific action required, generic response handling should be sufficient
}
@@ -338,21 +367,21 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
// Message class method handlers
//
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
{
- connection.client->getChannel().ok(context);
- connection.client->getChannel().pong(context);
+ client.ok(context.getRequestId());
+ client.pong();
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
{
- connection.client->getChannel().ok(context);
+ client.ok(context.getRequestId());
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
+BrokerAdapter::ChannelHandlerImpl::resume(
const MethodContext&,
const string& /*channel*/ )
{
diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h
index cd34f17e58..166ec78ddd 100644
--- a/cpp/lib/broker/BrokerAdapter.h
+++ b/cpp/lib/broker/BrokerAdapter.h
@@ -19,8 +19,9 @@
*
*/
#include "AMQP_ServerOperations.h"
+#include "HandlerImpl.h"
#include "MessageHandlerImpl.h"
-#include "BrokerChannel.h"
+#include "Exception.h"
namespace qpid {
namespace broker {
@@ -28,14 +29,6 @@ namespace broker {
class Channel;
class Connection;
class Broker;
-
-/**
- * Per-channel protocol adapter.
- *
- * Translates protocol bodies into calls on the core Channel,
- * Connection and Broker objects.
- */
-
class ChannelHandler;
class ConnectionHandler;
class BasicHandler;
@@ -48,20 +41,23 @@ class FileHandler;
class StreamHandler;
class DtxHandler;
class TunnelHandler;
+class MessageHandlerImpl;
-class BrokerAdapter : public framing::AMQP_ServerOperations
+/**
+ * Per-channel protocol adapter.
+ *
+ * A container for a collection of AMQP-class adapters that translate
+ * AMQP method bodies into calls on the core Channel, Connection and
+ * Broker objects. Each adapter class also provides a client proxy
+ * to send methods to the peer.
+ *
+ */
+class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
- basicHandler(ch, c, b),
- channelHandler(ch, c, b),
- connectionHandler(ch, c, b),
- exchangeHandler(ch, c, b),
- messageHandler(ch, c, b),
- queueHandler(ch, c, b),
- txHandler(ch, c, b)
- {}
-
+ BrokerAdapter(Channel& ch, Connection& c, Broker& b);
+
+ framing::ProtocolVersion getVersion() const;
ChannelHandler* getChannelHandler() { return &channelHandler; }
ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
BasicHandler* getBasicHandler() { return &basicHandler; }
@@ -80,19 +76,16 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
TunnelHandler* getTunnelHandler() {
throw ConnectionException(540, "Tunnel class not implemented"); }
+ framing::AMQP_ClientProxy& getProxy() { return proxy; }
+
private:
- struct CoreRefs {
- CoreRefs(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b) {}
- Channel& channel;
- Connection& connection;
- Broker& broker;
- };
-
- class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
+ class ConnectionHandlerImpl :
+ public ConnectionHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Connection>
+ {
public:
- ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
void startOk(const framing::MethodContext& context,
const qpid::framing::FieldTable& clientProperties,
@@ -112,9 +105,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
void closeOk(const framing::MethodContext& context);
};
- class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
+ class ChannelHandlerImpl :
+ public ChannelHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Channel>
+ {
public:
- ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void open(const framing::MethodContext& context, const std::string& outOfBand);
void flow(const framing::MethodContext& context, bool active);
void flowOk(const framing::MethodContext& context, bool active);
@@ -127,9 +124,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
void closeOk(const framing::MethodContext& context);
};
- class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
+ class ExchangeHandlerImpl :
+ public ExchangeHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
+ {
public:
- ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void declare(const framing::MethodContext& context, u_int16_t ticket,
const std::string& exchange, const std::string& type,
bool passive, bool durable, bool autoDelete,
@@ -139,9 +140,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
const std::string& exchange, bool ifUnused, bool nowait);
};
- class QueueHandlerImpl : private CoreRefs, public QueueHandler{
+ class QueueHandlerImpl :
+ public QueueHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Queue>
+ {
public:
- QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait,
@@ -162,9 +167,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
bool nowait);
};
- class BasicHandlerImpl : private CoreRefs, public BasicHandler{
+ class BasicHandlerImpl :
+ public BasicHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Basic>
+ {
public:
- BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void qos(const framing::MethodContext& context, u_int32_t prefetchSize,
u_int16_t prefetchCount, bool global);
void consume(
@@ -184,14 +193,19 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
void recover(const framing::MethodContext& context, bool requeue);
};
- class TxHandlerImpl : private CoreRefs, public TxHandler{
+ class TxHandlerImpl :
+ public TxHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Tx>
+ {
public:
- TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void select(const framing::MethodContext& context);
void commit(const framing::MethodContext& context);
void rollback(const framing::MethodContext& context);
};
+ Connection& connection;
BasicHandlerImpl basicHandler;
ChannelHandlerImpl channelHandler;
ConnectionHandlerImpl connectionHandler;
@@ -199,7 +213,7 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
MessageHandlerImpl messageHandler;
QueueHandlerImpl queueHandler;
TxHandlerImpl txHandler;
-
+
};
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 07636216a6..74e5504f17 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -25,6 +25,8 @@
#include <algorithm>
#include <functional>
+#include <boost/bind.hpp>
+
#include "BrokerChannel.h"
#include "DeletingTxOp.h"
#include "framing/ChannelAdapter.h"
@@ -50,7 +52,7 @@ Channel::Channel(
u_int32_t _framesize, MessageStore* const _store,
u_int64_t _stagingThreshold
) :
- ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()),
+ ChannelAdapter(id, &con.getOutput(), con.getVersion()),
connection(con),
currentDeliveryTag(1),
transactional(false),
@@ -74,46 +76,32 @@ bool Channel::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,
+// TODO aconway 2007-02-12: Why is connection token passed in instead
+// of using the channel's parent connection?
+void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
{
- if(tag.empty()) tag = tagGenerator.generate();
- ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
- try{
- queue->consume(c, exclusive);//may throw exception
- consumers[tag] = c;
- } catch(...) {
- // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt.
- delete c;
- throw;
- }
-}
-
-void Channel::cancel(consumer_iterator i){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
+ if(tagInOut.empty())
+ tagInOut = tagGenerator.generate();
+ std::auto_ptr<ConsumerImpl> c(
+ new ConsumerImpl(this, tagInOut, queue, connection, acks));
+ queue->consume(c.get(), exclusive);//may throw exception
+ consumers.insert(tagInOut, c.release());
}
void Channel::cancel(const string& tag){
- consumer_iterator i = consumers.find(tag);
- if(i != consumers.end()){
- cancel(i);
- }
+ // consumers is a ptr_map so erase will delete the consumer
+ // which will call cancel.
+ ConsumerImplMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ consumers.erase(i);
}
void Channel::close(){
- if (isOpen()) {
- opened = false;
- while (!consumers.empty())
- cancel(consumers.begin());
- //requeue:
- recover(true);
- }
+ opened = false;
+ consumers.clear();
+ recover(true);
}
void Channel::begin(){
@@ -160,14 +148,10 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){
}
Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack) : parent(_parent),
- tag(_tag),
- queue(_queue),
- connection(_connection),
- ackExpected(ack),
- blocked(false){
-}
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack
+) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
+ ackExpected(ack), blocked(false) {}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
@@ -182,12 +166,18 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
return false;
}
+Channel::ConsumerImpl::~ConsumerImpl() {
+ cancel();
+}
+
void Channel::ConsumerImpl::cancel(){
- if(queue) queue->cancel(this);
+ if(queue)
+ queue->cancel(this);
}
void Channel::ConsumerImpl::requestDispatch(){
- if(blocked) queue->dispatch();
+ if(blocked)
+ queue->dispatch();
}
void Channel::handleInlineTransfer(Message::shared_ptr msg)
@@ -196,11 +186,15 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg)
connection.broker.getExchanges().get(msg->getExchange());
if(transactional){
TxPublish* deliverable = new TxPublish(msg);
- exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ exchange->route(
+ *deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
txBuffer.enlist(new DeletingTxOp(deliverable));
}else{
DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ exchange->route(
+ deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
}
}
@@ -244,7 +238,8 @@ void Channel::ack(){
ack(getRequestInProgress(), false);
}
-void Channel::ack(u_int64_t deliveryTag, bool multiple){
+void Channel::ack(u_int64_t deliveryTag, bool multiple)
+{
if(transactional){
accumulatedAck.update(deliveryTag, multiple);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
@@ -271,9 +266,8 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
//if the prefetch limit had previously been reached, there may
//be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
- }
+ std::for_each(consumers.begin(), consumers.end(),
+ boost::bind(&ConsumerImpl::requestDispatch, _1));
}
}
@@ -328,8 +322,8 @@ void Channel::handleMethodInContext(
method->invoke(*adapter, context);
}
}catch(ChannelException& e){
- connection.client->getChannel().close(
- context, e.code, e.toString(),
+ adapter->getProxy().getChannel().close(
+ e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
connection.closeChannel(getId());
}catch(ConnectionException& e){
@@ -338,4 +332,3 @@ void Channel::handleMethodInContext(
connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
}
-
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 58c4f0a45b..538e86b0a8 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -23,10 +23,10 @@
*/
#include <list>
-#include <map>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/ptr_container/ptr_map.hpp>
#include <AccumulatedAck.h>
#include <Consumer.h>
@@ -56,7 +56,7 @@ using framing::string;
class Channel : public framing::ChannelAdapter,
public CompletionHandler
{
- class ConsumerImpl : public virtual Consumer
+ class ConsumerImpl : public Consumer
{
Channel* parent;
const string tag;
@@ -64,23 +64,25 @@ class Channel : public framing::ChannelAdapter,
ConnectionToken* const connection;
const bool ackExpected;
bool blocked;
+
public:
ConsumerImpl(Channel* parent, const string& tag,
Queue::shared_ptr queue,
ConnectionToken* const connection, bool ack);
+ ~ConsumerImpl();
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
void requestDispatch();
};
- typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+ typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
Connection& connection;
u_int16_t id;
u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
- std::map<string, ConsumerImpl*> consumers;
+ ConsumerImplMap consumers;
u_int32_t prefetchSize;
u_int16_t prefetchCount;
Prefetch outstanding;
@@ -93,18 +95,17 @@ class Channel : public framing::ChannelAdapter,
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
-
boost::scoped_ptr<BrokerAdapter> adapter;
// completion handler for MessageBuilder
void complete(Message::shared_ptr msg);
- void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void cancel(consumer_iterator consumer);
+ void deliver(Message::shared_ptr& msg, const string& tag,
+ Queue::shared_ptr& queue, bool ackExpected);
bool checkPrefetch(Message::shared_ptr& msg);
public:
- Channel(Connection& channel,
+ Channel(Connection& parent,
framing::ChannelId id,
u_int32_t framesize,
MessageStore* const _store = 0,
@@ -112,8 +113,8 @@ class Channel : public framing::ChannelAdapter,
~Channel();
- // For ChannelAdapter
bool isOpen() const { return opened; }
+ BrokerAdapter& getAdatper() { return *adapter; }
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -122,7 +123,11 @@ class Channel : public framing::ChannelAdapter,
u_int16_t setPrefetchCount(u_int16_t n){ return prefetchCount = n; }
bool exists(const string& consumerTag);
- void consume(string& tag, Queue::shared_ptr queue, bool acks,
+
+ /**
+ *@param tagInOut - if empty it is updated with the generated token.
+ */
+ void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
@@ -146,7 +151,6 @@ class Channel : public framing::ChannelAdapter,
void handleMethodInContext(
boost::shared_ptr<framing::AMQMethodBody> method,
const framing::MethodContext& context);
-
};
struct InvalidAckException{};
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index e8a993942a..bff4492a49 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -28,6 +28,9 @@
#include <MessageStore.h>
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeaderBody.h>
+#include "AMQMethodBody.h"
#include "AMQFrame.h"
#include "framing/ChannelAdapter.h"
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 9e77eab446..871514e55f 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -22,12 +22,10 @@
*
*/
-#include <BrokerMessageBase.h>
#include <memory>
#include <boost/shared_ptr.hpp>
-#include <AMQContentBody.h>
-#include <AMQHeaderBody.h>
-#include "AMQMethodBody.h"
+
+#include <BrokerMessageBase.h>
#include <BasicHeaderProperties.h>
#include <ConnectionToken.h>
#include <Content.h>
@@ -39,6 +37,7 @@ namespace qpid {
namespace framing {
class MethodContext;
class ChannelAdapter;
+class AMQHeaderBody;
}
namespace broker {
@@ -52,7 +51,7 @@ using framing::string;
* request.
*/
class BasicMessage : public Message {
- framing::AMQHeaderBody::shared_ptr header;
+ boost::shared_ptr<framing::AMQHeaderBody> header;
std::auto_ptr<Content> content;
sys::Mutex contentLock;
u_int64_t size;
@@ -65,10 +64,10 @@ class BasicMessage : public Message {
BasicMessage(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
bool mandatory, bool immediate,
- framing::AMQMethodBody::shared_ptr respondTo);
+ boost::shared_ptr<framing::AMQMethodBody> respondTo);
BasicMessage();
~BasicMessage();
- void setHeader(framing::AMQHeaderBody::shared_ptr header);
+ void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
void addContent(framing::AMQContentBody::shared_ptr data);
bool isComplete();
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
index 41a0cd45fa..3bba95a5f8 100644
--- a/cpp/lib/broker/BrokerMessageBase.h
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -22,14 +22,10 @@
*
*/
-#include "AMQContentBody.h"
-#include "AMQHeaderBody.h"
-#include "AMQMethodBody.h"
-#include "Content.h"
-#include "framing/amqp_types.h"
-
#include <string>
#include <boost/shared_ptr.hpp>
+#include "Content.h"
+#include "framing/amqp_types.h"
namespace qpid {
@@ -38,6 +34,9 @@ class MethodContext;
class ChannelAdapter;
class BasicHeaderProperties;
class FieldTable;
+class AMQMethodBody;
+class AMQContentBody;
+class AMQHeaderBody;
}
@@ -50,24 +49,17 @@ class MessageStore;
* abstracting away the operations
* TODO; AMS: for the moment this is mostly a placeholder
*/
-class Message{
- const ConnectionToken* publisher;
- std::string exchange;
- std::string routingKey;
- const bool mandatory;
- const bool immediate;
- u_int64_t persistenceId;
- bool redelivered;
- framing::AMQMethodBody::shared_ptr respondTo;
-
+class Message {
public:
typedef boost::shared_ptr<Message> shared_ptr;
+ typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
+
Message(const ConnectionToken* publisher_,
const std::string& _exchange,
const std::string& _routingKey,
bool _mandatory, bool _immediate,
- framing::AMQMethodBody::shared_ptr respondTo_) :
+ AMQMethodBodyPtr respondTo_) :
publisher(publisher_),
exchange(_exchange),
routingKey(_routingKey),
@@ -92,9 +84,7 @@ class Message{
const std::string& getExchange() const { return exchange; }
u_int64_t getPersistenceId() const { return persistenceId; }
bool getRedelivered() const { return redelivered; }
- framing::AMQMethodBody::shared_ptr getRespondTo() const {
- return respondTo;
- }
+ AMQMethodBodyPtr getRespondTo() const { return respondTo; }
void setRouting(const std::string& _exchange, const std::string& _routingKey)
{ exchange = _exchange; routingKey = _routingKey; }
@@ -168,14 +158,24 @@ class Message{
* it uses).
*/
virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {};
- virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {};
+ virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {};
+ virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {};
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
* be reloaded.
*/
virtual void releaseContent(MessageStore* /*store*/) {};
+
+ private:
+ const ConnectionToken* publisher;
+ std::string exchange;
+ std::string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ u_int64_t persistenceId;
+ bool redelivered;
+ AMQMethodBodyPtr respondTo;
};
}}
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 99c045c59a..789e652947 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -149,7 +149,9 @@ void Queue::consume(Consumer* c, bool requestExclusive){
void Queue::cancel(Consumer* c){
Mutex::ScopedLock locker(lock);
- consumers.erase(find(consumers.begin(), consumers.end(), c));
+ Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
+ if (i != consumers.end())
+ consumers.erase(i);
if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC;
if(exclusive == c) exclusive = 0;
}
diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h
index 40fa4bd415..015b27fe76 100644
--- a/cpp/lib/broker/BrokerQueue.h
+++ b/cpp/lib/broker/BrokerQueue.h
@@ -53,13 +53,17 @@ namespace qpid {
* or more consumers registers.
*/
class Queue{
+ typedef std::vector<Consumer*> Consumers;
+ typedef std::queue<Binding*> Bindings;
+ typedef std::queue<Message::shared_ptr> Messages;
+
const string name;
const u_int32_t autodelete;
MessageStore* const store;
const ConnectionToken* const owner;
- std::vector<Consumer*> consumers;
- std::queue<Binding*> bindings;
- std::queue<Message::shared_ptr> messages;
+ Consumers consumers;
+ Bindings bindings;
+ Messages messages;
bool queueing;
bool dispatching;
int next;
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index 000199a65e..3d9e5cdaf8 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -22,6 +22,9 @@
#include <assert.h>
#include "Connection.h"
+#include "BrokerChannel.h"
+#include "AMQP_ClientProxy.h"
+#include "BrokerAdapter.h"
using namespace boost;
using namespace qpid::sys;
@@ -33,12 +36,15 @@ namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
broker(broker_),
- settings(broker.getTimeout(), broker.getStagingThreshold()),
out(out_),
framemax(65536),
- heartbeat(0)
+ heartbeat(0),
+ client(0),
+ timeout(broker.getTimeout()),
+ stagingThreshold(broker.getStagingThreshold())
{}
+
Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
Queue::shared_ptr queue;
if (name.empty()) {
@@ -59,31 +65,27 @@ Exchange::shared_ptr Connection::findExchange(const string& name){
}
-void Connection::received(qpid::framing::AMQFrame* frame){
+void Connection::received(framing::AMQFrame* frame){
getChannel(frame->getChannel()).handleBody(frame->getBody());
}
-void Connection::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId){
- client->getConnection().close(MethodContext(&getChannel(0)), code, text, classId, methodId);
+void Connection::close(
+ ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+ client->close(code, text, classId, methodId);
getOutput().close();
}
-// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter
-// as it is part of the protocol.
-void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
- if (client.get())
- // TODO aconway 2007-01-16: correct error code.
- throw ConnectionException(0, "Connection initiated twice");
- client.reset(new qpid::framing::AMQP_ClientProxy(
- out, header->getMajor(), header->getMinor()));
+void Connection::initiated(framing::ProtocolInitiation* header) {
+ version = ProtocolVersion(header->getMajor(), header->getMinor());
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- client->getConnection().start(
- MethodContext(&getChannel(0)),
+ getChannel(0).init(0, *out, getVersion());
+ client = &getChannel(0).getAdatper().getProxy().getConnection();
+ client->start(
header->getMajor(), header->getMinor(),
properties, mechanisms, locales);
- getChannel(0).init(0, *out, client->getProtocolVersion());
}
void Connection::idleOut(){}
@@ -103,9 +105,10 @@ void Connection::closed(){
}
}
-void Connection::closeChannel(u_int16_t channel) {
- getChannel(channel).close();
- channels.erase(channels.find(channel));
+void Connection::closeChannel(u_int16_t id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i != channels.end())
+ i->close();
}
@@ -115,7 +118,7 @@ Channel& Connection::getChannel(ChannelId id) {
i = channels.insert(
id, new Channel(
*this, id, framemax, broker.getQueues().getStore(),
- settings.stagingThreshold)).first;
+ broker.getStagingThreshold())).first;
}
return *i;
}
diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h
index 4f1156dd01..27faab4967 100644
--- a/cpp/lib/broker/Connection.h
+++ b/cpp/lib/broker/Connection.h
@@ -27,66 +27,64 @@
#include <boost/ptr_container/ptr_map.hpp>
#include <AMQFrame.h>
-#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
+#include <AMQP_ClientProxy.h>
#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include <sys/TimeoutHandler.h>
+#include "framing/ProtocolVersion.h"
#include "Broker.h"
#include "Exception.h"
+#include "BrokerChannel.h"
namespace qpid {
namespace broker {
-class Settings {
- public:
- const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
- const u_int64_t stagingThreshold;
-
- Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {}
-};
+class Channel;
class Connection : public sys::ConnectionInputHandler,
public ConnectionToken
{
public:
Connection(sys::ConnectionOutputHandler* out, Broker& broker);
- // ConnectionInputHandler methods
- void received(framing::AMQFrame* frame);
- void initiated(framing::ProtocolInitiation* header);
- void idleOut();
- void idleIn();
- void closed();
- sys::ConnectionOutputHandler& getOutput() { return *out; }
+ /** Get a channel. Create if it does not already exist */
+ Channel& getChannel(framing::ChannelId channel);
- const framing::ProtocolVersion& getVersion() {
- return client->getProtocolVersion(); }
+ /** Close a channel */
+ void closeChannel(framing::ChannelId channel);
+
+ /** Close the connection */
+ void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+
+ sys::ConnectionOutputHandler& getOutput() const { return *out; }
+ framing::ProtocolVersion getVersion() const { return version; }
u_int32_t getFrameMax() const { return framemax; }
u_int16_t getHeartbeat() const { return heartbeat; }
+ u_int32_t getTimeout() const { return timeout; }
+ u_int64_t getStagingThreshold() const { return stagingThreshold; }
void setFrameMax(u_int32_t fm) { framemax = fm; }
void setHeartbeat(u_int16_t hb) { heartbeat = hb; }
-
- Broker& broker;
- std::auto_ptr<framing::AMQP_ClientProxy> client;
- Settings settings;
-
- std::vector<Queue::shared_ptr> exclusiveQueues;
-
+
/**
* Get named queue, never returns 0.
* @return: named queue or default queue for channel if name=""
* @exception: ChannelException if no queue of that name is found.
- * @exception: ConnectionException if no queue specified and channel has not declared one.
+ * @exception: ConnectionException if name="" and channel has no default.
*/
Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
- Channel& newChannel(framing::ChannelId channel);
- Channel& getChannel(framing::ChannelId channel);
- void closeChannel(framing::ChannelId channel);
- void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+ Broker& broker;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ // ConnectionInputHandler methods
+ void received(framing::AMQFrame* frame);
+ void initiated(framing::ProtocolInitiation* header);
+ void idleOut();
+ void idleIn();
+ void closed();
private:
typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
@@ -94,10 +92,15 @@ class Connection : public sys::ConnectionInputHandler,
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
Exchange::shared_ptr findExchange(const string& name);
+ framing::ProtocolVersion version;
ChannelMap channels;
sys::ConnectionOutputHandler* out;
u_int32_t framemax;
u_int16_t heartbeat;
+ framing::AMQP_ClientProxy::Connection* client;
+ const u_int32_t timeout; //timeout for auto-deleted queues (in ms)
+ const u_int64_t stagingThreshold;
+
};
}}
diff --git a/cpp/lib/broker/HandlerImpl.h b/cpp/lib/broker/HandlerImpl.h
new file mode 100644
index 0000000000..c55a36da45
--- /dev/null
+++ b/cpp/lib/broker/HandlerImpl.h
@@ -0,0 +1,71 @@
+#ifndef _broker_HandlerImpl_h
+#define _broker_HandlerImpl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "BrokerChannel.h"
+#include "AMQP_ClientProxy.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
+namespace broker {
+
+class Broker;
+class Channel;
+class Connection;
+
+/**
+ * A collection of references to the core objects required by an adapter,
+ * and a client proxy.
+ */
+struct CoreRefs
+{
+ CoreRefs(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b), proxy(ch) {}
+
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+ framing::AMQP_ClientProxy proxy;
+};
+
+
+/**
+ * Base template for protocol handler implementations.
+ * Provides the core references and appropriate AMQP class proxy.
+ */
+template <class ProxyType>
+struct HandlerImpl : public CoreRefs {
+ typedef HandlerImpl<ProxyType> HandlerImplType;
+ HandlerImpl(CoreRefs& parent)
+ : CoreRefs(parent), client(ProxyType::get(proxy)) {}
+ ProxyType client;
+};
+
+
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_HandlerImpl_h*/
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 797e3fbbf9..0853aebcb1 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -25,16 +25,15 @@
#include "BrokerMessageMessage.h"
#include "MessageAppendBody.h"
#include "MessageTransferBody.h"
+#include "BrokerAdapter.h"
namespace qpid {
namespace broker {
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b), references(ch),
- client(connection.client->getMessage())
-{}
+MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
+ : HandlerImplType(parent), references(channel) {}
//
// Message class method handlers
@@ -47,7 +46,7 @@ MessageHandlerImpl::append(const MethodContext& context,
references.get(reference).append(
boost::shared_polymorphic_downcast<MessageAppendBody>(
context.methodBody));
- client.ok(context);
+ client.ok(context.getRequestId());
}
@@ -56,7 +55,7 @@ MessageHandlerImpl::cancel(const MethodContext& context,
const string& destination )
{
channel.cancel(destination);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -73,7 +72,7 @@ MessageHandlerImpl::close(const MethodContext& context,
const string& reference)
{
references.get(reference).close();
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -84,7 +83,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool noLocal,
bool noAck,
bool exclusive,
- const qpid::framing::FieldTable& filter )
+ const framing::FieldTable& filter )
{
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!destination.empty() && channel.exists(destination))
@@ -93,7 +92,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
channel.consume(
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok(context);
+ client.ok(context.getRequestId());
// Dispatch messages as there is now a consumer.
queue->dispatch();
}
@@ -117,9 +116,9 @@ MessageHandlerImpl::get( const MethodContext& context,
connection.getQueue(queueName, context.channel->getId());
if(channel.get(queue, destination, !noAck))
- client.ok(context);
+ client.ok(context.getRequestId());
else
- client.empty(context);
+ client.empty(context.getRequestId());
}
void
@@ -141,7 +140,7 @@ MessageHandlerImpl::open(const MethodContext& context,
const string& reference)
{
references.open(reference);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -153,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -161,7 +160,7 @@ MessageHandlerImpl::recover(const MethodContext& context,
bool requeue)
{
channel.recover(requeue);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -204,8 +203,8 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*appId*/,
const string& /*transactionId*/,
const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content body,
+ const framing::FieldTable& /*applicationHeaders*/,
+ const framing::Content& body,
bool /*mandatory*/)
{
MessageTransferBody::shared_ptr transfer(
@@ -218,7 +217,7 @@ MessageHandlerImpl::transfer(const MethodContext& context,
channel.handleInlineTransfer(message);
else
references.get(body.getValue()).addMessage(message);
- client.ok(context);
+ client.ok(context.getRequestId());
}
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
index 0fef45bb19..cb7e7e3126 100644
--- a/cpp/lib/broker/MessageHandlerImpl.h
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -24,7 +24,7 @@
#include "AMQP_ServerOperations.h"
#include "AMQP_ClientProxy.h"
#include "Reference.h"
-#include "BrokerChannel.h"
+#include "HandlerImpl.h"
namespace qpid {
namespace broker {
@@ -34,10 +34,11 @@ class Broker;
class MessageMessage;
class MessageHandlerImpl :
- public framing::AMQP_ServerOperations::MessageHandler
+ public framing::AMQP_ServerOperations::MessageHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Message>
{
public:
- MessageHandlerImpl(Channel& ch, Connection& c, Broker& b);
+ MessageHandlerImpl(CoreRefs& parent);
void append(const framing::MethodContext&,
const std::string& reference,
@@ -116,14 +117,10 @@ class MessageHandlerImpl :
const std::string& transactionId,
const std::string& securityToken,
const framing::FieldTable& applicationHeaders,
- framing::Content body,
+ const framing::Content& body,
bool mandatory );
private:
- Channel& channel;
- Connection& connection;
- Broker& broker;
ReferenceRegistry references;
- framing::AMQP_ClientProxy::Message& client;
};
}} // namespace qpid::broker