summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-02 22:03:10 +0000
committerAlan Conway <aconway@apache.org>2007-02-02 22:03:10 +0000
commitb5c270f10496f522ef6a03a8fa60f85d55c9187d (patch)
tree714e7abf7ba591d00232d821440e51461175cb9e /cpp
parent750f272ac99e8c830807affb3ae68ab0beeca63f (diff)
downloadqpid-python-b5c270f10496f522ef6a03a8fa60f85d55c9187d.tar.gz
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. Request ID comes from body, ChannelAdapter is used to send frames, not OutputHandler. * cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member. Context is per-method not per-channel. * cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId with MethodContext (for responses) or ChannelAdapter (for requests.) Use context request-ID to construct responses, send all bodies via ChannelAdapter. * cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter. * cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters. Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion& * Cosmetic changes, many files: - fixed indentation, broke long lines. - removed unnecessary qpid:: prefixes. * broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into broker::channel. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/Broker.cpp2
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp300
-rw-r--r--cpp/lib/broker/BrokerAdapter.h186
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp74
-rw-r--r--cpp/lib/broker/BrokerChannel.h72
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp96
-rw-r--r--cpp/lib/broker/BrokerMessage.h190
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h242
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp33
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h67
-rw-r--r--cpp/lib/broker/Connection.cpp38
-rw-r--r--cpp/lib/broker/Connection.h30
-rw-r--r--cpp/lib/broker/Content.h28
-rw-r--r--cpp/lib/broker/InMemoryContent.cpp13
-rw-r--r--cpp/lib/broker/InMemoryContent.h2
-rw-r--r--cpp/lib/broker/LazyLoadedContent.cpp13
-rw-r--r--cpp/lib/broker/LazyLoadedContent.h8
-rw-r--r--cpp/lib/broker/MessageBuilder.h4
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp115
-rw-r--r--cpp/lib/broker/NullMessageStore.h2
-rw-r--r--cpp/lib/client/ClientChannel.cpp28
-rw-r--r--cpp/lib/client/ResponseHandler.cpp6
-rw-r--r--cpp/lib/client/ResponseHandler.h7
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.cpp4
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.h7
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.h2
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.cpp7
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.h8
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp16
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h21
-rw-r--r--cpp/lib/common/framing/MethodContext.h59
-rw-r--r--cpp/lib/common/framing/ProtocolVersion.cpp5
-rw-r--r--cpp/tests/ChannelTest.cpp92
-rw-r--r--cpp/tests/ExchangeTest.cpp8
-rw-r--r--cpp/tests/FramingTest.cpp7
-rw-r--r--cpp/tests/InMemoryContentTest.cpp35
-rw-r--r--cpp/tests/LazyLoadedContentTest.cpp30
-rw-r--r--cpp/tests/MessageBuilderTest.cpp24
-rw-r--r--cpp/tests/MessageTest.cpp25
-rw-r--r--cpp/tests/QueueTest.cpp20
-rw-r--r--cpp/tests/TxAckTest.cpp7
-rw-r--r--cpp/tests/TxPublishTest.cpp11
42 files changed, 976 insertions, 968 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index 18254484bc..f650452e33 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -54,7 +54,7 @@ Broker::Broker(const Configuration& conf) :
factory(*this)
{
if (config.getStore().empty())
- store.reset(new NullMessageStore());
+ store.reset(new NullMessageStore(config.isTrace()));
else
store.reset(new MessageStoreModule(config.getStore()));
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 73ece8b264..10e386ff41 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -28,168 +28,20 @@ namespace broker {
using namespace qpid;
using namespace qpid::framing;
-typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+typedef std::vector<Queue::shared_ptr> QueueVector;
-class BrokerAdapter::ServerOps : public AMQP_ServerOperations
-{
- public:
- ServerOps(Channel& ch, Connection& c, Broker& b) :
- basicHandler(ch, c, b),
- channelHandler(ch, c, b),
- connectionHandler(ch, c, b),
- exchangeHandler(ch, c, b),
- messageHandler(ch, c, b),
- queueHandler(ch, c, b),
- txHandler(ch, c, b)
- {}
-
- ChannelHandler* getChannelHandler() { return &channelHandler; }
- ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
- BasicHandler* getBasicHandler() { return &basicHandler; }
- ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
- QueueHandler* getQueueHandler() { return &queueHandler; }
- TxHandler* getTxHandler() { return &txHandler; }
- MessageHandler* getMessageHandler() { return &messageHandler; }
- AccessHandler* getAccessHandler() {
- throw ConnectionException(540, "Access class not implemented"); }
- FileHandler* getFileHandler() {
- throw ConnectionException(540, "File class not implemented"); }
- StreamHandler* getStreamHandler() {
- throw ConnectionException(540, "Stream class not implemented"); }
- DtxHandler* getDtxHandler() {
- throw ConnectionException(540, "Dtx class not implemented"); }
- TunnelHandler* getTunnelHandler() {
- throw ConnectionException(540, "Tunnel class not implemented"); }
-
- private:
- struct CoreRefs {
- CoreRefs(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b) {}
-
- Channel& channel;
- Connection& connection;
- Broker& broker;
- };
-
- class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
- public:
- ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-
- void startOk(const MethodContext& context,
- const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const MethodContext& context, const std::string& response);
- void tuneOk(const MethodContext& context, u_int16_t channelMax,
- u_int32_t frameMax, u_int16_t heartbeat);
- void open(const MethodContext& context, const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(const MethodContext& context, u_int16_t replyCode,
- const std::string& replyText,
- u_int16_t classId, u_int16_t methodId);
- void closeOk(const MethodContext& context);
- };
-
- class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
- public:
- ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void open(const MethodContext& context, const std::string& outOfBand);
- void flow(const MethodContext& context, bool active);
- void flowOk(const MethodContext& context, bool active);
- void ok( const MethodContext& context );
- void ping( const MethodContext& context );
- void pong( const MethodContext& context );
- void resume( const MethodContext& context, const std::string& channelId );
- void close(const MethodContext& context, u_int16_t replyCode, const
- std::string& replyText, u_int16_t classId, u_int16_t methodId);
- void closeOk(const MethodContext& context);
- };
-
- class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
- public:
- ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, const std::string& type,
- bool passive, bool durable, bool autoDelete,
- bool internal, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void delete_(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, bool ifUnused, bool nowait);
- };
-
- class QueueHandlerImpl : private CoreRefs, public QueueHandler{
- public:
- QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void bind(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- const std::string& exchange, const std::string& routingKey,
- bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(const MethodContext& context,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& exchange,
- const std::string& routingKey,
- const qpid::framing::FieldTable& arguments );
- void purge(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool nowait);
- void delete_(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty,
- bool nowait);
- };
-
- class BasicHandlerImpl : private CoreRefs, public BasicHandler{
- public:
- BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void qos(const MethodContext& context, u_int32_t prefetchSize,
- u_int16_t prefetchCount, bool global);
- void consume(
- const MethodContext& context, u_int16_t ticket, const std::string& queue,
- const std::string& consumerTag, bool noLocal, bool noAck,
- bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(const MethodContext& context, const std::string& consumerTag,
- bool nowait);
- void publish(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
- void get(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool noAck);
- void ack(const MethodContext& context, u_int64_t deliveryTag, bool multiple);
- void reject(const MethodContext& context, u_int64_t deliveryTag, bool requeue);
- void recover(const MethodContext& context, bool requeue);
- };
-
- class TxHandlerImpl : private CoreRefs, public TxHandler{
- public:
- TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void select(const MethodContext& context);
- void commit(const MethodContext& context);
- void rollback(const MethodContext& context);
- };
-
- BasicHandlerImpl basicHandler;
- ChannelHandlerImpl channelHandler;
- ConnectionHandlerImpl connectionHandler;
- ExchangeHandlerImpl exchangeHandler;
- MessageHandlerImpl messageHandler;
- QueueHandlerImpl queueHandler;
- TxHandlerImpl txHandler;
-
-};
-
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
- const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext& context , const FieldTable& /*clientProperties*/,
+ const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
connection.client->getConnection().tune(
context, 100, connection.getFrameMax(), connection.getHeartbeat());
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk(
+ const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
const MethodContext&, u_int16_t /*channelmax*/,
u_int32_t framemax, u_int16_t heartbeat)
{
@@ -197,12 +49,12 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(
connection.setHeartbeat(heartbeat);
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
connection.client->getConnection().openOk(context, knownhosts);
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
@@ -210,21 +62,21 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
connection.getOutput().close();
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
connection.getOutput().close();
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
connection.client->getChannel().openOk(context, std::string()/* ID */);
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/,
const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
@@ -234,13 +86,13 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -265,17 +117,17 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&
}
}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
if(!nowait) connection.client->getExchange().deleteOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = connection.getQueue(name, channel.getId());
@@ -308,9 +160,9 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& co
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
@@ -325,7 +177,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte
}
void
-BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
+BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
@@ -344,15 +196,15 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
connection.client->getQueue().unbindOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
if(!nowait) connection.client->getQueue().purgeOk(context, count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
@@ -363,7 +215,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& co
}else{
//remove the queue from the list of exclusive queues if necessary
if(q->isExclusiveOwner(&connection)){
- queue_iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
+ QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
}
count = q->getMessageCount();
@@ -377,14 +229,14 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& co
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
connection.client->getBasic().qosOk(context);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -412,19 +264,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
+ const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchangeName, const string& routingKey,
+ bool mandatory, bool immediate)
+{
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
+ BasicMessage* msg = new BasicMessage(
+ &connection, exchangeName, routingKey, mandatory, immediate,
+ context.methodBody);
channel.handlePublish(msg, exchange);
}else{
throw ChannelException(
@@ -432,7 +288,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
@@ -441,7 +297,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& contex
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
try{
channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -449,23 +305,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
connection.client->getTx().selectOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
connection.client->getTx().commitOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
connection.client->getTx().rollbackOk(context);
@@ -473,82 +329,32 @@ void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& cont
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& )
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
{
//no specific action required, generic response handling should be sufficient
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context)
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
{
connection.client->getChannel().ok(context);
connection.client->getChannel().pong(context);
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
{
connection.client->getChannel().ok(context);
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
const MethodContext&,
const string& /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
-BrokerAdapter::BrokerAdapter(
- std::auto_ptr<Channel> ch, Connection& c, Broker& b
-) :
- channel(ch),
- connection(c),
- broker(b),
- serverOps(new ServerOps(*channel,c,b))
-{
- init(channel->getId(), c.getOutput(), channel->getVersion());
-}
-
-void BrokerAdapter::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
-{
- try{
- method->invoke(*serverOps, context);
- }catch(ChannelException& e){
- connection.client->getChannel().close(
- context, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(ConnectionException& e){
- connection.client->getConnection().close(
- context, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.client->getConnection().close(
- context, 541/*internal error*/, e.what(),
- method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
- channel->handleHeader(body);
-}
-
-void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) {
- channel->handleContent(body);
-}
-
-void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
- // TODO aconway 2007-01-17: Implement heartbeats.
-}
-
-
-bool BrokerAdapter::isOpen() const {
- return channel->isOpen();
-}
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h
index 72c09be849..cd34f17e58 100644
--- a/cpp/lib/broker/BrokerAdapter.h
+++ b/cpp/lib/broker/BrokerAdapter.h
@@ -18,18 +18,14 @@
* limitations under the License.
*
*/
-#include <memory.h>
-
#include "AMQP_ServerOperations.h"
-#include "BodyHandler.h"
+#include "MessageHandlerImpl.h"
#include "BrokerChannel.h"
-#include "amqp_types.h"
-#include "framing/ChannelAdapter.h"
namespace qpid {
namespace broker {
-class AMQMethodBody;
+class Channel;
class Connection;
class Broker;
@@ -38,35 +34,173 @@ class Broker;
*
* Translates protocol bodies into calls on the core Channel,
* Connection and Broker objects.
- *
- * Owns a channel, has references to Connection and Broker.
*/
-class BrokerAdapter : public framing::ChannelAdapter
+
+class ChannelHandler;
+class ConnectionHandler;
+class BasicHandler;
+class ExchangeHandler;
+class QueueHandler;
+class TxHandler;
+class MessageHandler;
+class AccessHandler;
+class FileHandler;
+class StreamHandler;
+class DtxHandler;
+class TunnelHandler;
+
+class BrokerAdapter : public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(std::auto_ptr<Channel> ch, Connection&, Broker&);
- Channel& getChannel() { return *channel; }
+ BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+ basicHandler(ch, c, b),
+ channelHandler(ch, c, b),
+ connectionHandler(ch, c, b),
+ exchangeHandler(ch, c, b),
+ messageHandler(ch, c, b),
+ queueHandler(ch, c, b),
+ txHandler(ch, c, b)
+ {}
+
+ ChannelHandler* getChannelHandler() { return &channelHandler; }
+ ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
+ BasicHandler* getBasicHandler() { return &basicHandler; }
+ ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+ QueueHandler* getQueueHandler() { return &queueHandler; }
+ TxHandler* getTxHandler() { return &txHandler; }
+ MessageHandler* getMessageHandler() { return &messageHandler; }
+ AccessHandler* getAccessHandler() {
+ throw ConnectionException(540, "Access class not implemented"); }
+ FileHandler* getFileHandler() {
+ throw ConnectionException(540, "File class not implemented"); }
+ StreamHandler* getStreamHandler() {
+ throw ConnectionException(540, "Stream class not implemented"); }
+ DtxHandler* getDtxHandler() {
+ throw ConnectionException(540, "Dtx class not implemented"); }
+ TunnelHandler* getTunnelHandler() {
+ throw ConnectionException(540, "Tunnel class not implemented"); }
- void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+ private:
+ struct CoreRefs {
+ CoreRefs(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b) {}
- bool isOpen() const;
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+ };
- private:
- void handleMethodInContext(
- boost::shared_ptr<framing::AMQMethodBody> method,
- const framing::MethodContext& context);
+ class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
+ public:
+ ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+
+ void startOk(const framing::MethodContext& context,
+ const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(const framing::MethodContext& context,
+ const std::string& response);
+ void tuneOk(const framing::MethodContext& context,
+ u_int16_t channelMax,
+ u_int32_t frameMax, u_int16_t heartbeat);
+ void open(const framing::MethodContext& context,
+ const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(const framing::MethodContext& context, u_int16_t replyCode,
+ const std::string& replyText,
+ u_int16_t classId, u_int16_t methodId);
+ void closeOk(const framing::MethodContext& context);
+ };
+
+ class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
+ public:
+ ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void open(const framing::MethodContext& context, const std::string& outOfBand);
+ void flow(const framing::MethodContext& context, bool active);
+ void flowOk(const framing::MethodContext& context, bool active);
+ void ok( const framing::MethodContext& context );
+ void ping( const framing::MethodContext& context );
+ void pong( const framing::MethodContext& context );
+ void resume( const framing::MethodContext& context, const std::string& channelId );
+ void close(const framing::MethodContext& context, u_int16_t replyCode, const
+ std::string& replyText, u_int16_t classId, u_int16_t methodId);
+ void closeOk(const framing::MethodContext& context);
+ };
- class ServerOps;
+ class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
+ public:
+ ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void declare(const framing::MethodContext& context, u_int16_t ticket,
+ const std::string& exchange, const std::string& type,
+ bool passive, bool durable, bool autoDelete,
+ bool internal, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+ void delete_(const framing::MethodContext& context, u_int16_t ticket,
+ const std::string& exchange, bool ifUnused, bool nowait);
+ };
- std::auto_ptr<Channel> channel;
- Connection& connection;
- Broker& broker;
- boost::shared_ptr<ServerOps> serverOps;
-};
-
+ class QueueHandlerImpl : private CoreRefs, public QueueHandler{
+ public:
+ QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+ void bind(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ const std::string& exchange, const std::string& routingKey,
+ bool nowait, const qpid::framing::FieldTable& arguments);
+ void unbind(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& queue,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& arguments );
+ void purge(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ bool nowait);
+ void delete_(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ bool ifUnused, bool ifEmpty,
+ bool nowait);
+ };
+
+ class BasicHandlerImpl : private CoreRefs, public BasicHandler{
+ public:
+ BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void qos(const framing::MethodContext& context, u_int32_t prefetchSize,
+ u_int16_t prefetchCount, bool global);
+ void consume(
+ const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ const std::string& consumerTag, bool noLocal, bool noAck,
+ bool exclusive, bool nowait,
+ const qpid::framing::FieldTable& fields);
+ void cancel(const framing::MethodContext& context, const std::string& consumerTag,
+ bool nowait);
+ void publish(const framing::MethodContext& context, u_int16_t ticket,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
+ void get(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ bool noAck);
+ void ack(const framing::MethodContext& context, u_int64_t deliveryTag, bool multiple);
+ void reject(const framing::MethodContext& context, u_int64_t deliveryTag, bool requeue);
+ void recover(const framing::MethodContext& context, bool requeue);
+ };
+ class TxHandlerImpl : private CoreRefs, public TxHandler{
+ public:
+ TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void select(const framing::MethodContext& context);
+ void commit(const framing::MethodContext& context);
+ void rollback(const framing::MethodContext& context);
+ };
+
+ BasicHandlerImpl basicHandler;
+ ChannelHandlerImpl channelHandler;
+ ConnectionHandlerImpl connectionHandler;
+ ExchangeHandlerImpl exchangeHandler;
+ MessageHandlerImpl messageHandler;
+ QueueHandlerImpl queueHandler;
+ TxHandlerImpl txHandler;
+
+};
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index f0b6ff6d07..96215a60ed 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -18,12 +18,25 @@
* under the License.
*
*/
+#include <assert.h>
+
#include <iostream>
#include <sstream>
-#include <assert.h>
+#include <algorithm>
+#include <functional>
-#include <BrokerChannel.h>
+#include "BrokerChannel.h"
+#include "DeletingTxOp.h"
+#include "framing/ChannelAdapter.h"
#include <QpidError.h>
+#include <DeliverableMessage.h>
+#include <BrokerQueue.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <TxAck.h>
+#include <TxPublish.h>
+#include "BrokerAdapter.h"
+#include "Connection.h"
using std::mem_fun_ref;
using std::bind2nd;
@@ -33,12 +46,12 @@ using namespace qpid::sys;
Channel::Channel(
- const ProtocolVersion& _version, OutputHandler* _out, int _id,
+ Connection& con, ChannelId id,
u_int32_t _framesize, MessageStore* const _store,
u_int64_t _stagingThreshold
) :
- id(_id),
- out(*_out),
+ ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()),
+ connection(con),
currentDeliveryTag(1),
transactional(false),
prefetchSize(0),
@@ -47,8 +60,8 @@ Channel::Channel(
tagGenerator("sgen"),
store(_store),
messageBuilder(this, _store, _stagingThreshold),
- version(_version),
- opened(false)
+ opened(true),
+ adapter(new BrokerAdapter(*this, con, con.broker))
{
outstanding.reset();
}
@@ -61,7 +74,10 @@ bool Channel::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) {
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,
+ bool exclusive, ConnectionToken* const connection,
+ const FieldTable*)
+{
if(tag.empty()) tag = tagGenerator.generate();
ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
@@ -117,7 +133,10 @@ void Channel::rollback(){
accumulatedAck.clear();
}
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+void Channel::deliver(
+ Message::shared_ptr& msg, const string& consumerTag,
+ Queue::shared_ptr& queue, bool ackExpected)
+{
Mutex::ScopedLock locker(deliveryLock);
u_int64_t deliveryTag = currentDeliveryTag++;
@@ -127,7 +146,7 @@ void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue
outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
+ msg->deliver(*this, consumerTag, deliveryTag, framesize);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -184,7 +203,7 @@ void Channel::handleContent(AMQContentBody::shared_ptr content){
messageBuilder.addContent(content);
}
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
// TODO aconway 2007-01-17: Implement heartbeating.
}
@@ -255,7 +274,9 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
if(msg){
Mutex::ScopedLock locker(deliveryLock);
u_int64_t myDeliveryTag = currentDeliveryTag++;
- msg->sendGetOk(&out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
+ msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
+ queue->getMessageCount() + 1, myDeliveryTag,
+ framesize);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -265,7 +286,32 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
}
}
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
- msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
+ u_int64_t deliveryTag)
+{
+ msg->deliver(*this, consumerTag, deliveryTag, framesize);
+}
+
+void Channel::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
+{
+ try{
+ method->invoke(*adapter, context);
+ }catch(ChannelException& e){
+ connection.client->getChannel().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.client->getConnection().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.client->getConnection().close(
+ context, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
}
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 4e64d148e8..dd95e944bb 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -22,42 +22,39 @@
*
*/
-#include <algorithm>
-#include <functional>
#include <list>
#include <map>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
#include <AccumulatedAck.h>
-#include <Binding.h>
#include <Consumer.h>
-#include <DeletingTxOp.h>
-#include <DeliverableMessage.h>
#include <DeliveryRecord.h>
-#include <BrokerMessage.h>
#include <MessageBuilder.h>
#include <NameGenerator.h>
#include <Prefetch.h>
-#include <BrokerQueue.h>
-#include <MessageStore.h>
-#include <TxAck.h>
#include <TxBuffer.h>
-#include <TxPublish.h>
-#include <sys/Monitor.h>
-#include <OutputHandler.h>
-#include <AMQContentBody.h>
-#include <AMQHeaderBody.h>
-#include <AMQHeartbeatBody.h>
-#include <BasicPublishBody.h>
+#include "framing/ChannelAdapter.h"
namespace qpid {
namespace broker {
-using qpid::framing::string;
+class ConnectionToken;
+class Connection;
+class Queue;
+class BrokerAdapter;
+
+using framing::string;
/**
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : private MessageBuilder::CompletionHandler {
+class Channel :
+ public framing::ChannelAdapter,
+ private MessageBuilder::CompletionHandler
+{
class ConsumerImpl : public virtual Consumer
{
Channel* parent;
@@ -67,15 +64,18 @@ class Channel : private MessageBuilder::CompletionHandler {
const bool ackExpected;
bool blocked;
public:
- ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
+ ConsumerImpl(Channel* parent, const string& tag,
+ Queue::shared_ptr queue,
+ ConnectionToken* const connection, bool ack);
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
void requestDispatch();
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+
+ Connection& connection;
u_int16_t id;
- qpid::framing::OutputHandler& out;
u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
@@ -86,30 +86,32 @@ class Channel : private MessageBuilder::CompletionHandler {
u_int32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- qpid::sys::Mutex deliveryLock;
+ sys::Mutex deliveryLock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
- qpid::framing::ProtocolVersion version; // version used for this channel
bool opened;
+ boost::scoped_ptr<BrokerAdapter> adapter;
+
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
void cancel(consumer_iterator consumer);
bool checkPrefetch(Message::shared_ptr& msg);
public:
- Channel(
- const qpid::framing::ProtocolVersion& _version,
- qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
- MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
+ Channel(Connection& channel,
+ framing::ChannelId id,
+ u_int32_t framesize,
+ MessageStore* const _store = 0,
+ u_int64_t stagingThreshold = 0);
+
~Channel();
+
bool isOpen() const { return opened; }
- const framing::ProtocolVersion& getVersion() const { return version; }
void open() { opened = true; }
- u_int16_t getId() const { return id; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
@@ -118,7 +120,7 @@ class Channel : private MessageBuilder::CompletionHandler {
bool exists(const string& consumerTag);
void consume(string& tag, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
- const qpid::framing::FieldTable* = 0);
+ const framing::FieldTable* = 0);
void cancel(const string& tag);
bool get(Queue::shared_ptr queue, bool ackExpected);
void begin();
@@ -129,14 +131,18 @@ class Channel : private MessageBuilder::CompletionHandler {
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
void handlePublish(Message* msg, Exchange::shared_ptr exchange);
- void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr);
- void handleContent(qpid::framing::AMQContentBody::shared_ptr);
- void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr);
+ void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+ void handleMethodInContext(
+ boost::shared_ptr<framing::AMQMethodBody> method,
+ const framing::MethodContext& context);
+
};
struct InvalidAckException{};
-}} // namespace qpid::broker
+}} // namespace broker
#endif /*!_broker_BrokerChannel_h*/
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index b738040470..69d4ba087f 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -27,31 +27,35 @@
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) :
- Message(_exchange, _routingKey, _mandatory, _immediate),
- publisher(_publisher),
- size(0)
+BasicMessage::BasicMessage(
+ const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
+) :
+ Message(_exchange, _routingKey, _mandatory, _immediate, respondTo),
+ publisher(_publisher),
+ size(0)
{
}
-BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
- publisher(0), size(0)
-{
+// FIXME aconway 2007-02-01: remove.
+// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+// publisher(0), size(0)
+// {
- decode(buffer, headersOnly, contentChunkSize);
-}
+// decode(buffer, headersOnly, contentChunkSize);
+// }
+// For tests only.
BasicMessage::BasicMessage() : publisher(0), size(0)
-{
-}
+{}
BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
@@ -73,34 +77,42 @@ bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
-void BasicMessage::deliver(OutputHandler* out, int channel,
- const string& consumerTag, u_int64_t deliveryTag,
- u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel,
- new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey())));
- sendContent(out, channel, framesize, version);
-}
-
-void BasicMessage::sendGetOk(OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel,
- new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount)));
- sendContent(out, channel, framesize, version);
-}
-
-void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
- AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
- out->send(new AMQFrame(*version, channel, headerBody));
-
+void BasicMessage::deliver(ChannelAdapter& channel,
+ const string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize)
+{
+ // CCT -- TODO - Update code generator to take pointer/ not
+ // instance to avoid extra contruction
+ channel.send(
+ new BasicDeliverBody(
+ channel.getVersion(), consumerTag, deliveryTag,
+ getRedelivered(), getExchange(), getRoutingKey()));
+ sendContent(channel, framesize);
+}
+
+void BasicMessage::sendGetOk(const MethodContext& context,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize)
+{
+ // CCT -- TODO - Update code generator to take pointer/ not
+ // instance to avoid extra contruction
+ context.channel->send(
+ new BasicGetOkBody(
+ context.channel->getVersion(),
+ context.methodBody->getRequestId(),
+ deliveryTag, getRedelivered(), getExchange(),
+ getRoutingKey(), messageCount));
+ sendContent(*context.channel, framesize);
+}
+
+void BasicMessage::sendContent(
+ ChannelAdapter& channel, u_int32_t framesize)
+{
+ channel.send(header);
Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->send(*version, out, channel, framesize);
+ if (content.get())
+ content->send(channel, framesize);
}
BasicHeaderProperties* BasicMessage::getHeaderProperties(){
@@ -126,8 +138,8 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChu
void BasicMessage::decodeHeader(Buffer& buffer)
{
- string exchange;
- string routingKey;
+ string exchange;
+ string routingKey;
buffer.getShortString(exchange);
buffer.getShortString(routingKey);
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 4001af97a5..5ac9c83d0e 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -27,109 +27,111 @@
#include <boost/shared_ptr.hpp>
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
-#include <ProtocolVersion.h>
+#include "AMQMethodBody.h"
#include <BasicHeaderProperties.h>
#include <ConnectionToken.h>
#include <Content.h>
-#include <OutputHandler.h>
#include <Mutex.h>
#include <TxBuffer.h>
namespace qpid {
- namespace broker {
- class MessageStore;
- using qpid::framing::string;
+namespace framing {
+class MethodContext;
+class ChannelAdapter;
+}
+
+namespace broker {
+
+class MessageStore;
+using framing::string;
- /**
- * Represents an AMQP message, i.e. a header body, a list of
- * content bodies and some details about the publication
- * request.
- */
- class BasicMessage : public Message{
- const ConnectionToken* const publisher;
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::auto_ptr<Content> content;
- qpid::sys::Mutex contentLock;
- u_int64_t size;
-
- void sendContent(qpid::framing::OutputHandler* out,
- int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version);
-
- public:
- typedef boost::shared_ptr<BasicMessage> shared_ptr;
-
- BasicMessage(const ConnectionToken* const publisher,
- const string& exchange, const string& routingKey,
- bool mandatory, bool immediate);
- BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
- BasicMessage();
- ~BasicMessage();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr data);
- bool isComplete();
- const ConnectionToken* const getPublisher();
-
- void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
- void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
-
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- bool isPersistent();
- u_int64_t contentSize() const { return size; }
-
- void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
- void decodeHeader(qpid::framing::Buffer& buffer);
- void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
-
- void encode(qpid::framing::Buffer& buffer);
- void encodeHeader(qpid::framing::Buffer& buffer);
- void encodeContent(qpid::framing::Buffer& buffer);
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- u_int32_t encodedSize();
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- u_int32_t encodedHeaderSize();
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- u_int32_t encodedContentSize();
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- void releaseContent(MessageStore* store);
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- u_int64_t expectedContentSize();
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- void setContent(std::auto_ptr<Content>& content);
- };
-
- }
+/**
+ * Represents an AMQP message, i.e. a header body, a list of
+ * content bodies and some details about the publication
+ * request.
+ */
+class BasicMessage : public Message {
+ const ConnectionToken* const publisher;
+ framing::AMQHeaderBody::shared_ptr header;
+ std::auto_ptr<Content> content;
+ sys::Mutex contentLock;
+ u_int64_t size;
+
+ void sendContent(framing::ChannelAdapter&, u_int32_t framesize);
+
+ public:
+ typedef boost::shared_ptr<BasicMessage> shared_ptr;
+
+ BasicMessage(const ConnectionToken* const publisher,
+ const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate,
+ framing::AMQMethodBody::shared_ptr respondTo);
+ BasicMessage();
+ ~BasicMessage();
+ void setHeader(framing::AMQHeaderBody::shared_ptr header);
+ void addContent(framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ const ConnectionToken* const getPublisher();
+
+ void deliver(framing::ChannelAdapter&,
+ const string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ void sendGetOk(const framing::MethodContext&,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ u_int64_t contentSize() const { return size; }
+
+ void decode(framing::Buffer& buffer, bool headersOnly = false,
+ u_int32_t contentChunkSize = 0);
+ void decodeHeader(framing::Buffer& buffer);
+ void decodeContent(framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
+
+ void encode(framing::Buffer& buffer);
+ void encodeHeader(framing::Buffer& buffer);
+ void encodeContent(framing::Buffer& buffer);
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ */
+ u_int32_t encodedSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ */
+ u_int32_t encodedHeaderSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ u_int32_t encodedContentSize();
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ void releaseContent(MessageStore* store);
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ u_int64_t expectedContentSize();
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ void setContent(std::auto_ptr<Content>& content);
+};
+
+}
}
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
index 53fcf66aac..9a5b136ada 100644
--- a/cpp/lib/broker/BrokerMessageBase.h
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -24,146 +24,148 @@
#include "AMQContentBody.h"
#include "AMQHeaderBody.h"
+#include "AMQMethodBody.h"
#include "Content.h"
+#include "framing/amqp_types.h"
#include <string>
#include <boost/shared_ptr.hpp>
namespace qpid {
- namespace framing {
- class OutputHandler;
- class ProtocolVersion;
- class BasicHeaderProperties;
- }
+namespace framing {
+class MethodContext;
+class ChannelAdapter;
+class BasicHeaderProperties;
+}
- namespace broker {
-
- class MessageStore;
- class ConnectionToken;
-
- /**
- * Base class for all types of internal broker messages
- * abstracting away the operations
- * TODO; AMS: for the moment this is mostly a placeholder
- */
- class Message{
- std::string exchange;
- std::string routingKey;
- const bool mandatory;
- const bool immediate;
- u_int64_t persistenceId;
-
- bool redelivered;
+namespace broker {
- public:
- typedef boost::shared_ptr<Message> shared_ptr;
+class MessageStore;
+class ConnectionToken;
- Message(const std::string& _exchange, const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- persistenceId(0),
- redelivered(false)
- {}
-
- Message() :
- mandatory(false),
- immediate(false),
- persistenceId(0),
- redelivered(false)
- {}
-
- virtual ~Message() {};
+/**
+ * Base class for all types of internal broker messages
+ * abstracting away the operations
+ * TODO; AMS: for the moment this is mostly a placeholder
+ */
+class Message{
+ std::string exchange;
+ std::string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ u_int64_t persistenceId;
+ bool redelivered;
+ framing::AMQMethodBody::shared_ptr respondTo;
+
+ public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ Message(const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate,
+ framing::AMQMethodBody::shared_ptr respondTo_) :
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate),
+ persistenceId(0),
+ redelivered(false),
+ respondTo(respondTo_)
+ {}
- // Accessors
- const std::string& getRoutingKey() const { return routingKey; }
- const std::string& getExchange() const { return exchange; }
- u_int64_t getPersistenceId() const { return persistenceId; }
- bool getRedelivered() const { return redelivered; }
+ Message() :
+ mandatory(false),
+ immediate(false),
+ persistenceId(0),
+ redelivered(false)
+ {}
+
+ virtual ~Message() {};
- void setRouting(const std::string& _exchange, const std::string& _routingKey)
- { exchange = _exchange; routingKey = _routingKey; }
- void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
- void redeliver() { redelivered = true; }
-
- /**
- * Used to deliver the message from the queue
- */
- virtual void deliver(qpid::framing::OutputHandler* out,
- int channel,
+ // Accessors
+ const std::string& getRoutingKey() const { return routingKey; }
+ const std::string& getExchange() const { return exchange; }
+ u_int64_t getPersistenceId() const { return persistenceId; }
+ bool getRedelivered() const { return redelivered; }
+ framing::AMQMethodBody::shared_ptr getRespondTo() const {
+ return respondTo;
+ }
+
+ void setRouting(const std::string& _exchange, const std::string& _routingKey)
+ { exchange = _exchange; routingKey = _routingKey; }
+ void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
+ void redeliver() { redelivered = true; }
+
+ /**
+ * Used to deliver the message from the queue
+ */
+ virtual void deliver(framing::ChannelAdapter& channel,
const std::string& consumerTag,
u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version) = 0;
- /**
- * Used to return a message in response to a get from a queue
- */
- virtual void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
+ u_int32_t framesize) = 0;
+ /**
+ * Used to return a message in response to a get from a queue
+ */
+ virtual void sendGetOk(const framing::MethodContext& context,
u_int32_t messageCount,
u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version) = 0;
+ u_int32_t framesize) = 0;
- virtual bool isComplete() = 0;
+ virtual bool isComplete() = 0;
- virtual u_int64_t contentSize() const = 0;
- virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
- virtual bool isPersistent() = 0;
- virtual const ConnectionToken* const getPublisher() = 0;
-
- virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
- virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
-
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- *
- * XXXX: Only used in tests?
- */
- virtual u_int32_t encodedSize() = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- *
- * XXXX: Only used in tests?
- */
- virtual u_int32_t encodedHeaderSize() = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- virtual u_int32_t encodedContentSize() = 0;
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- virtual u_int64_t expectedContentSize() = 0;
+ virtual u_int64_t contentSize() const = 0;
+ virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
+ virtual bool isPersistent() = 0;
+ virtual const ConnectionToken* const getPublisher() = 0;
+
+ virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+ virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedHeaderSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ virtual u_int32_t encodedContentSize() = 0;
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ virtual u_int64_t expectedContentSize() = 0;
- // TODO: AMS 29/1/2007 Don't think these are really part of base class
+ // TODO: AMS 29/1/2007 Don't think these are really part of base class
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
- virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- virtual void releaseContent(MessageStore* /*store*/) {};
- };
-
- }
-}
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
+ virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {};
+ virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {};
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ virtual void releaseContent(MessageStore* /*store*/) {};
+};
+
+}}
#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index 46f583b978..4168ff639c 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -22,29 +22,28 @@
using namespace qpid::broker;
-MessageMessage::MessageMessage(const qpid::framing::AMQMethodBody& _methodBody,
- const std::string& _exchange, const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- Message(_exchange, _routingKey, _mandatory, _immediate),
- methodBody(_methodBody)
+MessageMessage::MessageMessage(
+ const qpid::framing::AMQMethodBody::shared_ptr _methodBody,
+ const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
+ methodBody(_methodBody)
{
}
-void MessageMessage::deliver(qpid::framing::OutputHandler* /*out*/,
- int /*channel*/,
- const std::string& /*consumerTag*/,
- u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/,
- qpid::framing::ProtocolVersion* /*version*/)
+void MessageMessage::deliver(
+ framing::ChannelAdapter& /*out*/,
+ const std::string& /*consumerTag*/,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t /*framesize*/)
{
}
-void MessageMessage::sendGetOk(qpid::framing::OutputHandler* /*out*/,
- int /*channel*/,
- u_int32_t /*messageCount*/,
- u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/,
- qpid::framing::ProtocolVersion* /*version*/)
+void MessageMessage::sendGetOk(
+ const framing::MethodContext& /*context*/,
+ u_int32_t /*messageCount*/,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t /*framesize*/)
{
}
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index b49f60f5df..cad5cf15b0 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -25,47 +25,46 @@
#include "BrokerMessageBase.h"
namespace qpid {
- namespace framing {
- class AMQMethodBody;
- }
+namespace framing {
+class AMQMethodBody;
+}
- namespace broker {
- class MessageMessage: public Message{
- const qpid::framing::AMQMethodBody& methodBody;
+namespace broker {
+class MessageMessage: public Message{
+ const qpid::framing::AMQMethodBody::shared_ptr methodBody;
- public:
- MessageMessage(const qpid::framing::AMQMethodBody& methodBody,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
+ public:
+ MessageMessage(
+ const framing::AMQMethodBody::shared_ptr methodBody,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
- // Default destructor okay
+ // Default destructor okay
- void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const std::string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
- void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
- bool isComplete();
+ void deliver(framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ void sendGetOk(const framing::MethodContext& context,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ bool isComplete();
- u_int64_t contentSize() const;
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- bool isPersistent();
- const ConnectionToken* const getPublisher();
+ u_int64_t contentSize() const;
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ const ConnectionToken* const getPublisher();
- u_int32_t encodedSize();
- u_int32_t encodedHeaderSize();
- u_int32_t encodedContentSize();
- u_int64_t expectedContentSize();
- };
+ u_int32_t encodedSize();
+ u_int32_t encodedHeaderSize();
+ u_int32_t encodedContentSize();
+ u_int64_t expectedContentSize();
+};
- }
+}
}
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index b45aa375e6..6694a0ed13 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -60,9 +60,11 @@ Exchange::shared_ptr Connection::findExchange(const string& name){
void Connection::received(qpid::framing::AMQFrame* frame){
- getAdapter(frame->getChannel()).handleBody(frame->getBody());
+ getChannel(frame->getChannel()).handleBody(frame->getBody());
}
+// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter
+// as it is part of the protocol.
void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
if (client.get())
// TODO aconway 2007-01-16: correct error code.
@@ -72,12 +74,11 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- // TODO aconway 2007-01-16: Client call, move to adapter.
client->getConnection().start(
- MethodContext(0, &getAdapter(0)),
+ MethodContext(&getChannel(0)),
header->getMajor(), header->getMinor(),
properties, mechanisms, locales);
- getAdapter(0).init(0, *out, client->getProtocolVersion());
+ getChannel(0).init(0, *out, client->getProtocolVersion());
}
void Connection::idleOut(){}
@@ -99,28 +100,19 @@ void Connection::closed(){
void Connection::closeChannel(u_int16_t channel) {
getChannel(channel).close();
- adapters.erase(adapters.find(channel));
+ channels.erase(channels.find(channel));
}
-BrokerAdapter& Connection::getAdapter(u_int16_t id) {
- AdapterMap::iterator i = adapters.find(id);
- if (i == adapters.end()) {
- std::auto_ptr<Channel> ch(
- new Channel(
- client->getProtocolVersion(), out, id,
- framemax, broker.getQueues().getStore(),
- settings.stagingThreshold));
- BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
- adapters.insert(id, adapter);
- return *adapter;
- }
- else
- return *i;
-}
-
-Channel& Connection::getChannel(u_int16_t id) {
- return getAdapter(id).getChannel();
+Channel& Connection::getChannel(ChannelId id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i == channels.end()) {
+ i = channels.insert(
+ id, new Channel(
+ *this, id, framemax, broker.getQueues().getStore(),
+ settings.stagingThreshold)).first;
+ }
+ return *i;
}
diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h
index d2b0422f8d..e3d5156c9b 100644
--- a/cpp/lib/broker/Connection.h
+++ b/cpp/lib/broker/Connection.h
@@ -34,7 +34,6 @@
#include <sys/TimeoutHandler.h>
#include "Broker.h"
#include "Exception.h"
-#include "BrokerAdapter.h"
namespace qpid {
namespace broker {
@@ -47,19 +46,22 @@ class Settings {
Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {}
};
-class Connection : public qpid::sys::ConnectionInputHandler,
+class Connection : public sys::ConnectionInputHandler,
public ConnectionToken
{
public:
- Connection(qpid::sys::ConnectionOutputHandler* out, Broker& broker);
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker);
// ConnectionInputHandler methods
- void received(qpid::framing::AMQFrame* frame);
- void initiated(qpid::framing::ProtocolInitiation* header);
+ void received(framing::AMQFrame* frame);
+ void initiated(framing::ProtocolInitiation* header);
void idleOut();
void idleIn();
void closed();
- qpid::sys::ConnectionOutputHandler& getOutput() { return *out; }
+ sys::ConnectionOutputHandler& getOutput() { return *out; }
+
+ const framing::ProtocolVersion& getVersion() {
+ return client->getProtocolVersion(); }
u_int32_t getFrameMax() const { return framemax; }
u_int16_t getHeartbeat() const { return heartbeat; }
@@ -68,7 +70,7 @@ class Connection : public qpid::sys::ConnectionInputHandler,
void setHeartbeat(u_int16_t hb) { heartbeat = hb; }
Broker& broker;
- std::auto_ptr<qpid::framing::AMQP_ClientProxy> client;
+ std::auto_ptr<framing::AMQP_ClientProxy> client;
Settings settings;
std::vector<Queue::shared_ptr> exclusiveQueues;
@@ -81,20 +83,18 @@ class Connection : public qpid::sys::ConnectionInputHandler,
*/
Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
- Channel& newChannel(u_int16_t channel);
- Channel& getChannel(u_int16_t channel);
- void closeChannel(u_int16_t channel);
+ Channel& newChannel(framing::ChannelId channel);
+ Channel& getChannel(framing::ChannelId channel);
+ void closeChannel(framing::ChannelId channel);
private:
- typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
+ typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
Exchange::shared_ptr findExchange(const string& name);
- BrokerAdapter& getAdapter(u_int16_t id);
-
- AdapterMap adapters;
- qpid::sys::ConnectionOutputHandler* out;
+ ChannelMap channels;
+ sys::ConnectionOutputHandler* out;
u_int32_t framemax;
u_int16_t heartbeat;
};
diff --git a/cpp/lib/broker/Content.h b/cpp/lib/broker/Content.h
index a793bd8f72..3058fdfd6a 100644
--- a/cpp/lib/broker/Content.h
+++ b/cpp/lib/broker/Content.h
@@ -24,21 +24,25 @@
#include <AMQContentBody.h>
#include <Buffer.h>
#include <OutputHandler.h>
-#include <ProtocolVersion.h>
namespace qpid {
- namespace broker {
- class Content{
- public:
- virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
- virtual u_int32_t size() = 0;
- virtual void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
- virtual void encode(qpid::framing::Buffer& buffer) = 0;
- virtual void destroy() = 0;
- virtual ~Content(){}
- };
- }
+
+namespace framing {
+class ChannelAdapter;
}
+namespace broker {
+class Content{
+ public:
+ virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
+ virtual u_int32_t size() = 0;
+ virtual void send(framing::ChannelAdapter& channel,
+ u_int32_t framesize) = 0;
+ virtual void encode(qpid::framing::Buffer& buffer) = 0;
+ virtual void destroy() = 0;
+ virtual ~Content(){}
+};
+}}
+
#endif
diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp
index 12721fe86e..e27ccd8c4a 100644
--- a/cpp/lib/broker/InMemoryContent.cpp
+++ b/cpp/lib/broker/InMemoryContent.cpp
@@ -20,6 +20,7 @@
*/
#include <InMemoryContent.h>
#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -39,24 +40,26 @@ u_int32_t InMemoryContent::size()
return sum;
}
-void InMemoryContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+// FIXME aconway 2007-02-01: Remove version parameter.
+void InMemoryContent::send(ChannelAdapter& channel, u_int32_t framesize)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
if ((*i)->size() > framesize) {
u_int32_t offset = 0;
for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
string data = (*i)->getData().substr(offset, framesize);
- out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ channel.send(new AMQContentBody(data));
offset += framesize;
}
u_int32_t remainder = (*i)->size() % framesize;
if (remainder) {
string data = (*i)->getData().substr(offset, remainder);
- out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ channel.send(new AMQContentBody(data));
}
} else {
- AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- out->send(new AMQFrame(version, channel, contentBody));
+ AMQBody::shared_ptr contentBody =
+ static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ channel.send(contentBody);
}
}
}
diff --git a/cpp/lib/broker/InMemoryContent.h b/cpp/lib/broker/InMemoryContent.h
index 72c6b46a7f..7003ea2aab 100644
--- a/cpp/lib/broker/InMemoryContent.h
+++ b/cpp/lib/broker/InMemoryContent.h
@@ -35,7 +35,7 @@ namespace qpid {
public:
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
- void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void send(framing::ChannelAdapter&, u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
void destroy();
~InMemoryContent(){}
diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp
index 2c7a764ec9..d8f18d5c8b 100644
--- a/cpp/lib/broker/LazyLoadedContent.cpp
+++ b/cpp/lib/broker/LazyLoadedContent.cpp
@@ -20,6 +20,7 @@
*/
#include <LazyLoadedContent.h>
#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -37,19 +38,21 @@ u_int32_t LazyLoadedContent::size()
return 0;//all content is written as soon as it is added
}
-void LazyLoadedContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+void LazyLoadedContent::send(ChannelAdapter& channel, u_int32_t framesize)
{
if (expectedSize > framesize) {
- for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {
+ for (u_int64_t offset = 0; offset < expectedSize; offset += framesize)
+ {
u_int64_t remaining = expectedSize - offset;
string data;
- store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining);
- out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ store->loadContent(msg, data, offset,
+ remaining > framesize ? framesize : remaining);
+ channel.send(new AMQContentBody(data));
}
} else {
string data;
store->loadContent(msg, data, 0, expectedSize);
- out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ channel.send(new AMQContentBody(data));
}
}
diff --git a/cpp/lib/broker/LazyLoadedContent.h b/cpp/lib/broker/LazyLoadedContent.h
index 5322e5e711..56fdd7699b 100644
--- a/cpp/lib/broker/LazyLoadedContent.h
+++ b/cpp/lib/broker/LazyLoadedContent.h
@@ -31,10 +31,14 @@ namespace qpid {
Message* const msg;
const u_int64_t expectedSize;
public:
- LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize);
+ LazyLoadedContent(
+ MessageStore* const store, Message* const msg,
+ u_int64_t expectedSize);
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
- void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void send(
+ framing::ChannelAdapter&,
+ u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
void destroy();
~LazyLoadedContent(){}
diff --git a/cpp/lib/broker/MessageBuilder.h b/cpp/lib/broker/MessageBuilder.h
index 4e51f223f0..5b8516be42 100644
--- a/cpp/lib/broker/MessageBuilder.h
+++ b/cpp/lib/broker/MessageBuilder.h
@@ -39,7 +39,9 @@ namespace qpid {
virtual void complete(Message::shared_ptr&) = 0;
virtual ~CompletionHandler(){}
};
- MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0);
+ MessageBuilder(CompletionHandler* _handler,
+ MessageStore* const store = 0,
+ u_int64_t stagingThreshold = 0);
void initialise(Message::shared_ptr& msg);
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index f04b749996..71100996e7 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -34,8 +34,8 @@ using namespace framing;
//
void
MessageHandlerImpl::append(const MethodContext&,
- const string& /*reference*/,
- const string& /*bytes*/ )
+ const string& /*reference*/,
+ const string& /*bytes*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
@@ -43,7 +43,7 @@ MessageHandlerImpl::append(const MethodContext&,
void
MessageHandlerImpl::cancel( const MethodContext& context,
- const string& destination )
+ const string& destination )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -54,28 +54,28 @@ MessageHandlerImpl::cancel( const MethodContext& context,
void
MessageHandlerImpl::checkpoint(const MethodContext&,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::close(const MethodContext&,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::consume(const MethodContext& context,
- u_int16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const qpid::framing::FieldTable& filter )
+ u_int16_t /*ticket*/,
+ const string& queueName,
+ const string& destination,
+ bool noLocal,
+ bool noAck,
+ bool exclusive,
+ const qpid::framing::FieldTable& filter )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -106,15 +106,15 @@ MessageHandlerImpl::empty( const MethodContext& )
void
MessageHandlerImpl::get( const MethodContext& context,
- u_int16_t /*ticket*/,
- const string& queueName,
- const string& /*destination*/,
- bool noAck )
+ u_int16_t /*ticket*/,
+ const string& queueName,
+ const string& /*destination*/,
+ bool noAck )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Queue::shared_ptr queue =
- connection.getQueue(queueName, context.channelId);
+ connection.getQueue(queueName, context.channel->getId());
// FIXME: get is probably Basic specific
if(!channel.get(queue, !noAck)){
@@ -125,7 +125,7 @@ MessageHandlerImpl::get( const MethodContext& context,
void
MessageHandlerImpl::offset(const MethodContext&,
- u_int64_t /*value*/ )
+ u_int64_t /*value*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
@@ -138,16 +138,16 @@ MessageHandlerImpl::ok( const MethodContext& )
void
MessageHandlerImpl::open(const MethodContext&,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::qos(const MethodContext& context,
- u_int32_t prefetchSize,
- u_int16_t prefetchCount,
- bool /*global*/ )
+ u_int32_t prefetchSize,
+ u_int16_t prefetchCount,
+ bool /*global*/ )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -160,7 +160,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
void
MessageHandlerImpl::recover(const MethodContext&,
- bool requeue )
+ bool requeue )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -170,45 +170,45 @@ MessageHandlerImpl::recover(const MethodContext&,
void
MessageHandlerImpl::reject(const MethodContext&,
- u_int16_t /*code*/,
- const string& /*text*/ )
+ u_int16_t /*code*/,
+ const string& /*text*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::resume(const MethodContext&,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::transfer(const MethodContext& context,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool immediate,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& exchangeName,
- const string& routingKey,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content body,
- bool mandatory )
+ u_int16_t /*ticket*/,
+ const string& /*destination*/,
+ bool /*redelivered*/,
+ bool immediate,
+ u_int64_t /*ttl*/,
+ u_int8_t /*priority*/,
+ u_int64_t /*timestamp*/,
+ u_int8_t /*deliveryMode*/,
+ u_int64_t /*expiration*/,
+ const string& exchangeName,
+ const string& routingKey,
+ const string& /*messageId*/,
+ const string& /*correlationId*/,
+ const string& /*replyTo*/,
+ const string& /*contentType*/,
+ const string& /*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string& /*transactionId*/,
+ const string& /*securityToken*/,
+ const qpid::framing::FieldTable& /*applicationHeaders*/,
+ qpid::framing::Content body,
+ bool mandatory )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -216,14 +216,15 @@ MessageHandlerImpl::transfer(const MethodContext& context,
broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
if (body.isInline()) {
- MessageMessage* msg =
- new MessageMessage(*(context.methodBody), exchangeName, routingKey, mandatory, immediate);
- channel.handlePublish(msg, exchange);
+ MessageMessage* msg =
+ new MessageMessage(context.methodBody, exchangeName,
+ routingKey, mandatory, immediate);
+ channel.handlePublish(msg, exchange);
- connection.client->getMessageHandler()->ok(context);
+ connection.client->getMessageHandler()->ok(context);
} else {
- // Don't handle reference content yet
- assert(body.isInline());
+ // Don't handle reference content yet
+ assert(body.isInline());
}
}else{
throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
diff --git a/cpp/lib/broker/NullMessageStore.h b/cpp/lib/broker/NullMessageStore.h
index ef2bea8fd6..a5dfce504e 100644
--- a/cpp/lib/broker/NullMessageStore.h
+++ b/cpp/lib/broker/NullMessageStore.h
@@ -34,7 +34,7 @@ namespace qpid {
class NullMessageStore : public MessageStore{
const bool warn;
public:
- NullMessageStore(bool warn = true);
+ NullMessageStore(bool warn = false);
virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
virtual void destroy(const Queue& queue);
virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index a207763aac..dd93c6ae8b 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -75,7 +75,7 @@ void Channel::protocolInit(
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
new ConnectionStartOkBody(
- version, props, mechanism, response, locale));
+ version, responses.getRequestId(), props, mechanism, response, locale));
/**
* Assume for now that further challenges will not be required
@@ -85,9 +85,9 @@ void Channel::protocolInit(
connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
**/
- (new ConnectionTuneOkBody(
- version, proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat()))->send(context);
+ send(new ConnectionTuneOkBody(
+ version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(),
+ proposal->getHeartbeat()));
u_int16_t heartbeat = proposal->getHeartbeat();
connection->connector->setReadTimeout(heartbeat * 2);
@@ -96,8 +96,7 @@ void Channel::protocolInit(
// Send connection open.
std::string capabilities;
responses.expect();
- (new ConnectionOpenBody(version, vhost, capabilities, true))
- ->send(context);
+ send(new ConnectionOpenBody(version, vhost, capabilities, true));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
responses.waitForResponse();
@@ -208,8 +207,7 @@ void Channel::cancel(const std::string& tag, bool synch) {
if (i != consumers.end()) {
Consumer& c = i->second;
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- (new BasicAckBody(version, c.lastDeliveryTag, true))
- ->send(context);
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
sendAndReceiveSync<BasicCancelOkBody>(
synch, new BasicCancelBody(version, tag, !synch));
consumers.erase(tag);
@@ -227,8 +225,7 @@ void Channel::cancelAll(){
// trying the rest. NB no memory leaks if we do,
// ConsumerMap holds values, not pointers.
//
- (new BasicAckBody(version, c.lastDeliveryTag, true))
- ->send(context);
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
}
}
}
@@ -249,7 +246,7 @@ void Channel::retrieve(Message& msg){
bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
string name = queue.getName();
responses.expect();
- (new BasicGetBody(version, 0, name, ackMode))->send(context);
+ send(new BasicGetBody(version, 0, name, ackMode));
responses.waitForResponse();
AMQMethodBody::shared_ptr response = responses.getResponse();
if(response->isA<BasicGetOkBody>()) {
@@ -277,7 +274,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
string e = exchange.getName();
string key = routingKey;
- (new BasicPublishBody(version, 0, e, key, mandatory, immediate))->send(context);
+ send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
@@ -426,8 +423,7 @@ void Channel::deliver(Consumer& consumer, Message& msg){
if(++(consumer.count) < prefetch) break;
//else drop-through
case AUTO_ACK:
- (new BasicAckBody(version, msg.getDeliveryTag(), multiple))
- ->send(context);
+ send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
consumer.lastDeliveryTag = 0;
}
}
@@ -512,7 +508,7 @@ void Channel::closeInternal() {
void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
{
responses.expect();
- toSend->send(context);
+ send(toSend);
responses.receive(c, m);
}
@@ -522,7 +518,7 @@ void Channel::sendAndReceiveSync(
if(sync)
sendAndReceive(body, c, m);
else
- body->send(context);
+ send(body);
}
diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp
index ea234ac321..ea48fa2386 100644
--- a/cpp/lib/client/ResponseHandler.cpp
+++ b/cpp/lib/client/ResponseHandler.cpp
@@ -63,7 +63,7 @@ void ResponseHandler::receive(ClassId c, MethodId m) {
THROW_QPID_ERROR(
PROTOCOL_ERROR, "Channel closed unexpectedly.");
}
- if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+ if(!validate(response->amqpClassId(), response->amqpMethodId())) {
THROW_QPID_ERROR(
PROTOCOL_ERROR,
boost::format("Expected class:method %d:%d, got %d:%d")
@@ -71,6 +71,10 @@ void ResponseHandler::receive(ClassId c, MethodId m) {
}
}
+RequestId ResponseHandler::getRequestId() {
+ assert(response->getRequestId());
+ return response->getRequestId();
+}
void ResponseHandler::expect(){
waiting = true;
}
diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h
index c402bcbc65..af0c250eb1 100644
--- a/cpp/lib/client/ResponseHandler.h
+++ b/cpp/lib/client/ResponseHandler.h
@@ -20,8 +20,7 @@
*/
#include <string>
-#include <amqp_types.h>
-#include <framing/amqp_framing.h>
+#include <framing/amqp_framing.h> // FIXME aconway 2007-02-01: #include cleanup.
#include <sys/Monitor.h>
#ifndef _ResponseHandler_
@@ -52,11 +51,13 @@ class ResponseHandler{
bool validate(framing::ClassId, framing::MethodId);
void receive(framing::ClassId, framing::MethodId);
+ framing::RequestId getRequestId();
+
template <class BodyType> bool validate() {
return validate(BodyType::CLASS_ID, BodyType::METHOD_ID);
}
template <class BodyType> void receive() {
- return receive(BodyType::CLASS_ID, BodyType::METHOD_ID);
+ receive(BodyType::CLASS_ID, BodyType::METHOD_ID);
}
};
diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp
index 17138a401e..ab1e15f845 100644
--- a/cpp/lib/common/framing/AMQMethodBody.cpp
+++ b/cpp/lib/common/framing/AMQMethodBody.cpp
@@ -56,8 +56,4 @@ void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/) {
decodeContent(buffer);
}
-void AMQMethodBody::send(const MethodContext& context) {
- context.out->send(new AMQFrame(version, context.channelId, this));
-}
-
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h
index b21ae379dc..492571c83c 100644
--- a/cpp/lib/common/framing/AMQMethodBody.h
+++ b/cpp/lib/common/framing/AMQMethodBody.h
@@ -57,11 +57,8 @@ class AMQMethodBody : public AMQBody
return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID;
}
- /**
- * Wrap this method in a frame and send using the current context.
- * Note the frame takes ownership of the body, it will be deleted.
- */
- virtual void send(const MethodContext& context);
+ /** Return request ID or response correlationID */
+ virtual RequestId getRequestId() const { return 0; }
protected:
static u_int32_t baseSize() { return 4; }
diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h
index 390fe77881..9b8fc1d663 100644
--- a/cpp/lib/common/framing/AMQRequestBody.h
+++ b/cpp/lib/common/framing/AMQRequestBody.h
@@ -58,8 +58,8 @@ class AMQRequestBody : public AMQMethodBody
Data& getData() { return data; }
RequestId getRequestId() const { return data.requestId; }
- void setRequestId(RequestId id) { data.requestId=id; }
ResponseId getResponseMark() const { return data.responseMark; }
+ void setRequestId(RequestId id) { data.requestId=id; }
void setResponseMark(ResponseId mark) { data.responseMark=mark; }
protected:
diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp
index da70e24cd4..7da71a5d25 100644
--- a/cpp/lib/common/framing/AMQResponseBody.cpp
+++ b/cpp/lib/common/framing/AMQResponseBody.cpp
@@ -62,11 +62,4 @@ void AMQResponseBody::printPrefix(std::ostream& out) const {
<< ",batch=" << data.batchOffset << "): ";
}
-void AMQResponseBody::send(const MethodContext& context) {
- setRequestId(context.requestId);
- assert(context.out);
- context.out->send(
- new AMQFrame(version, context.channelId, this));
-}
-
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h
index 2520a481f2..3954836ec8 100644
--- a/cpp/lib/common/framing/AMQResponseBody.h
+++ b/cpp/lib/common/framing/AMQResponseBody.h
@@ -62,15 +62,13 @@ class AMQResponseBody : public AMQMethodBody
void encode(Buffer& buffer) const;
Data& getData() { return data; }
- ResponseId getResponseId() { return data.responseId; }
- RequestId getRequestId() { return data.requestId; }
- BatchOffset getBatchOffset() { return data.batchOffset; }
+ ResponseId getResponseId() const { return data.responseId; }
+ RequestId getRequestId() const { return data.requestId; }
+ BatchOffset getBatchOffset() const { return data.batchOffset; }
void setResponseId(ResponseId id) { data.responseId = id; }
void setRequestId(RequestId id) { data.requestId = id; }
void setBatchOffset(BatchOffset id) { data.batchOffset = id; }
- virtual void send(const MethodContext& context);
-
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
void printPrefix(std::ostream& out) const;
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
index 225e5e3393..149c8144b4 100644
--- a/cpp/lib/common/framing/ChannelAdapter.cpp
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -32,12 +32,10 @@ void ChannelAdapter::init(
id = i;
out = &o;
version = v;
- context = MethodContext(id, this);
}
-void ChannelAdapter::send(AMQFrame* frame) {
+void ChannelAdapter::send(AMQBody::shared_ptr body) {
assertChannelOpen();
- AMQBody::shared_ptr body = frame->getBody();
switch (body->type()) {
case REQUEST_BODY: {
AMQRequestBody::shared_ptr request =
@@ -52,18 +50,13 @@ void ChannelAdapter::send(AMQFrame* frame) {
break;
}
}
- out->send(frame);
-}
-
-void ChannelAdapter::send(AMQBody::shared_ptr body) {
- send(new AMQFrame(getVersion(), getId(), body));
+ out->send(new AMQFrame(getVersion(), getId(), body));
}
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
assertMethodOk(*request);
responder.received(request->getData());
- context =MethodContext(id, request, this, request->getRequestId());
- handleMethodInContext(request, context);
+ handleMethodInContext(request, MethodContext(this, request));
}
void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
@@ -76,8 +69,7 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
assertMethodOk(*method);
- context = MethodContext(id, method, this);
- handleMethodInContext(method, context);
+ handleMethodInContext(method, MethodContext(this, method));
}
void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const {
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
index f0b3d2469a..26cac76aae 100644
--- a/cpp/lib/common/framing/ChannelAdapter.h
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -27,7 +27,7 @@
#include "BodyHandler.h"
#include "Requester.h"
#include "Responder.h"
-#include "OutputHandler.h"
+#include "framing/amqp_types.h"
namespace qpid {
namespace framing {
@@ -37,24 +37,26 @@ class MethodContext;
/**
* Base class for client and broker channel adapters.
*
- * As BodyHandler:
+ * BodyHandler::handl*
* - receives frame bodies from the network.
* - Updates request/response data.
* - Dispatches requests with a MethodContext for responses.
*
- * As OutputHandler:
+ * send()
* - Updates request/resposne ID data.
* - Forwards frame to the peer.
*
* Thread safety: OBJECT UNSAFE. Instances must not be called
* concurrently. AMQP defines channels to be serialized.
*/
-class ChannelAdapter : public BodyHandler, public OutputHandler {
+class ChannelAdapter : public BodyHandler {
public:
/**
*@param output Processed frames are forwarded to this handler.
*/
- ChannelAdapter() : context(0, 0), id(0), out(0) {}
+ ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0,
+ const ProtocolVersion& ver=ProtocolVersion())
+ : id(id_), out(out_), version(ver) {}
/** Initialize the channel adapter. */
void init(ChannelId, OutputHandler&, const ProtocolVersion&);
@@ -63,12 +65,6 @@ class ChannelAdapter : public BodyHandler, public OutputHandler {
const ProtocolVersion& getVersion() const { return version; }
/**
- * Do request/response-id processing and then forward to
- * handler provided to constructor. Response frames should
- * have their request-id set before calling send.
- */
- void send(AMQFrame* frame);
- /**
* Wrap body in a frame and send the frame.
* Takes ownership of body.
*/
@@ -92,9 +88,6 @@ class ChannelAdapter : public BodyHandler, public OutputHandler {
RequestId getRequestInProgress() { return requestInProgress; }
- protected:
- MethodContext context;
-
private:
ChannelId id;
OutputHandler* out;
diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h
index 454025e377..d9717d90a0 100644
--- a/cpp/lib/common/framing/MethodContext.h
+++ b/cpp/lib/common/framing/MethodContext.h
@@ -19,6 +19,8 @@
*
*/
+#include <boost/shared_ptr.hpp>
+
#include "OutputHandler.h"
#include "ProtocolVersion.h"
@@ -29,54 +31,41 @@ namespace framing {
class BodyHandler;
class AMQMethodBody;
+class ChannelAdapter;
/**
* Invocation context for an AMQP method.
- * Some of the context information is related to the channel, some
- * to the specific invocation - e.g. requestId.
- *
- * All generated proxy and handler functions take a MethodContext parameter.
*
- * The user does not need to create MethodContext objects explicitly,
- * the constructor will implicitly create one from a channel ID.
- *
- * Other context members are for internal use.
+ * It provides the method being processed and the channel on which
+ * it arrived.
+ *
+ * All Handler functions take a MethodContext as the last parameter.
*/
struct MethodContext
{
- /**
- * Passing a integer channel-id in place of a MethodContext
- * will automatically construct the MethodContext.
- */
- MethodContext(ChannelId channel,
- OutputHandler* output=0, RequestId request=0)
- : channelId(channel), out(output), requestId(request)
- {}
-
- MethodContext(ChannelId channel,
- boost::shared_ptr<AMQMethodBody> method,
- OutputHandler* output=0, RequestId request=0)
- : channelId(channel), out(output), requestId(request),
- methodBody(method)
- {}
-
- /** \internal Channel on which the method is sent. */
- ChannelId channelId;
+ typedef boost::shared_ptr<AMQMethodBody> BodyPtr;
- /** Output handler for responses in this context */
- OutputHandler* out;
+ MethodContext(ChannelAdapter* ch=0, BodyPtr method=BodyPtr())
+ : channel(ch), methodBody(method) {}
- /** \internal If we are in the context of processing an incoming request,
- * this is the ID. Otherwise it is 0.
- */
- RequestId requestId;
+ /**
+ * Channel on which the method being processed arrived.
+ * 0 if the method was constructed by the caller
+ * rather than received from a channel.
+ */
+ ChannelAdapter* channel;
- /** \internal This is the Method Body itself
- * It's useful for passing around instead of unpacking all its parameters
+ /**
+ * Body of the method being processed.
+ * It's useful for passing around instead of unpacking all its parameters.
+ * It's also provides the request ID when constructing a response.
*/
- boost::shared_ptr<AMQMethodBody> methodBody;
+ BodyPtr methodBody;
};
+// FIXME aconway 2007-02-01: Method context only required on Handler
+// functions, not on Proxy functions.
+
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/ProtocolVersion.cpp b/cpp/lib/common/framing/ProtocolVersion.cpp
index 69ff89ec32..e65c8b79b8 100644
--- a/cpp/lib/common/framing/ProtocolVersion.cpp
+++ b/cpp/lib/common/framing/ProtocolVersion.cpp
@@ -20,10 +20,13 @@
*/
#include <ProtocolVersion.h>
#include <sstream>
+#include "AMQP_HighestVersion.h"
using namespace qpid::framing;
-ProtocolVersion::ProtocolVersion() {}
+ProtocolVersion::ProtocolVersion() {
+ *this = highestProtocolVersion;
+}
ProtocolVersion::ProtocolVersion(u_int8_t _major, u_int8_t _minor) :
major_(_major),
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp
index 760a4d3344..a3dabe6408 100644
--- a/cpp/tests/ChannelTest.cpp
+++ b/cpp/tests/ChannelTest.cpp
@@ -28,6 +28,9 @@
#include <memory>
#include <AMQP_HighestVersion.h>
#include "AMQFrame.h"
+#include "DummyChannel.h"
+#include "broker/Connection.h"
+#include "ProtocolInitiation.h"
using namespace boost;
using namespace qpid::broker;
@@ -36,12 +39,12 @@ using namespace qpid::sys;
using std::string;
using std::queue;
-struct DummyHandler : OutputHandler{
+struct DummyHandler : ConnectionOutputHandler{
std::vector<AMQFrame*> frames;
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
+ void send(AMQFrame* frame){ frames.push_back(frame); }
+
+ void close() {};
};
@@ -55,6 +58,10 @@ class ChannelTest : public CppUnit::TestCase
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST_SUITE_END();
+ Broker::shared_ptr broker;
+ Connection connection;
+ DummyHandler handler;
+
class MockMessageStore : public NullMessageStore
{
struct MethodCall
@@ -135,9 +142,17 @@ class ChannelTest : public CppUnit::TestCase
public:
+ ChannelTest() :
+ broker(Broker::create()),
+ connection(&handler, *broker)
+ {
+ connection.initiated(new ProtocolInitiation());
+ }
+
+
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0);
+ Channel channel(connection, 0, 0, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
@@ -162,12 +177,10 @@ class ChannelTest : public CppUnit::TestCase
}
void testDeliveryNoAck(){
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000);
-
+ Channel channel(connection, 7, 10000);
const string data("abcdefghijklmn");
-
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ Message::shared_ptr msg(
+ createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
ConnectionToken* owner(0);
@@ -175,22 +188,25 @@ class ChannelTest : public CppUnit::TestCase
channel.consume(tag, queue, false, false, owner);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel());
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+ handler.frames[0]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
+ handler.frames[1]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
+ handler.frames[2]->getBody().get()));
+ AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
+ handler.frames[3]->getBody().get());
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
}
void testDeliveryAndRecovery(){
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000);
+ Channel channel(connection, 7, 10000);
const string data("abcdefghijklmn");
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
@@ -202,26 +218,32 @@ class ChannelTest : public CppUnit::TestCase
channel.consume(tag, queue, true, false, owner);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel());
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+ handler.frames[0]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
+ handler.frames[1]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
+ handler.frames[2]->getBody().get()));
+ AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
+ handler.frames[3]->getBody().get());
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
}
void testStaging(){
MockMessageStore store;
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
+ Channel channel(
+ connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
const string data[] = {"abcde", "fghij", "klmno"};
- Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
+ Message* msg = new BasicMessage(
+ 0, "my_exchange", "my_routing_key", false, false,
+ DummyChannel::basicGetBody());
store.expect();
store.stage(msg);
@@ -309,7 +331,9 @@ class ChannelTest : public CppUnit::TestCase
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize)
{
- BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false);
+ BasicMessage* msg = new BasicMessage(
+ 0, exchange, routingKey, false, false,
+ DummyChannel::basicGetBody());
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(contentSize);
msg->setHeader(header);
diff --git a/cpp/tests/ExchangeTest.cpp b/cpp/tests/ExchangeTest.cpp
index a31c369fe1..cccec92024 100644
--- a/cpp/tests/ExchangeTest.cpp
+++ b/cpp/tests/ExchangeTest.cpp
@@ -26,8 +26,10 @@
#include <TopicExchange.h>
#include <qpid_test_plugin.h>
#include <iostream>
+#include "BasicGetBody.h"
using namespace qpid::broker;
+using namespace qpid::framing;
using namespace qpid::sys;
class ExchangeTest : public CppUnit::TestCase
@@ -54,7 +56,11 @@ class ExchangeTest : public CppUnit::TestCase
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msgPtr(
+ new BasicMessage(
+ 0, "e", "A", true, true,
+ AMQMethodBody::shared_ptr(
+ new BasicGetBody(ProtocolVersion()))));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp
index 721d8ad857..f8754337c8 100644
--- a/cpp/tests/FramingTest.cpp
+++ b/cpp/tests/FramingTest.cpp
@@ -105,7 +105,7 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- ConnectionRedirectBody in(version, a, b);
+ ConnectionRedirectBody in(version, 0, a, b);
in.encodeContent(buffer);
buffer.flip();
ConnectionRedirectBody out(version);
@@ -142,7 +142,8 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(version, 999, new ConnectionRedirectBody(version, a, b));
+ AMQFrame in(version, 999,
+ new ConnectionRedirectBody(version, 0, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -153,7 +154,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, new BasicConsumeOkBody(version, s));
+ AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp
index 1494518578..db54ea44a0 100644
--- a/cpp/tests/InMemoryContentTest.cpp
+++ b/cpp/tests/InMemoryContentTest.cpp
@@ -24,6 +24,7 @@
#include <iostream>
#include <list>
#include "AMQFrame.h"
+#include "DummyChannel.h"
using std::list;
using std::string;
@@ -31,13 +32,6 @@ using boost::dynamic_pointer_cast;
using namespace qpid::broker;
using namespace qpid::framing;
-struct DummyHandler : OutputHandler{
- std::vector<AMQFrame*> frames;
-
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
-};
class InMemoryContentTest : public CppUnit::TestCase
{
@@ -64,12 +58,21 @@ public:
void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5)
{
InMemoryContent content;
- DummyHandler handler;
- u_int16_t channel = 3;
+ DummyChannel channel(3);
addframes(content, inCount, in);
- content.send(highestProtocolVersion, &handler, channel, framesize);
- check(handler, channel, outCount, out);
+ content.send(channel, framesize);
+ CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
+
+ for (unsigned int i = 0; i < outCount; i++) {
+ AMQContentBody::shared_ptr chunk(
+ dynamic_pointer_cast<AMQContentBody>(
+ channel.out.frames[i]->getBody()));
+ CPPUNIT_ASSERT(chunk);
+ CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
+ CPPUNIT_ASSERT_EQUAL(
+ ChannelId(3), channel.out.frames[i]->getChannel());
+ }
}
void addframes(InMemoryContent& content, size_t frameCount, string* frameData)
@@ -80,17 +83,7 @@ public:
}
}
- void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks)
- {
- CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size());
- for (unsigned int i = 0; i < expectedChunkCount; i++) {
- AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody()));
- CPPUNIT_ASSERT(chunk);
- CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
- CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
- }
- }
};
// Make this test suite a plugin.
diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp
index 624d2be3ff..49e4ecc4ae 100644
--- a/cpp/tests/LazyLoadedContentTest.cpp
+++ b/cpp/tests/LazyLoadedContentTest.cpp
@@ -26,20 +26,13 @@
#include <list>
#include <sstream>
#include "AMQFrame.h"
-
+#include "DummyChannel.h"
using std::list;
using std::string;
using boost::dynamic_pointer_cast;
using namespace qpid::broker;
using namespace qpid::framing;
-struct DummyHandler : OutputHandler{
- std::vector<AMQFrame*> frames;
-
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
-};
class LazyLoadedContentTest : public CppUnit::TestCase
@@ -99,21 +92,16 @@ public:
{
TestMessageStore store(in);
LazyLoadedContent content(&store, 0, in.size());
- DummyHandler handler;
- u_int16_t channel = 3;
- content.send(highestProtocolVersion, &handler, channel, framesize);
- check(handler, channel, outCount, out);
- }
-
- void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks)
- {
- CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size());
+ DummyChannel channel(3);
+ content.send(channel, framesize);
+ CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
- for (unsigned int i = 0; i < expectedChunkCount; i++) {
- AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody()));
+ for (unsigned int i = 0; i < outCount; i++) {
+ AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[i]->getBody()));
CPPUNIT_ASSERT(chunk);
- CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
- CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
+ CPPUNIT_ASSERT_EQUAL(
+ ChannelId(3), channel.out.frames[i]->getChannel());
}
}
};
diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp
index e84d1df0e7..dc660751b7 100644
--- a/cpp/tests/MessageBuilderTest.cpp
+++ b/cpp/tests/MessageBuilderTest.cpp
@@ -26,6 +26,7 @@
#include <qpid_test_plugin.h>
#include <iostream>
#include <memory>
+#include "DummyChannel.h"
using namespace boost;
using namespace qpid::broker;
@@ -36,7 +37,7 @@ class MessageBuilderTest : public CppUnit::TestCase
{
struct DummyHandler : MessageBuilder::CompletionHandler{
Message::shared_ptr msg;
-
+
virtual void complete(Message::shared_ptr& _msg){
msg = _msg;
}
@@ -48,7 +49,7 @@ class MessageBuilderTest : public CppUnit::TestCase
Buffer* content;
const u_int32_t contentBufferSize;
- public:
+ public:
void stage(Message* const msg)
{
@@ -98,7 +99,7 @@ class MessageBuilderTest : public CppUnit::TestCase
}
//dont care about any of the other methods:
- TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(false), header(0), content(0),
+ TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(), header(0), content(0),
contentBufferSize(_contentBufferSize) {}
~TestMessageStore(){}
};
@@ -116,7 +117,10 @@ class MessageBuilderTest : public CppUnit::TestCase
DummyHandler handler;
MessageBuilder builder(&handler);
- Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(
+ new BasicMessage(
+ 0, "test", "my_routing_key", false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(0);
@@ -133,7 +137,9 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
- Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(
+ new BasicMessage(0, "test", "my_routing_key", false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(7);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -154,7 +160,9 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(
+ new BasicMessage(0, "test", "my_routing_key", false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -183,7 +191,9 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(
+ new BasicMessage(0, "test", "my_routing_key", false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp
index 62249e1f5f..103a23f0df 100644
--- a/cpp/tests/MessageTest.cpp
+++ b/cpp/tests/MessageTest.cpp
@@ -23,19 +23,12 @@
#include <iostream>
#include <AMQP_HighestVersion.h>
#include "AMQFrame.h"
+#include "DummyChannel.h"
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
-struct DummyHandler : OutputHandler{
- std::vector<AMQFrame*> frames;
-
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
-};
-
class MessageTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(MessageTest);
@@ -52,7 +45,9 @@ class MessageTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0, exchange, routingKey, false, false));
+ BasicMessage::shared_ptr msg(
+ new BasicMessage(0, exchange, routingKey, false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -69,7 +64,8 @@ class MessageTest : public CppUnit::TestCase
msg->encode(buffer);
buffer.flip();
- msg = Message::shared_ptr(new BasicMessage(buffer));
+ msg.reset(new BasicMessage());
+ msg->decode(buffer);
CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId());
@@ -77,10 +73,11 @@ class MessageTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));
CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
- DummyHandler handler;
- msg->deliver(&handler, 0, "ignore", 0, 100, &(qpid::framing::highestProtocolVersion));
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ DummyChannel channel(1);
+ // FIXME aconway 2007-02-02: deliver should take const ProtocolVersion&
+ msg->deliver(channel, "ignore", 0, 100);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size());
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2]->getBody()));
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
}
diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp
index e156efc507..7105509de6 100644
--- a/cpp/tests/QueueTest.cpp
+++ b/cpp/tests/QueueTest.cpp
@@ -22,6 +22,7 @@
#include <QueueRegistry.h>
#include <qpid_test_plugin.h>
#include <iostream>
+#include "DummyChannel.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -54,6 +55,12 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_TEST_SUITE_END();
public:
+ Message::shared_ptr message(std::string exchange, std::string routingKey) {
+ return Message::shared_ptr(
+ new BasicMessage(0, exchange, routingKey, true, true,
+ DummyChannel::basicGetBody()));
+ }
+
void testConsumers(){
Queue::shared_ptr queue(new Queue("my_queue", true));
@@ -66,9 +73,9 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
//Test basic delivery:
- Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true));
+ Message::shared_ptr msg1 = message("e", "A");
+ Message::shared_ptr msg2 = message("e", "B");
+ Message::shared_ptr msg3 = message("e", "C");
queue->deliver(msg1);
CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
@@ -122,10 +129,9 @@ class QueueTest : public CppUnit::TestCase
void testDequeue(){
Queue::shared_ptr queue(new Queue("my_queue", true));
-
- Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true));
+ Message::shared_ptr msg1 = message("e", "A");
+ Message::shared_ptr msg2 = message("e", "B");
+ Message::shared_ptr msg3 = message("e", "C");
Message::shared_ptr received;
queue->deliver(msg1);
diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp
index a6fe0f1010..07381d187d 100644
--- a/cpp/tests/TxAckTest.cpp
+++ b/cpp/tests/TxAckTest.cpp
@@ -25,6 +25,7 @@
#include <iostream>
#include <list>
#include <vector>
+#include "DummyChannel.h"
using std::list;
using std::vector;
@@ -44,7 +45,7 @@ class TxAckTest : public CppUnit::TestCase
dequeued.push_back(std::pair<Message*, const string*>(msg, xid));
}
- TestMessageStore() : NullMessageStore(false) {}
+ TestMessageStore() : NullMessageStore() {}
~TestMessageStore(){}
};
@@ -69,7 +70,9 @@ public:
TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid)
{
for(int i = 0; i < 10; i++){
- Message::shared_ptr msg(new BasicMessage(0, "exchange", "routing_key", false, false));
+ Message::shared_ptr msg(
+ new BasicMessage(0, "exchange", "routing_key", false, false,
+ DummyChannel::basicGetBody()));
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp
index 87658af62e..08e5339048 100644
--- a/cpp/tests/TxPublishTest.cpp
+++ b/cpp/tests/TxPublishTest.cpp
@@ -25,6 +25,7 @@
#include <iostream>
#include <list>
#include <vector>
+#include "DummyChannel.h"
using std::list;
using std::pair;
@@ -73,10 +74,12 @@ class TxPublishTest : public CppUnit::TestCase
public:
- TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)),
- queue2(new Queue("queue2", false, &store, 0)),
- msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
- op(msg, &xid)
+ TxPublishTest() :
+ queue1(new Queue("queue1", false, &store, 0)),
+ queue2(new Queue("queue2", false, &store, 0)),
+ msg(new BasicMessage(0, "exchange", "routing_key", false, false,
+ DummyChannel::basicGetBody())),
+ op(msg, &xid)
{
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);