diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2006-12-20 22:29:38 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2006-12-20 22:29:38 +0000 |
commit | 786c13d1833f626bf47262dd16ea48c81ac3887f (patch) | |
tree | a24df1b5de4584d3055a754235e93bdee1ad0075 /cpp/lib | |
parent | dc0593dbce33328266edade35431a6571342786c (diff) | |
download | qpid-python-786c13d1833f626bf47262dd16ea48c81ac3887f.tar.gz |
Support for multi version, merge part 1. - can still refactor out dup use of
version object in client and server opperations.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 11 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 3 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 20 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 8 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/client/ClientChannel.h | 2 | ||||
-rw-r--r-- | cpp/lib/client/Connection.cpp | 8 | ||||
-rw-r--r-- | cpp/lib/client/Connection.h | 6 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQFrame.h | 5 |
9 files changed, 43 insertions, 29 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 5d4f68a8af..f569872770 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -31,7 +31,7 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : +Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : id(_id), out(_out), currentDeliveryTag(1), @@ -41,7 +41,8 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStor framesize(_framesize), tagGenerator("sgen"), store(_store), - messageBuilder(this, _store, _stagingThreshold){ + messageBuilder(this, _store, _stagingThreshold), + version(_version){ outstanding.reset(); } @@ -118,7 +119,7 @@ void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue outstanding.count++; } //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, deliveryTag, framesize); + msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -242,7 +243,7 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ if(msg){ Mutex::ScopedLock locker(deliveryLock); u_int64_t myDeliveryTag = currentDeliveryTag++; - msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -253,5 +254,5 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ - msg->deliver(out, id, consumerTag, deliveryTag, framesize); + msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index fa3912c78e..888ca3c051 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -88,6 +88,7 @@ namespace qpid { MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to + qpid::framing::ProtocolVersion version; // version used for this channel virtual void complete(Message::shared_ptr& msg); void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); @@ -95,7 +96,7 @@ namespace qpid { bool checkPrefetch(Message::shared_ptr& msg); public: - Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize, + Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize, MessageStore* const _store = 0, u_int64_t stagingThreshold = 0); ~Channel(); inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 598de2d590..7fef77e1ff 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -24,8 +24,6 @@ #include <InMemoryContent.h> #include <LazyLoadedContent.h> #include <MessageStore.h> -// AMQP version change - kpvdr 2006-11-17 -#include <ProtocolVersion.h> #include <BasicDeliverBody.h> #include <BasicGetOkBody.h> @@ -79,11 +77,10 @@ void Message::redeliver(){ void Message::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize){ - - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version - out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); + u_int32_t framesize, + ProtocolVersion* version){ + // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction + out->send(new AMQFrame(channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey))); sendContent(out, channel, framesize); } @@ -91,11 +88,10 @@ void Message::sendGetOk(OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, - u_int32_t framesize){ - - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version - out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount))); + u_int32_t framesize, + ProtocolVersion* version){ + // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction + out->send(new AMQFrame(channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount))); sendContent(out, channel, framesize); } diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 3bf70551d3..39142546bc 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -25,6 +25,7 @@ #include <boost/shared_ptr.hpp> #include <AMQContentBody.h> #include <AMQHeaderBody.h> +#include <ProtocolVersion.h> #include <BasicHeaderProperties.h> #include <ConnectionToken.h> #include <Content.h> @@ -37,6 +38,7 @@ namespace qpid { class MessageStore; using qpid::framing::string; + /** * Represents an AMQP message, i.e. a header body, a list of * content bodies and some details about the publication @@ -76,12 +78,14 @@ namespace qpid { int channel, const string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize); + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); void sendGetOk(qpid::framing::OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, - u_int32_t framesize); + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); void redeliver(); qpid::framing::BasicHeaderProperties* getHeaderProperties(); diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index bd6ca9dee9..9131060b81 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -131,7 +131,10 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ if (client == NULL) { client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); - + + + std::cout << "---------------" << this << std::endl; + //send connection start FieldTable properties; string mechanisms("PLAIN"); @@ -212,7 +215,9 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ - parent->channels[channel] = new Channel(parent->context, channel, parent->framemax, + + + parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax, parent->queues->getStore(), parent->settings.stagingThreshold); parent->client->getChannel().openOk(channel); } diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 27509a10d9..066f837430 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -92,7 +92,7 @@ namespace client { u_int16_t prefetch; const bool transactional; - qpid::framing::ProtocolVersion version; + qpid::framing::ProtocolVersion version; void enqueue(); void retrieve(Message& msg); diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 9c81192573..78aeafb37b 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -32,10 +32,8 @@ using namespace qpid::sys; u_int16_t Connection::channelIdCounter; -Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true), -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - version(8, 0) +Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true), + version(_version->getMajor(),_version->getMinor()) { connector = new Connector(debug, _max_frame_size); } @@ -53,7 +51,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui out = connector->getOutputHandler(); connector->connect(host, port); - ProtocolInitiation* header = new ProtocolInitiation(8, 0); + ProtocolInitiation* header = new ProtocolInitiation(version); responses.expect(); connector->init(header); responses.receive(method_bodies.connection_start); diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 7d3f1a1446..3de9b6bf31 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -38,6 +38,7 @@ #include <ResponseHandler.h> namespace qpid { + /** * The client namespace contains all classes that make up a client * implementation of the AMQP protocol. The key classes that form @@ -93,6 +94,8 @@ namespace client { * Creates a connection object, but does not open the * connection. * + * @param _version the version of the protocol to connect with + * * @param debug turns on tracing for the connection * (i.e. prints details of the frames sent and received to std * out). Optional and defaults to false. @@ -100,7 +103,8 @@ namespace client { * @param max_frame_size the maximum frame size that the * client will accept. Optional and defaults to 65536. */ - Connection(bool debug = false, u_int32_t max_frame_size = 65536); + Connection( bool debug = false, u_int32_t max_frame_size = 65536, + qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestVersion)); ~Connection(); /** diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index bec1946fb7..0cdd5674c1 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -35,6 +35,11 @@ namespace qpid { namespace framing { + + // TODO - replace with generated file + // CCT + static ProtocolVersion highestVersion(8,0); + class AMQFrame : virtual public AMQDataBlock { static AMQP_MethodVersionMap versionMap; |