summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2006-12-20 22:29:38 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2006-12-20 22:29:38 +0000
commit786c13d1833f626bf47262dd16ea48c81ac3887f (patch)
treea24df1b5de4584d3055a754235e93bdee1ad0075 /cpp/lib
parentdc0593dbce33328266edade35431a6571342786c (diff)
downloadqpid-python-786c13d1833f626bf47262dd16ea48c81ac3887f.tar.gz
Support for multi version, merge part 1. - can still refactor out dup use of
version object in client and server opperations. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp11
-rw-r--r--cpp/lib/broker/BrokerChannel.h3
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp20
-rw-r--r--cpp/lib/broker/BrokerMessage.h8
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp9
-rw-r--r--cpp/lib/client/ClientChannel.h2
-rw-r--r--cpp/lib/client/Connection.cpp8
-rw-r--r--cpp/lib/client/Connection.h6
-rw-r--r--cpp/lib/common/framing/AMQFrame.h5
9 files changed, 43 insertions, 29 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 5d4f68a8af..f569872770 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -31,7 +31,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
+Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
id(_id),
out(_out),
currentDeliveryTag(1),
@@ -41,7 +41,8 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStor
framesize(_framesize),
tagGenerator("sgen"),
store(_store),
- messageBuilder(this, _store, _stagingThreshold){
+ messageBuilder(this, _store, _stagingThreshold),
+ version(_version){
outstanding.reset();
}
@@ -118,7 +119,7 @@ void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue
outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, deliveryTag, framesize);
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -242,7 +243,7 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
if(msg){
Mutex::ScopedLock locker(deliveryLock);
u_int64_t myDeliveryTag = currentDeliveryTag++;
- msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -253,5 +254,5 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
}
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
- msg->deliver(out, id, consumerTag, deliveryTag, framesize);
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
}
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index fa3912c78e..888ca3c051 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -88,6 +88,7 @@ namespace qpid {
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
+ qpid::framing::ProtocolVersion version; // version used for this channel
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
@@ -95,7 +96,7 @@ namespace qpid {
bool checkPrefetch(Message::shared_ptr& msg);
public:
- Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
+ Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
~Channel();
inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 598de2d590..7fef77e1ff 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -24,8 +24,6 @@
#include <InMemoryContent.h>
#include <LazyLoadedContent.h>
#include <MessageStore.h>
-// AMQP version change - kpvdr 2006-11-17
-#include <ProtocolVersion.h>
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
@@ -79,11 +77,10 @@ void Message::redeliver(){
void Message::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
- u_int32_t framesize){
-
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
- out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ u_int32_t framesize,
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
+ out->send(new AMQFrame(channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey)));
sendContent(out, channel, framesize);
}
@@ -91,11 +88,10 @@ void Message::sendGetOk(OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
- u_int32_t framesize){
-
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
- out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ u_int32_t framesize,
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
+ out->send(new AMQFrame(channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount)));
sendContent(out, channel, framesize);
}
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 3bf70551d3..39142546bc 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -25,6 +25,7 @@
#include <boost/shared_ptr.hpp>
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
+#include <ProtocolVersion.h>
#include <BasicHeaderProperties.h>
#include <ConnectionToken.h>
#include <Content.h>
@@ -37,6 +38,7 @@ namespace qpid {
class MessageStore;
using qpid::framing::string;
+
/**
* Represents an AMQP message, i.e. a header body, a list of
* content bodies and some details about the publication
@@ -76,12 +78,14 @@ namespace qpid {
int channel,
const string& consumerTag,
u_int64_t deliveryTag,
- u_int32_t framesize);
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
void sendGetOk(qpid::framing::OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
- u_int32_t framesize);
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
void redeliver();
qpid::framing::BasicHeaderProperties* getHeaderProperties();
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index bd6ca9dee9..9131060b81 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -131,7 +131,10 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
if (client == NULL)
{
client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
-
+
+
+ std::cout << "---------------" << this << std::endl;
+
//send connection start
FieldTable properties;
string mechanisms("PLAIN");
@@ -212,7 +215,9 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
- parent->channels[channel] = new Channel(parent->context, channel, parent->framemax,
+
+
+ parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax,
parent->queues->getStore(), parent->settings.stagingThreshold);
parent->client->getChannel().openOk(channel);
}
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index 27509a10d9..066f837430 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -92,7 +92,7 @@ namespace client {
u_int16_t prefetch;
const bool transactional;
- qpid::framing::ProtocolVersion version;
+ qpid::framing::ProtocolVersion version;
void enqueue();
void retrieve(Message& msg);
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 9c81192573..78aeafb37b 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -32,10 +32,8 @@ using namespace qpid::sys;
u_int16_t Connection::channelIdCounter;
-Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true),
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- version(8, 0)
+Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true),
+ version(_version->getMajor(),_version->getMinor())
{
connector = new Connector(debug, _max_frame_size);
}
@@ -53,7 +51,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
out = connector->getOutputHandler();
connector->connect(host, port);
- ProtocolInitiation* header = new ProtocolInitiation(8, 0);
+ ProtocolInitiation* header = new ProtocolInitiation(version);
responses.expect();
connector->init(header);
responses.receive(method_bodies.connection_start);
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index 7d3f1a1446..3de9b6bf31 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -38,6 +38,7 @@
#include <ResponseHandler.h>
namespace qpid {
+
/**
* The client namespace contains all classes that make up a client
* implementation of the AMQP protocol. The key classes that form
@@ -93,6 +94,8 @@ namespace client {
* Creates a connection object, but does not open the
* connection.
*
+ * @param _version the version of the protocol to connect with
+ *
* @param debug turns on tracing for the connection
* (i.e. prints details of the frames sent and received to std
* out). Optional and defaults to false.
@@ -100,7 +103,8 @@ namespace client {
* @param max_frame_size the maximum frame size that the
* client will accept. Optional and defaults to 65536.
*/
- Connection(bool debug = false, u_int32_t max_frame_size = 65536);
+ Connection( bool debug = false, u_int32_t max_frame_size = 65536,
+ qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestVersion));
~Connection();
/**
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index bec1946fb7..0cdd5674c1 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -35,6 +35,11 @@
namespace qpid {
namespace framing {
+
+ // TODO - replace with generated file
+ // CCT
+ static ProtocolVersion highestVersion(8,0);
+
class AMQFrame : virtual public AMQDataBlock
{
static AMQP_MethodVersionMap versionMap;