summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/Makefile.am2
-rw-r--r--cpp/lib/broker/Broker.h1
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp167
-rw-r--r--cpp/lib/broker/BrokerAdapter.h96
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp95
-rw-r--r--cpp/lib/broker/BrokerChannel.h26
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp3
-rw-r--r--cpp/lib/broker/BrokerMessage.h13
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h44
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp4
-rw-r--r--cpp/lib/broker/BrokerQueue.h10
-rw-r--r--cpp/lib/broker/Connection.cpp43
-rw-r--r--cpp/lib/broker/Connection.h61
-rw-r--r--cpp/lib/broker/HandlerImpl.h71
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp33
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h13
-rw-r--r--cpp/lib/client/ClientChannel.cpp12
-rw-r--r--cpp/lib/client/ClientChannel.h22
-rw-r--r--cpp/lib/client/Connection.cpp2
-rw-r--r--cpp/lib/client/Connection.h11
-rw-r--r--cpp/lib/client/Connector.cpp14
-rw-r--r--cpp/lib/client/Connector.h2
-rw-r--r--cpp/lib/common/Makefile.am12
-rw-r--r--cpp/lib/common/framing/AMQFrame.cpp19
-rw-r--r--cpp/lib/common/framing/AMQFrame.h8
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp8
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h15
-rw-r--r--cpp/lib/common/framing/FramingContent.h6
-rw-r--r--cpp/lib/common/framing/MethodContext.cpp31
-rw-r--r--cpp/lib/common/framing/MethodContext.h8
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.cpp17
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.h4
-rw-r--r--cpp/lib/common/framing/ProtocolVersion.cpp35
-rw-r--r--cpp/lib/common/framing/ProtocolVersion.h26
-rw-r--r--cpp/lib/common/framing/ProtocolVersionException.h2
-rw-r--r--cpp/lib/common/framing/Proxy.cpp32
-rw-r--r--cpp/lib/common/framing/Proxy.h51
-rw-r--r--cpp/lib/common/framing/amqp_types.h9
-rw-r--r--cpp/lib/common/framing/amqp_types_full.h36
39 files changed, 686 insertions, 378 deletions
diff --git a/cpp/lib/Makefile.am b/cpp/lib/Makefile.am
index 377cd4ede4..09a689bc76 100644
--- a/cpp/lib/Makefile.am
+++ b/cpp/lib/Makefile.am
@@ -1 +1 @@
-SUBDIRS = client common broker
+SUBDIRS = common broker client
diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h
index 27d2fec006..e2ca88d4d0 100644
--- a/cpp/lib/broker/Broker.h
+++ b/cpp/lib/broker/Broker.h
@@ -30,7 +30,6 @@
#include <MessageStore.h>
#include <AutoDelete.h>
#include <ExchangeRegistry.h>
-#include <BrokerChannel.h>
#include <ConnectionToken.h>
#include <DirectExchange.h>
#include <OutputHandler.h>
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 8b081874fc..ec80241c66 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -18,11 +18,10 @@
#include <boost/format.hpp>
#include "BrokerAdapter.h"
+#include "BrokerChannel.h"
#include "Connection.h"
-#include "Exception.h"
#include "AMQMethodBody.h"
#include "Exception.h"
-#include "MessageHandlerImpl.h"
namespace qpid {
namespace broker {
@@ -33,18 +32,37 @@ using namespace qpid::framing;
typedef std::vector<Queue::shared_ptr> QueueVector;
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk(
- const MethodContext& context , const FieldTable& /*clientProperties*/,
+
+BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+ CoreRefs(ch, c, b),
+ connection(c),
+ basicHandler(*this),
+ channelHandler(*this),
+ connectionHandler(*this),
+ exchangeHandler(*this),
+ messageHandler(*this),
+ queueHandler(*this),
+ txHandler(*this)
+{}
+
+
+ProtocolVersion BrokerAdapter::getVersion() const {
+ return connection.getVersion();
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext&, const FieldTable& /*clientProperties*/,
const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/){
- connection.client->getConnection().tune(
- context, 100, connection.getFrameMax(), connection.getHeartbeat());
+ const string& /*response*/, const string& /*locale*/)
+{
+ client.tune(
+ 100, connection.getFrameMax(), connection.getHeartbeat());
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk(
+void BrokerAdapter::ConnectionHandlerImpl::secureOk(
const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
+void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
const MethodContext&, u_int16_t /*channelmax*/,
u_int32_t framemax, u_int16_t heartbeat)
{
@@ -52,50 +70,55 @@ void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
connection.setHeartbeat(heartbeat);
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ConnectionHandlerImpl::open(
+ const MethodContext& context, const string& /*virtualHost*/,
+ const string& /*capabilities*/, bool /*insist*/)
+{
string knownhosts;
- connection.client->getConnection().openOk(context, knownhosts);
+ client.openOk(
+ knownhosts, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close(
+void BrokerAdapter::ConnectionHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getConnection().closeOk(context);
+ client.closeOk(context.getRequestId());
connection.getOutput().close();
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
+void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
connection.getOutput().close();
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open(
+void BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- connection.client->getChannel().openOk(context, std::string()/* ID */);
+ client.openOk(
+ std::string()/* ID */, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close(
+void BrokerAdapter::ChannelHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/,
const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getChannel().closeOk(context);
+ client.closeOk(context.getRequestId());
// FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -116,27 +139,30 @@ void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodCont
}
}
if(!nowait){
- connection.client->getExchange().declareOk(context);
+ client.declareOk(context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(context);
+ if(!nowait) client.deleteOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = connection.getQueue(name, channel.getId());
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
+ broker.getQueues().declare(
+ name, durable,
+ autoDelete ? connection.getTimeout() : 0,
+ exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -161,20 +187,22 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext
% queue->getName());
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ client.declareOk(
+ queueName, queue->getMessageCount(), queue->getConsumerCount(),
+ context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
+void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(context);
+ if(!nowait) client.bindOk(context.getRequestId());
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -182,7 +210,7 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& c
}
void
-BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
+BrokerAdapter::QueueHandlerImpl::unbind(
const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
@@ -198,18 +226,18 @@ BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
exchange->unbind(queue, routingKey, &arguments);
- connection.client->getQueue().unbindOk(context);
+ client.unbindOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(context, count);
+ if(!nowait) client.purgeOk( count, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
@@ -228,20 +256,21 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(context, count);
+ if(!nowait)
+ client.deleteOk(count, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(context);
+ client.qosOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
+void BrokerAdapter::BasicHandlerImpl::consume(
const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -257,19 +286,19 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
+ if(!nowait) client.consumeOk(newTag, context.getRequestId());
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
+ if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
+void BrokerAdapter::BasicHandlerImpl::publish(
const MethodContext& context, u_int16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate)
@@ -287,16 +316,16 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(context, clusterId);
+ client.getEmpty(clusterId, context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
try{
channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -304,31 +333,31 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
channel.recover(requeue);
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
- connection.client->getTx().selectOk(context);
+ client.selectOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
- connection.client->getTx().commitOk(context);
+ client.commitOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
- connection.client->getTx().rollbackOk(context);
+ client.rollbackOk(context.getRequestId());
channel.recover(false);
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
+BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
{
//no specific action required, generic response handling should be sufficient
}
@@ -338,21 +367,21 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
// Message class method handlers
//
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
{
- connection.client->getChannel().ok(context);
- connection.client->getChannel().pong(context);
+ client.ok(context.getRequestId());
+ client.pong();
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
{
- connection.client->getChannel().ok(context);
+ client.ok(context.getRequestId());
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
+BrokerAdapter::ChannelHandlerImpl::resume(
const MethodContext&,
const string& /*channel*/ )
{
diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h
index cd34f17e58..166ec78ddd 100644
--- a/cpp/lib/broker/BrokerAdapter.h
+++ b/cpp/lib/broker/BrokerAdapter.h
@@ -19,8 +19,9 @@
*
*/
#include "AMQP_ServerOperations.h"
+#include "HandlerImpl.h"
#include "MessageHandlerImpl.h"
-#include "BrokerChannel.h"
+#include "Exception.h"
namespace qpid {
namespace broker {
@@ -28,14 +29,6 @@ namespace broker {
class Channel;
class Connection;
class Broker;
-
-/**
- * Per-channel protocol adapter.
- *
- * Translates protocol bodies into calls on the core Channel,
- * Connection and Broker objects.
- */
-
class ChannelHandler;
class ConnectionHandler;
class BasicHandler;
@@ -48,20 +41,23 @@ class FileHandler;
class StreamHandler;
class DtxHandler;
class TunnelHandler;
+class MessageHandlerImpl;
-class BrokerAdapter : public framing::AMQP_ServerOperations
+/**
+ * Per-channel protocol adapter.
+ *
+ * A container for a collection of AMQP-class adapters that translate
+ * AMQP method bodies into calls on the core Channel, Connection and
+ * Broker objects. Each adapter class also provides a client proxy
+ * to send methods to the peer.
+ *
+ */
+class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
- basicHandler(ch, c, b),
- channelHandler(ch, c, b),
- connectionHandler(ch, c, b),
- exchangeHandler(ch, c, b),
- messageHandler(ch, c, b),
- queueHandler(ch, c, b),
- txHandler(ch, c, b)
- {}
-
+ BrokerAdapter(Channel& ch, Connection& c, Broker& b);
+
+ framing::ProtocolVersion getVersion() const;
ChannelHandler* getChannelHandler() { return &channelHandler; }
ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
BasicHandler* getBasicHandler() { return &basicHandler; }
@@ -80,19 +76,16 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
TunnelHandler* getTunnelHandler() {
throw ConnectionException(540, "Tunnel class not implemented"); }
+ framing::AMQP_ClientProxy& getProxy() { return proxy; }
+
private:
- struct CoreRefs {
- CoreRefs(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b) {}
- Channel& channel;
- Connection& connection;
- Broker& broker;
- };
-
- class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
+ class ConnectionHandlerImpl :
+ public ConnectionHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Connection>
+ {
public:
- ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
void startOk(const framing::MethodContext& context,
const qpid::framing::FieldTable& clientProperties,
@@ -112,9 +105,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
void closeOk(const framing::MethodContext& context);
};
- class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
+ class ChannelHandlerImpl :
+ public ChannelHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Channel>
+ {
public:
- ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void open(const framing::MethodContext& context, const std::string& outOfBand);
void flow(const framing::MethodContext& context, bool active);
void flowOk(const framing::MethodContext& context, bool active);
@@ -127,9 +124,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
void closeOk(const framing::MethodContext& context);
};
- class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
+ class ExchangeHandlerImpl :
+ public ExchangeHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
+ {
public:
- ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void declare(const framing::MethodContext& context, u_int16_t ticket,
const std::string& exchange, const std::string& type,
bool passive, bool durable, bool autoDelete,
@@ -139,9 +140,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
const std::string& exchange, bool ifUnused, bool nowait);
};
- class QueueHandlerImpl : private CoreRefs, public QueueHandler{
+ class QueueHandlerImpl :
+ public QueueHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Queue>
+ {
public:
- QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait,
@@ -162,9 +167,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
bool nowait);
};
- class BasicHandlerImpl : private CoreRefs, public BasicHandler{
+ class BasicHandlerImpl :
+ public BasicHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Basic>
+ {
public:
- BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void qos(const framing::MethodContext& context, u_int32_t prefetchSize,
u_int16_t prefetchCount, bool global);
void consume(
@@ -184,14 +193,19 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
void recover(const framing::MethodContext& context, bool requeue);
};
- class TxHandlerImpl : private CoreRefs, public TxHandler{
+ class TxHandlerImpl :
+ public TxHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Tx>
+ {
public:
- TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
void select(const framing::MethodContext& context);
void commit(const framing::MethodContext& context);
void rollback(const framing::MethodContext& context);
};
+ Connection& connection;
BasicHandlerImpl basicHandler;
ChannelHandlerImpl channelHandler;
ConnectionHandlerImpl connectionHandler;
@@ -199,7 +213,7 @@ class BrokerAdapter : public framing::AMQP_ServerOperations
MessageHandlerImpl messageHandler;
QueueHandlerImpl queueHandler;
TxHandlerImpl txHandler;
-
+
};
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 07636216a6..74e5504f17 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -25,6 +25,8 @@
#include <algorithm>
#include <functional>
+#include <boost/bind.hpp>
+
#include "BrokerChannel.h"
#include "DeletingTxOp.h"
#include "framing/ChannelAdapter.h"
@@ -50,7 +52,7 @@ Channel::Channel(
u_int32_t _framesize, MessageStore* const _store,
u_int64_t _stagingThreshold
) :
- ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()),
+ ChannelAdapter(id, &con.getOutput(), con.getVersion()),
connection(con),
currentDeliveryTag(1),
transactional(false),
@@ -74,46 +76,32 @@ bool Channel::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,
+// TODO aconway 2007-02-12: Why is connection token passed in instead
+// of using the channel's parent connection?
+void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
{
- if(tag.empty()) tag = tagGenerator.generate();
- ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
- try{
- queue->consume(c, exclusive);//may throw exception
- consumers[tag] = c;
- } catch(...) {
- // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt.
- delete c;
- throw;
- }
-}
-
-void Channel::cancel(consumer_iterator i){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
+ if(tagInOut.empty())
+ tagInOut = tagGenerator.generate();
+ std::auto_ptr<ConsumerImpl> c(
+ new ConsumerImpl(this, tagInOut, queue, connection, acks));
+ queue->consume(c.get(), exclusive);//may throw exception
+ consumers.insert(tagInOut, c.release());
}
void Channel::cancel(const string& tag){
- consumer_iterator i = consumers.find(tag);
- if(i != consumers.end()){
- cancel(i);
- }
+ // consumers is a ptr_map so erase will delete the consumer
+ // which will call cancel.
+ ConsumerImplMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ consumers.erase(i);
}
void Channel::close(){
- if (isOpen()) {
- opened = false;
- while (!consumers.empty())
- cancel(consumers.begin());
- //requeue:
- recover(true);
- }
+ opened = false;
+ consumers.clear();
+ recover(true);
}
void Channel::begin(){
@@ -160,14 +148,10 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){
}
Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack) : parent(_parent),
- tag(_tag),
- queue(_queue),
- connection(_connection),
- ackExpected(ack),
- blocked(false){
-}
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack
+) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
+ ackExpected(ack), blocked(false) {}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
@@ -182,12 +166,18 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
return false;
}
+Channel::ConsumerImpl::~ConsumerImpl() {
+ cancel();
+}
+
void Channel::ConsumerImpl::cancel(){
- if(queue) queue->cancel(this);
+ if(queue)
+ queue->cancel(this);
}
void Channel::ConsumerImpl::requestDispatch(){
- if(blocked) queue->dispatch();
+ if(blocked)
+ queue->dispatch();
}
void Channel::handleInlineTransfer(Message::shared_ptr msg)
@@ -196,11 +186,15 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg)
connection.broker.getExchanges().get(msg->getExchange());
if(transactional){
TxPublish* deliverable = new TxPublish(msg);
- exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ exchange->route(
+ *deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
txBuffer.enlist(new DeletingTxOp(deliverable));
}else{
DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ exchange->route(
+ deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
}
}
@@ -244,7 +238,8 @@ void Channel::ack(){
ack(getRequestInProgress(), false);
}
-void Channel::ack(u_int64_t deliveryTag, bool multiple){
+void Channel::ack(u_int64_t deliveryTag, bool multiple)
+{
if(transactional){
accumulatedAck.update(deliveryTag, multiple);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
@@ -271,9 +266,8 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
//if the prefetch limit had previously been reached, there may
//be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
- }
+ std::for_each(consumers.begin(), consumers.end(),
+ boost::bind(&ConsumerImpl::requestDispatch, _1));
}
}
@@ -328,8 +322,8 @@ void Channel::handleMethodInContext(
method->invoke(*adapter, context);
}
}catch(ChannelException& e){
- connection.client->getChannel().close(
- context, e.code, e.toString(),
+ adapter->getProxy().getChannel().close(
+ e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
connection.closeChannel(getId());
}catch(ConnectionException& e){
@@ -338,4 +332,3 @@ void Channel::handleMethodInContext(
connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
}
-
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 58c4f0a45b..538e86b0a8 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -23,10 +23,10 @@
*/
#include <list>
-#include <map>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/ptr_container/ptr_map.hpp>
#include <AccumulatedAck.h>
#include <Consumer.h>
@@ -56,7 +56,7 @@ using framing::string;
class Channel : public framing::ChannelAdapter,
public CompletionHandler
{
- class ConsumerImpl : public virtual Consumer
+ class ConsumerImpl : public Consumer
{
Channel* parent;
const string tag;
@@ -64,23 +64,25 @@ class Channel : public framing::ChannelAdapter,
ConnectionToken* const connection;
const bool ackExpected;
bool blocked;
+
public:
ConsumerImpl(Channel* parent, const string& tag,
Queue::shared_ptr queue,
ConnectionToken* const connection, bool ack);
+ ~ConsumerImpl();
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
void requestDispatch();
};
- typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+ typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
Connection& connection;
u_int16_t id;
u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
- std::map<string, ConsumerImpl*> consumers;
+ ConsumerImplMap consumers;
u_int32_t prefetchSize;
u_int16_t prefetchCount;
Prefetch outstanding;
@@ -93,18 +95,17 @@ class Channel : public framing::ChannelAdapter,
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
-
boost::scoped_ptr<BrokerAdapter> adapter;
// completion handler for MessageBuilder
void complete(Message::shared_ptr msg);
- void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void cancel(consumer_iterator consumer);
+ void deliver(Message::shared_ptr& msg, const string& tag,
+ Queue::shared_ptr& queue, bool ackExpected);
bool checkPrefetch(Message::shared_ptr& msg);
public:
- Channel(Connection& channel,
+ Channel(Connection& parent,
framing::ChannelId id,
u_int32_t framesize,
MessageStore* const _store = 0,
@@ -112,8 +113,8 @@ class Channel : public framing::ChannelAdapter,
~Channel();
- // For ChannelAdapter
bool isOpen() const { return opened; }
+ BrokerAdapter& getAdatper() { return *adapter; }
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -122,7 +123,11 @@ class Channel : public framing::ChannelAdapter,
u_int16_t setPrefetchCount(u_int16_t n){ return prefetchCount = n; }
bool exists(const string& consumerTag);
- void consume(string& tag, Queue::shared_ptr queue, bool acks,
+
+ /**
+ *@param tagInOut - if empty it is updated with the generated token.
+ */
+ void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
@@ -146,7 +151,6 @@ class Channel : public framing::ChannelAdapter,
void handleMethodInContext(
boost::shared_ptr<framing::AMQMethodBody> method,
const framing::MethodContext& context);
-
};
struct InvalidAckException{};
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index e8a993942a..bff4492a49 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -28,6 +28,9 @@
#include <MessageStore.h>
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeaderBody.h>
+#include "AMQMethodBody.h"
#include "AMQFrame.h"
#include "framing/ChannelAdapter.h"
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 9e77eab446..871514e55f 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -22,12 +22,10 @@
*
*/
-#include <BrokerMessageBase.h>
#include <memory>
#include <boost/shared_ptr.hpp>
-#include <AMQContentBody.h>
-#include <AMQHeaderBody.h>
-#include "AMQMethodBody.h"
+
+#include <BrokerMessageBase.h>
#include <BasicHeaderProperties.h>
#include <ConnectionToken.h>
#include <Content.h>
@@ -39,6 +37,7 @@ namespace qpid {
namespace framing {
class MethodContext;
class ChannelAdapter;
+class AMQHeaderBody;
}
namespace broker {
@@ -52,7 +51,7 @@ using framing::string;
* request.
*/
class BasicMessage : public Message {
- framing::AMQHeaderBody::shared_ptr header;
+ boost::shared_ptr<framing::AMQHeaderBody> header;
std::auto_ptr<Content> content;
sys::Mutex contentLock;
u_int64_t size;
@@ -65,10 +64,10 @@ class BasicMessage : public Message {
BasicMessage(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
bool mandatory, bool immediate,
- framing::AMQMethodBody::shared_ptr respondTo);
+ boost::shared_ptr<framing::AMQMethodBody> respondTo);
BasicMessage();
~BasicMessage();
- void setHeader(framing::AMQHeaderBody::shared_ptr header);
+ void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
void addContent(framing::AMQContentBody::shared_ptr data);
bool isComplete();
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
index 41a0cd45fa..3bba95a5f8 100644
--- a/cpp/lib/broker/BrokerMessageBase.h
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -22,14 +22,10 @@
*
*/
-#include "AMQContentBody.h"
-#include "AMQHeaderBody.h"
-#include "AMQMethodBody.h"
-#include "Content.h"
-#include "framing/amqp_types.h"
-
#include <string>
#include <boost/shared_ptr.hpp>
+#include "Content.h"
+#include "framing/amqp_types.h"
namespace qpid {
@@ -38,6 +34,9 @@ class MethodContext;
class ChannelAdapter;
class BasicHeaderProperties;
class FieldTable;
+class AMQMethodBody;
+class AMQContentBody;
+class AMQHeaderBody;
}
@@ -50,24 +49,17 @@ class MessageStore;
* abstracting away the operations
* TODO; AMS: for the moment this is mostly a placeholder
*/
-class Message{
- const ConnectionToken* publisher;
- std::string exchange;
- std::string routingKey;
- const bool mandatory;
- const bool immediate;
- u_int64_t persistenceId;
- bool redelivered;
- framing::AMQMethodBody::shared_ptr respondTo;
-
+class Message {
public:
typedef boost::shared_ptr<Message> shared_ptr;
+ typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
+
Message(const ConnectionToken* publisher_,
const std::string& _exchange,
const std::string& _routingKey,
bool _mandatory, bool _immediate,
- framing::AMQMethodBody::shared_ptr respondTo_) :
+ AMQMethodBodyPtr respondTo_) :
publisher(publisher_),
exchange(_exchange),
routingKey(_routingKey),
@@ -92,9 +84,7 @@ class Message{
const std::string& getExchange() const { return exchange; }
u_int64_t getPersistenceId() const { return persistenceId; }
bool getRedelivered() const { return redelivered; }
- framing::AMQMethodBody::shared_ptr getRespondTo() const {
- return respondTo;
- }
+ AMQMethodBodyPtr getRespondTo() const { return respondTo; }
void setRouting(const std::string& _exchange, const std::string& _routingKey)
{ exchange = _exchange; routingKey = _routingKey; }
@@ -168,14 +158,24 @@ class Message{
* it uses).
*/
virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {};
- virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {};
+ virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {};
+ virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {};
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
* be reloaded.
*/
virtual void releaseContent(MessageStore* /*store*/) {};
+
+ private:
+ const ConnectionToken* publisher;
+ std::string exchange;
+ std::string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ u_int64_t persistenceId;
+ bool redelivered;
+ AMQMethodBodyPtr respondTo;
};
}}
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 99c045c59a..789e652947 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -149,7 +149,9 @@ void Queue::consume(Consumer* c, bool requestExclusive){
void Queue::cancel(Consumer* c){
Mutex::ScopedLock locker(lock);
- consumers.erase(find(consumers.begin(), consumers.end(), c));
+ Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
+ if (i != consumers.end())
+ consumers.erase(i);
if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC;
if(exclusive == c) exclusive = 0;
}
diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h
index 40fa4bd415..015b27fe76 100644
--- a/cpp/lib/broker/BrokerQueue.h
+++ b/cpp/lib/broker/BrokerQueue.h
@@ -53,13 +53,17 @@ namespace qpid {
* or more consumers registers.
*/
class Queue{
+ typedef std::vector<Consumer*> Consumers;
+ typedef std::queue<Binding*> Bindings;
+ typedef std::queue<Message::shared_ptr> Messages;
+
const string name;
const u_int32_t autodelete;
MessageStore* const store;
const ConnectionToken* const owner;
- std::vector<Consumer*> consumers;
- std::queue<Binding*> bindings;
- std::queue<Message::shared_ptr> messages;
+ Consumers consumers;
+ Bindings bindings;
+ Messages messages;
bool queueing;
bool dispatching;
int next;
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index 000199a65e..3d9e5cdaf8 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -22,6 +22,9 @@
#include <assert.h>
#include "Connection.h"
+#include "BrokerChannel.h"
+#include "AMQP_ClientProxy.h"
+#include "BrokerAdapter.h"
using namespace boost;
using namespace qpid::sys;
@@ -33,12 +36,15 @@ namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
broker(broker_),
- settings(broker.getTimeout(), broker.getStagingThreshold()),
out(out_),
framemax(65536),
- heartbeat(0)
+ heartbeat(0),
+ client(0),
+ timeout(broker.getTimeout()),
+ stagingThreshold(broker.getStagingThreshold())
{}
+
Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
Queue::shared_ptr queue;
if (name.empty()) {
@@ -59,31 +65,27 @@ Exchange::shared_ptr Connection::findExchange(const string& name){
}
-void Connection::received(qpid::framing::AMQFrame* frame){
+void Connection::received(framing::AMQFrame* frame){
getChannel(frame->getChannel()).handleBody(frame->getBody());
}
-void Connection::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId){
- client->getConnection().close(MethodContext(&getChannel(0)), code, text, classId, methodId);
+void Connection::close(
+ ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+ client->close(code, text, classId, methodId);
getOutput().close();
}
-// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter
-// as it is part of the protocol.
-void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
- if (client.get())
- // TODO aconway 2007-01-16: correct error code.
- throw ConnectionException(0, "Connection initiated twice");
- client.reset(new qpid::framing::AMQP_ClientProxy(
- out, header->getMajor(), header->getMinor()));
+void Connection::initiated(framing::ProtocolInitiation* header) {
+ version = ProtocolVersion(header->getMajor(), header->getMinor());
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- client->getConnection().start(
- MethodContext(&getChannel(0)),
+ getChannel(0).init(0, *out, getVersion());
+ client = &getChannel(0).getAdatper().getProxy().getConnection();
+ client->start(
header->getMajor(), header->getMinor(),
properties, mechanisms, locales);
- getChannel(0).init(0, *out, client->getProtocolVersion());
}
void Connection::idleOut(){}
@@ -103,9 +105,10 @@ void Connection::closed(){
}
}
-void Connection::closeChannel(u_int16_t channel) {
- getChannel(channel).close();
- channels.erase(channels.find(channel));
+void Connection::closeChannel(u_int16_t id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i != channels.end())
+ i->close();
}
@@ -115,7 +118,7 @@ Channel& Connection::getChannel(ChannelId id) {
i = channels.insert(
id, new Channel(
*this, id, framemax, broker.getQueues().getStore(),
- settings.stagingThreshold)).first;
+ broker.getStagingThreshold())).first;
}
return *i;
}
diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h
index 4f1156dd01..27faab4967 100644
--- a/cpp/lib/broker/Connection.h
+++ b/cpp/lib/broker/Connection.h
@@ -27,66 +27,64 @@
#include <boost/ptr_container/ptr_map.hpp>
#include <AMQFrame.h>
-#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
+#include <AMQP_ClientProxy.h>
#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include <sys/TimeoutHandler.h>
+#include "framing/ProtocolVersion.h"
#include "Broker.h"
#include "Exception.h"
+#include "BrokerChannel.h"
namespace qpid {
namespace broker {
-class Settings {
- public:
- const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
- const u_int64_t stagingThreshold;
-
- Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {}
-};
+class Channel;
class Connection : public sys::ConnectionInputHandler,
public ConnectionToken
{
public:
Connection(sys::ConnectionOutputHandler* out, Broker& broker);
- // ConnectionInputHandler methods
- void received(framing::AMQFrame* frame);
- void initiated(framing::ProtocolInitiation* header);
- void idleOut();
- void idleIn();
- void closed();
- sys::ConnectionOutputHandler& getOutput() { return *out; }
+ /** Get a channel. Create if it does not already exist */
+ Channel& getChannel(framing::ChannelId channel);
- const framing::ProtocolVersion& getVersion() {
- return client->getProtocolVersion(); }
+ /** Close a channel */
+ void closeChannel(framing::ChannelId channel);
+
+ /** Close the connection */
+ void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+
+ sys::ConnectionOutputHandler& getOutput() const { return *out; }
+ framing::ProtocolVersion getVersion() const { return version; }
u_int32_t getFrameMax() const { return framemax; }
u_int16_t getHeartbeat() const { return heartbeat; }
+ u_int32_t getTimeout() const { return timeout; }
+ u_int64_t getStagingThreshold() const { return stagingThreshold; }
void setFrameMax(u_int32_t fm) { framemax = fm; }
void setHeartbeat(u_int16_t hb) { heartbeat = hb; }
-
- Broker& broker;
- std::auto_ptr<framing::AMQP_ClientProxy> client;
- Settings settings;
-
- std::vector<Queue::shared_ptr> exclusiveQueues;
-
+
/**
* Get named queue, never returns 0.
* @return: named queue or default queue for channel if name=""
* @exception: ChannelException if no queue of that name is found.
- * @exception: ConnectionException if no queue specified and channel has not declared one.
+ * @exception: ConnectionException if name="" and channel has no default.
*/
Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
- Channel& newChannel(framing::ChannelId channel);
- Channel& getChannel(framing::ChannelId channel);
- void closeChannel(framing::ChannelId channel);
- void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+ Broker& broker;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ // ConnectionInputHandler methods
+ void received(framing::AMQFrame* frame);
+ void initiated(framing::ProtocolInitiation* header);
+ void idleOut();
+ void idleIn();
+ void closed();
private:
typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
@@ -94,10 +92,15 @@ class Connection : public sys::ConnectionInputHandler,
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
Exchange::shared_ptr findExchange(const string& name);
+ framing::ProtocolVersion version;
ChannelMap channels;
sys::ConnectionOutputHandler* out;
u_int32_t framemax;
u_int16_t heartbeat;
+ framing::AMQP_ClientProxy::Connection* client;
+ const u_int32_t timeout; //timeout for auto-deleted queues (in ms)
+ const u_int64_t stagingThreshold;
+
};
}}
diff --git a/cpp/lib/broker/HandlerImpl.h b/cpp/lib/broker/HandlerImpl.h
new file mode 100644
index 0000000000..c55a36da45
--- /dev/null
+++ b/cpp/lib/broker/HandlerImpl.h
@@ -0,0 +1,71 @@
+#ifndef _broker_HandlerImpl_h
+#define _broker_HandlerImpl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "BrokerChannel.h"
+#include "AMQP_ClientProxy.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
+namespace broker {
+
+class Broker;
+class Channel;
+class Connection;
+
+/**
+ * A collection of references to the core objects required by an adapter,
+ * and a client proxy.
+ */
+struct CoreRefs
+{
+ CoreRefs(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b), proxy(ch) {}
+
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+ framing::AMQP_ClientProxy proxy;
+};
+
+
+/**
+ * Base template for protocol handler implementations.
+ * Provides the core references and appropriate AMQP class proxy.
+ */
+template <class ProxyType>
+struct HandlerImpl : public CoreRefs {
+ typedef HandlerImpl<ProxyType> HandlerImplType;
+ HandlerImpl(CoreRefs& parent)
+ : CoreRefs(parent), client(ProxyType::get(proxy)) {}
+ ProxyType client;
+};
+
+
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_HandlerImpl_h*/
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 797e3fbbf9..0853aebcb1 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -25,16 +25,15 @@
#include "BrokerMessageMessage.h"
#include "MessageAppendBody.h"
#include "MessageTransferBody.h"
+#include "BrokerAdapter.h"
namespace qpid {
namespace broker {
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b), references(ch),
- client(connection.client->getMessage())
-{}
+MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
+ : HandlerImplType(parent), references(channel) {}
//
// Message class method handlers
@@ -47,7 +46,7 @@ MessageHandlerImpl::append(const MethodContext& context,
references.get(reference).append(
boost::shared_polymorphic_downcast<MessageAppendBody>(
context.methodBody));
- client.ok(context);
+ client.ok(context.getRequestId());
}
@@ -56,7 +55,7 @@ MessageHandlerImpl::cancel(const MethodContext& context,
const string& destination )
{
channel.cancel(destination);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -73,7 +72,7 @@ MessageHandlerImpl::close(const MethodContext& context,
const string& reference)
{
references.get(reference).close();
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -84,7 +83,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool noLocal,
bool noAck,
bool exclusive,
- const qpid::framing::FieldTable& filter )
+ const framing::FieldTable& filter )
{
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!destination.empty() && channel.exists(destination))
@@ -93,7 +92,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
channel.consume(
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok(context);
+ client.ok(context.getRequestId());
// Dispatch messages as there is now a consumer.
queue->dispatch();
}
@@ -117,9 +116,9 @@ MessageHandlerImpl::get( const MethodContext& context,
connection.getQueue(queueName, context.channel->getId());
if(channel.get(queue, destination, !noAck))
- client.ok(context);
+ client.ok(context.getRequestId());
else
- client.empty(context);
+ client.empty(context.getRequestId());
}
void
@@ -141,7 +140,7 @@ MessageHandlerImpl::open(const MethodContext& context,
const string& reference)
{
references.open(reference);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -153,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -161,7 +160,7 @@ MessageHandlerImpl::recover(const MethodContext& context,
bool requeue)
{
channel.recover(requeue);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -204,8 +203,8 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*appId*/,
const string& /*transactionId*/,
const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content body,
+ const framing::FieldTable& /*applicationHeaders*/,
+ const framing::Content& body,
bool /*mandatory*/)
{
MessageTransferBody::shared_ptr transfer(
@@ -218,7 +217,7 @@ MessageHandlerImpl::transfer(const MethodContext& context,
channel.handleInlineTransfer(message);
else
references.get(body.getValue()).addMessage(message);
- client.ok(context);
+ client.ok(context.getRequestId());
}
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
index 0fef45bb19..cb7e7e3126 100644
--- a/cpp/lib/broker/MessageHandlerImpl.h
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -24,7 +24,7 @@
#include "AMQP_ServerOperations.h"
#include "AMQP_ClientProxy.h"
#include "Reference.h"
-#include "BrokerChannel.h"
+#include "HandlerImpl.h"
namespace qpid {
namespace broker {
@@ -34,10 +34,11 @@ class Broker;
class MessageMessage;
class MessageHandlerImpl :
- public framing::AMQP_ServerOperations::MessageHandler
+ public framing::AMQP_ServerOperations::MessageHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Message>
{
public:
- MessageHandlerImpl(Channel& ch, Connection& c, Broker& b);
+ MessageHandlerImpl(CoreRefs& parent);
void append(const framing::MethodContext&,
const std::string& reference,
@@ -116,14 +117,10 @@ class MessageHandlerImpl :
const std::string& transactionId,
const std::string& securityToken,
const framing::FieldTable& applicationHeaders,
- framing::Content body,
+ const framing::Content& body,
bool mandatory );
private:
- Channel& channel;
- Connection& connection;
- Broker& broker;
ReferenceRegistry references;
- framing::AMQP_ClientProxy::Message& client;
};
}} // namespace qpid::broker
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index dd93c6ae8b..52910f5161 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -24,6 +24,7 @@
#include <QpidError.h>
#include <MethodBodyInstances.h>
#include "Connection.h"
+#include "AMQP_ServerProxy.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -48,12 +49,23 @@ Channel::~Channel(){
close();
}
+AMQP_ServerProxy& Channel::brokerProxy() {
+ assert(proxy.get());
+ return *proxy;
+}
+
+AMQMethodBody::shared_ptr Channel::brokerResponse() {
+ // FIXME aconway 2007-02-08: implement responses.
+ return AMQMethodBody::shared_ptr();
+}
+
void Channel::open(ChannelId id, Connection& con)
{
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
connection = &con;
init(id, con, con.getVersion()); // ChannelAdapter initialization.
+ proxy.reset(new AMQP_ServerProxy(*this));
string oob;
if (id != 0)
sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index a34c95d2c4..1c082f3b59 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -24,6 +24,7 @@
#include <map>
#include <string>
#include <queue>
+#include <boost/scoped_ptr.hpp>
#include "sys/types.h"
#include <framing/amqp_framing.h>
@@ -39,8 +40,10 @@
#include "Thread.h"
namespace qpid {
+
namespace framing {
class ChannelCloseBody;
+class AMQP_ServerProxy;
}
namespace client {
@@ -102,6 +105,7 @@ class Channel : public framing::ChannelAdapter,
u_int16_t prefetch;
const bool transactional;
framing::ProtocolVersion version;
+ boost::scoped_ptr<framing::AMQP_ServerProxy> proxy;
void enqueue();
void retrieve(Message& msg);
@@ -151,8 +155,6 @@ class Channel : public framing::ChannelAdapter,
public:
- bool isOpen() const;
-
/**
* Creates a channel object.
*
@@ -358,9 +360,21 @@ class Channel : public framing::ChannelAdapter,
* @see publish()
*/
void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ bool isOpen() const;
+
+ /**
+ * Returns a proxy for the "raw" AMQP broker protocol. Only for use by
+ * protocol experts.
+ */
+
+ framing::AMQP_ServerProxy& brokerProxy();
+ /**
+ * Wait for the next method from the broker.
+ */
+ framing::AMQMethodBody::shared_ptr brokerResponse();
};
-}
-}
+}}
#endif /*!_client_ClientChannel_h*/
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 0fafd29b90..2f91c44a22 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -43,7 +43,7 @@ const std::string Connection::OK("OK");
Connection::Connection(
bool _debug, u_int32_t _max_frame_size,
- const framing::ProtocolVersion& _version
+ framing::ProtocolVersion _version
) : version(_version), max_frame_size(_max_frame_size),
defaultConnector(version, _debug, _max_frame_size),
isOpen(false), debug(_debug)
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index 2f9b35d5ef..275e02a105 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -96,12 +96,12 @@ class Connection : public ConnectionForChannel
Connector* connector;
framing::OutputHandler* out;
volatile bool isOpen;
+ Channel channel0;
+ bool debug;
void erase(framing::ChannelId);
void channelException(
Channel&, framing::AMQMethodBody*, const QpidError&);
- Channel channel0;
- bool debug;
// TODO aconway 2007-01-26: too many friendships, untagle these classes.
friend class Channel;
@@ -120,9 +120,8 @@ class Connection : public ConnectionForChannel
* @param max_frame_size the maximum frame size that the
* client will accept. Optional and defaults to 65536.
*/
- Connection(
- bool debug = false, u_int32_t max_frame_size = 65536,
- const framing::ProtocolVersion& = framing::highestProtocolVersion);
+ Connection(bool debug = false, u_int32_t max_frame_size = 65536,
+ framing::ProtocolVersion=framing::highestProtocolVersion);
~Connection();
/**
@@ -185,7 +184,7 @@ class Connection : public ConnectionForChannel
inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
/** @return protocol version in use on this connection. */
- const framing::ProtocolVersion& getVersion() const { return version; }
+ framing::ProtocolVersion getVersion() const { return version; }
};
}} // namespace qpid::client
diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp
index 425cecaf6f..657ee77f1a 100644
--- a/cpp/lib/client/Connector.cpp
+++ b/cpp/lib/client/Connector.cpp
@@ -23,17 +23,19 @@
#include <sys/Time.h>
#include "Connector.h"
+namespace qpid {
+namespace client {
+
using namespace qpid::sys;
-using namespace qpid::client;
using namespace qpid::framing;
using qpid::QpidError;
-Connector::Connector(const qpid::framing::ProtocolVersion& pVersion,
- bool _debug, u_int32_t buffer_size) :
- debug(_debug),
+Connector::Connector(
+ ProtocolVersion ver, bool _debug, u_int32_t buffer_size
+) : debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
- version(pVersion),
+ version(ver),
closed(true),
lastIn(0), lastOut(0),
timeout(0),
@@ -180,3 +182,5 @@ void Connector::run(){
handleClosed();
}
}
+
+}} // namespace qpid::client
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index 1126e861e0..ccac39f849 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -77,7 +77,7 @@ class Connector : public framing::OutputHandler,
friend class Channel;
public:
- Connector(const framing::ProtocolVersion& pVersion,
+ Connector(framing::ProtocolVersion pVersion,
bool debug = false, u_int32_t buffer_size = 1024);
virtual ~Connector();
virtual void connect(const std::string& host, int port);
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
index eefff79d6f..c44480bddf 100644
--- a/cpp/lib/common/Makefile.am
+++ b/cpp/lib/common/Makefile.am
@@ -63,6 +63,7 @@ libqpidcommon_la_SOURCES = \
$(framing)/AMQHeaderBody.cpp \
$(framing)/AMQHeartbeatBody.cpp \
$(framing)/AMQMethodBody.cpp \
+ $(framing)/MethodContext.cpp \
$(framing)/BasicHeaderProperties.cpp \
$(framing)/BodyHandler.cpp \
$(framing)/ChannelAdapter.cpp \
@@ -76,8 +77,8 @@ libqpidcommon_la_SOURCES = \
$(framing)/Requester.cpp \
$(framing)/Responder.cpp \
$(framing)/Value.cpp \
+ $(framing)/Proxy.cpp \
$(gen)/AMQP_ClientProxy.cpp \
- $(gen)/AMQP_HighestVersion.h \
$(gen)/AMQP_MethodVersionMap.cpp \
$(gen)/AMQP_ServerProxy.cpp \
Exception.cpp \
@@ -87,6 +88,7 @@ libqpidcommon_la_SOURCES = \
sys/Time.cpp
nobase_pkginclude_HEADERS = \
+ $(gen)/AMQP_HighestVersion.h \
$(platform_hdr) \
$(framing)/AMQBody.h \
$(framing)/AMQContentBody.h \
@@ -95,6 +97,7 @@ nobase_pkginclude_HEADERS = \
$(framing)/AMQHeaderBody.h \
$(framing)/AMQHeartbeatBody.h \
$(framing)/AMQMethodBody.h \
+ $(framing)/MethodContext.h \
$(framing)/BasicHeaderProperties.h \
$(framing)/BodyHandler.h \
$(framing)/ChannelAdapter.h \
@@ -111,6 +114,7 @@ nobase_pkginclude_HEADERS = \
$(framing)/Value.h \
$(framing)/amqp_framing.h \
$(framing)/amqp_types.h \
+ $(framing)/Proxy.h \
Exception.h \
ExceptionHolder.h \
QpidError.h \
@@ -121,9 +125,9 @@ nobase_pkginclude_HEADERS = \
sys/Monitor.h \
sys/Mutex.h \
sys/Runnable.h \
- sys/ConnectionOutputHandler.h \
- sys/ConnectionInputHandler.h \
- sys/ConnectionInputHandlerFactory.h \
+ sys/ConnectionOutputHandler.h \
+ sys/ConnectionInputHandler.h \
+ sys/ConnectionInputHandlerFactory.h \
sys/ShutdownHandler.h \
sys/Socket.h \
sys/Thread.h \
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp
index 9c5e295e22..4e061af2e1 100644
--- a/cpp/lib/common/framing/AMQFrame.cpp
+++ b/cpp/lib/common/framing/AMQFrame.cpp
@@ -26,19 +26,24 @@
#include "AMQRequestBody.h"
#include "AMQResponseBody.h"
-using namespace qpid::framing;
+
+namespace qpid {
+namespace framing {
+
AMQP_MethodVersionMap AMQFrame::versionMap;
-AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version):
+AMQFrame::AMQFrame(ProtocolVersion _version):
version(_version)
- {}
+ {
+ assert(version != ProtocolVersion(0,0));
+ }
-AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) :
+AMQFrame::AMQFrame(ProtocolVersion _version, u_int16_t _channel, AMQBody* _body) :
version(_version), channel(_channel), body(_body)
{}
-AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) :
+AMQFrame::AMQFrame(ProtocolVersion _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) :
version(_version), channel(_channel), body(_body)
{}
@@ -119,7 +124,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
body->decode(buffer, size);
}
-std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t)
+std::ostream& operator<<(std::ostream& out, const AMQFrame& t)
{
out << "Frame[channel=" << t.channel << "; ";
if (t.body.get() == 0)
@@ -130,3 +135,5 @@ std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t)
return out;
}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index 15b294a373..4e49b8871f 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -41,9 +41,9 @@ namespace framing {
class AMQFrame : public AMQDataBlock
{
public:
- AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
- AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body);
- AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body);
+ AMQFrame(ProtocolVersion _version = highestProtocolVersion);
+ AMQFrame(ProtocolVersion _version, u_int16_t channel, AMQBody* body);
+ AMQFrame(ProtocolVersion _version, u_int16_t channel, const AMQBody::shared_ptr& body);
virtual ~AMQFrame();
virtual void encode(Buffer& buffer);
virtual bool decode(Buffer& buffer);
@@ -62,7 +62,7 @@ class AMQFrame : public AMQDataBlock
private:
static AMQP_MethodVersionMap versionMap;
- qpid::framing::ProtocolVersion version;
+ ProtocolVersion version;
u_int16_t channel;
u_int8_t type;
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
index 53ab30faa0..40241660f2 100644
--- a/cpp/lib/common/framing/ChannelAdapter.cpp
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -19,6 +19,7 @@
#include "ChannelAdapter.h"
#include "AMQFrame.h"
+#include "Exception.h"
using boost::format;
@@ -26,7 +27,7 @@ namespace qpid {
namespace framing {
void ChannelAdapter::init(
- ChannelId i, OutputHandler& o, const ProtocolVersion& v)
+ ChannelId i, OutputHandler& o, ProtocolVersion v)
{
assertChannelNotOpen();
id = i;
@@ -34,13 +35,15 @@ void ChannelAdapter::init(
version = v;
}
-void ChannelAdapter::send(AMQBody::shared_ptr body) {
+RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
+ RequestId result = 0;
assertChannelOpen();
switch (body->type()) {
case REQUEST_BODY: {
AMQRequestBody::shared_ptr request =
boost::shared_polymorphic_downcast<AMQRequestBody>(body);
requester.sending(request->getData());
+ result = request->getData().requestId;
break;
}
case RESPONSE_BODY: {
@@ -51,6 +54,7 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) {
}
}
out->send(new AMQFrame(getVersion(), getId(), body));
+ return result;
}
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
index c2eba2f4e9..9f654d9a5b 100644
--- a/cpp/lib/common/framing/ChannelAdapter.h
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -35,9 +35,8 @@ namespace framing {
class MethodContext;
/**
- * Base class for client and broker channel adapters.
+ * Base class for client and broker channels.
*
- * BodyHandler::handl*
* - receives frame bodies from the network.
* - Updates request/response data.
* - Dispatches requests with a MethodContext for responses.
@@ -55,21 +54,21 @@ class ChannelAdapter : public BodyHandler {
*@param output Processed frames are forwarded to this handler.
*/
ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0,
- const ProtocolVersion& ver=ProtocolVersion())
+ ProtocolVersion ver=ProtocolVersion())
: id(id_), out(out_), version(ver) {}
/** Initialize the channel adapter. */
- void init(ChannelId, OutputHandler&, const ProtocolVersion&);
+ void init(ChannelId, OutputHandler&, ProtocolVersion);
ChannelId getId() const { return id; }
- const ProtocolVersion& getVersion() const { return version; }
+ ProtocolVersion getVersion() const { return version; }
/**
* Wrap body in a frame and send the frame.
* Takes ownership of body.
*/
- void send(AMQBody::shared_ptr body);
- void send(AMQBody* body) { send(AMQBody::shared_ptr(body)); }
+ RequestId send(AMQBody::shared_ptr body);
+ RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
@@ -95,7 +94,7 @@ class ChannelAdapter : public BodyHandler {
ProtocolVersion version;
Requester requester;
Responder responder;
- RequestId requestInProgress; // TODO aconway 2007-01-24: use it.
+ RequestId requestInProgress;
};
}}
diff --git a/cpp/lib/common/framing/FramingContent.h b/cpp/lib/common/framing/FramingContent.h
index 0f4a4b8f64..cffd46ee24 100644
--- a/cpp/lib/common/framing/FramingContent.h
+++ b/cpp/lib/common/framing/FramingContent.h
@@ -27,9 +27,9 @@ class Content
void encode(Buffer& buffer) const;
void decode(Buffer& buffer);
size_t size() const;
- bool isInline() { return discriminator == INLINE; }
- bool isReference() { return discriminator == REFERENCE; }
- const string& getValue() { return value; }
+ bool isInline() const { return discriminator == INLINE; }
+ bool isReference() const { return discriminator == REFERENCE; }
+ const string& getValue() const { return value; }
friend std::ostream& operator<<(std::ostream&, const Content&);
};
diff --git a/cpp/lib/common/framing/MethodContext.cpp b/cpp/lib/common/framing/MethodContext.cpp
new file mode 100644
index 0000000000..73af73f8e5
--- /dev/null
+++ b/cpp/lib/common/framing/MethodContext.cpp
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MethodContext.h"
+#include "amqp_types.h"
+#include "AMQRequestBody.h"
+
+namespace qpid {
+namespace framing {
+
+RequestId MethodContext::getRequestId() const {
+ return boost::shared_polymorphic_downcast<AMQRequestBody>(methodBody)
+ ->getRequestId();
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h
index afb499023d..3493924bf6 100644
--- a/cpp/lib/common/framing/MethodContext.h
+++ b/cpp/lib/common/framing/MethodContext.h
@@ -24,8 +24,6 @@
#include "OutputHandler.h"
#include "ProtocolVersion.h"
-#include <boost/shared_ptr.hpp>
-
namespace qpid {
namespace framing {
@@ -61,6 +59,12 @@ struct MethodContext
* It's also provides the request ID when constructing a response.
*/
BodyPtr methodBody;
+
+ /**
+ * Return methodBody's request ID.
+ * It is an error to call this if methodBody is not a request.
+ */
+ RequestId getRequestId() const;
};
// FIXME aconway 2007-02-01: Method context only required on Handler
diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp
index 471f736a7d..c119b79d6d 100644
--- a/cpp/lib/common/framing/ProtocolInitiation.cpp
+++ b/cpp/lib/common/framing/ProtocolInitiation.cpp
@@ -20,15 +20,18 @@
*/
#include <ProtocolInitiation.h>
-qpid::framing::ProtocolInitiation::ProtocolInitiation(){}
+namespace qpid {
+namespace framing {
-qpid::framing::ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : version(_major, _minor) {}
+ProtocolInitiation::ProtocolInitiation(){}
-qpid::framing::ProtocolInitiation::ProtocolInitiation(const qpid::framing::ProtocolVersion& p) : version(p) {}
+ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : version(_major, _minor) {}
-qpid::framing::ProtocolInitiation::~ProtocolInitiation(){}
+ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {}
-void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){
+ProtocolInitiation::~ProtocolInitiation(){}
+
+void ProtocolInitiation::encode(Buffer& buffer){
buffer.putOctet('A');
buffer.putOctet('M');
buffer.putOctet('Q');
@@ -39,7 +42,7 @@ void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){
buffer.putOctet(version.getMinor());
}
-bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){
+bool ProtocolInitiation::decode(Buffer& buffer){
if(buffer.available() >= 8){
buffer.getOctet();//A
buffer.getOctet();//M
@@ -56,3 +59,5 @@ bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){
}
//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h
index 6b3dbac88d..fb5cb3abed 100644
--- a/cpp/lib/common/framing/ProtocolInitiation.h
+++ b/cpp/lib/common/framing/ProtocolInitiation.h
@@ -37,14 +37,14 @@ private:
public:
ProtocolInitiation();
ProtocolInitiation(u_int8_t major, u_int8_t minor);
- ProtocolInitiation(const ProtocolVersion& p);
+ ProtocolInitiation(ProtocolVersion p);
virtual ~ProtocolInitiation();
virtual void encode(Buffer& buffer);
virtual bool decode(Buffer& buffer);
inline virtual u_int32_t size() const { return 8; }
inline u_int8_t getMajor() const { return version.getMajor(); }
inline u_int8_t getMinor() const { return version.getMinor(); }
- inline const ProtocolVersion& getVersion() const { return version; }
+ inline ProtocolVersion getVersion() const { return version; }
};
}
diff --git a/cpp/lib/common/framing/ProtocolVersion.cpp b/cpp/lib/common/framing/ProtocolVersion.cpp
index e65c8b79b8..fd4b1a645f 100644
--- a/cpp/lib/common/framing/ProtocolVersion.cpp
+++ b/cpp/lib/common/framing/ProtocolVersion.cpp
@@ -20,37 +20,9 @@
*/
#include <ProtocolVersion.h>
#include <sstream>
-#include "AMQP_HighestVersion.h"
using namespace qpid::framing;
-ProtocolVersion::ProtocolVersion() {
- *this = highestProtocolVersion;
-}
-
-ProtocolVersion::ProtocolVersion(u_int8_t _major, u_int8_t _minor) :
- major_(_major),
- minor_(_minor)
-{}
-
-ProtocolVersion::ProtocolVersion(const ProtocolVersion::ProtocolVersion& p):
- major_(p.major_),
- minor_(p.minor_)
-{}
-
-ProtocolVersion::~ProtocolVersion()
-{}
-
-bool ProtocolVersion::equals(u_int8_t _major, u_int8_t _minor) const
-{
- return major_ == _major && minor_ == _minor;
-}
-
-bool ProtocolVersion::equals(const ProtocolVersion::ProtocolVersion& p) const
-{
- return major_ == p.major_ && minor_ == p.minor_;
-}
-
const std::string ProtocolVersion::toString() const
{
std::stringstream ss;
@@ -58,10 +30,15 @@ const std::string ProtocolVersion::toString() const
return ss.str();
}
-ProtocolVersion::ProtocolVersion ProtocolVersion::operator=(const ProtocolVersion& p)
+ProtocolVersion& ProtocolVersion::operator=(ProtocolVersion p)
{
major_ = p.major_;
minor_ = p.minor_;
return *this;
}
+bool ProtocolVersion::operator==(ProtocolVersion p) const
+{
+ return major_ == p.major_ && minor_ == p.minor_;
+}
+
diff --git a/cpp/lib/common/framing/ProtocolVersion.h b/cpp/lib/common/framing/ProtocolVersion.h
index 6aba87c6f5..aa526aa293 100644
--- a/cpp/lib/common/framing/ProtocolVersion.h
+++ b/cpp/lib/common/framing/ProtocolVersion.h
@@ -35,19 +35,19 @@ private:
u_int8_t minor_;
public:
- ProtocolVersion();
- ProtocolVersion(u_int8_t _major, u_int8_t _minor);
- ProtocolVersion(const ProtocolVersion& p);
- virtual ~ProtocolVersion();
-
- inline u_int8_t getMajor() const { return major_; }
- inline void setMajor(u_int8_t major) { major_ = major; }
- inline u_int8_t getMinor() const { return minor_; }
- inline void setMinor(u_int8_t minor) { minor_ = minor; }
- virtual bool equals(u_int8_t _major, u_int8_t _minor) const;
- virtual bool equals(const ProtocolVersion& p) const;
- virtual const std::string toString() const;
- ProtocolVersion operator=(const ProtocolVersion& p);
+ ProtocolVersion(u_int8_t _major=0, u_int8_t _minor=0)
+ : major_(_major), minor_(_minor) {}
+
+ u_int8_t getMajor() const { return major_; }
+ void setMajor(u_int8_t major) { major_ = major; }
+ u_int8_t getMinor() const { return minor_; }
+ void setMinor(u_int8_t minor) { minor_ = minor; }
+ const std::string toString() const;
+
+ ProtocolVersion& operator=(ProtocolVersion p);
+
+ bool operator==(ProtocolVersion p) const;
+ bool operator!=(ProtocolVersion p) const { return ! (*this == p); }
};
} // namespace framing
diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h
index aff0cd91d6..8e2de8b843 100644
--- a/cpp/lib/common/framing/ProtocolVersionException.h
+++ b/cpp/lib/common/framing/ProtocolVersionException.h
@@ -40,7 +40,7 @@ public:
template <class T>
ProtocolVersionException(
- const ProtocolVersion& ver, const T& msg) throw () : versionFound(ver)
+ ProtocolVersion ver, const T& msg) throw () : versionFound(ver)
{ init(boost::lexical_cast<std::string>(msg)); }
template <class T>
diff --git a/cpp/lib/common/framing/Proxy.cpp b/cpp/lib/common/framing/Proxy.cpp
new file mode 100644
index 0000000000..0b2a882a49
--- /dev/null
+++ b/cpp/lib/common/framing/Proxy.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Proxy.h"
+#include "ChannelAdapter.h"
+#include "ProtocolVersion.h"
+
+namespace qpid {
+namespace framing {
+
+Proxy::~Proxy() {}
+
+ProtocolVersion Proxy::getProtocolVersion() const {
+ return channel.getVersion();
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/Proxy.h b/cpp/lib/common/framing/Proxy.h
new file mode 100644
index 0000000000..8ed46ed748
--- /dev/null
+++ b/cpp/lib/common/framing/Proxy.h
@@ -0,0 +1,51 @@
+#ifndef _framing_Proxy_h
+#define _framing_Proxy_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ProtocolVersion.h"
+
+namespace qpid {
+namespace framing {
+
+class ChannelAdapter;
+class FieldTable;
+class Content;
+
+/**
+ * Base class for proxies.
+ */
+class Proxy
+{
+
+ public:
+ Proxy(ChannelAdapter& ch) : channel(ch) {}
+ virtual ~Proxy();
+
+ ProtocolVersion getProtocolVersion() const;
+
+ protected:
+ ChannelAdapter& channel;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Proxy_h*/
diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h
index 777d9e7bc5..2f56cb877e 100644
--- a/cpp/lib/common/framing/amqp_types.h
+++ b/cpp/lib/common/framing/amqp_types.h
@@ -20,6 +20,12 @@
* under the License.
*
*/
+
+/** \file
+ * Type definitions and forward declarations of all types used to
+ * in AMQP messages.
+ */
+
#include <string>
#ifdef _WINDOWS
#include "windows.h"
@@ -44,5 +50,8 @@ typedef u_int16_t ClassId;
typedef u_int16_t MethodId;
typedef u_int16_t ReplyCode;
+// Types represented by classes.
+class Content;
+class FieldTable;
}} // namespace qpid::framing
#endif
diff --git a/cpp/lib/common/framing/amqp_types_full.h b/cpp/lib/common/framing/amqp_types_full.h
new file mode 100644
index 0000000000..6a24a99d38
--- /dev/null
+++ b/cpp/lib/common/framing/amqp_types_full.h
@@ -0,0 +1,36 @@
+#ifndef _framing_amqp_types_decl_h
+#define _framing_amqp_types_decl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** \file
+ * Type definitions and full declarations of all types used to
+ * in AMQP messages.
+ *
+ * Its better to include amqp_types.h in another header instead of this file
+ * unless the header actually needs the full declarations. Including
+ * full declarations when forward declarations would do increases compile
+ * times.
+ */
+
+#include "amqp_types.h"
+#include "FramingContent.h"
+#include "FieldTable.h"
+
+#endif /*!_framing_amqp_types_decl_h*/