summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-19 08:27:36 +0000
committerGordon Sim <gsim@apache.org>2007-07-19 08:27:36 +0000
commitb87a1e9d27755e2f98792567c29a0625b92c8654 (patch)
treecb1232987efbfa1cc0ef8ec5e62b07b6b6c918b6
parentdfe8a370b6580446cf970e27562ad0385178922f (diff)
downloadqpid-python-b87a1e9d27755e2f98792567c29a0625b92c8654.tar.gz
removed the need to pass MethodContext/RequestId through proxy and handler/adapter interfaces
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557522 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java3
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java1
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java43
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/Utils.java7
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp144
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h63
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp18
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.h8
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h4
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp6
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.h5
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp25
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h20
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp65
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h46
-rw-r--r--cpp/src/qpid/broker/GetAdapter.cpp3
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp108
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h66
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp25
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h2
-rw-r--r--cpp/src/qpid/framing/FramingContent.h20
-rw-r--r--cpp/src/qpid/framing/MethodContext.cpp7
-rw-r--r--cpp/src/qpid/framing/Proxy.h1
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.cpp62
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.h51
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp7
-rw-r--r--cpp/src/tests/ExchangeTest.cpp6
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/MessageBuilderTest.cpp12
-rw-r--r--cpp/src/tests/MessageTest.cpp3
-rw-r--r--cpp/src/tests/QueueTest.cpp3
-rw-r--r--cpp/src/tests/SequenceNumberTest.cpp108
-rw-r--r--cpp/src/tests/TxAckTest.cpp3
-rw-r--r--cpp/src/tests/TxPublishTest.cpp3
35 files changed, 570 insertions, 383 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java b/cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java
index 2e8bdaf971..1bc9dc456a 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java
@@ -93,6 +93,9 @@ public class AmqpClass implements Printable, NodeAware
thisMethod = new AmqpMethod(methodName, converter);
methodMap.put(methodName, thisMethod);
}
+ int content = Utils.getNamedIntegerAttribute(child, Utils.ATTRIBUTE_CONTENT, 0);
+ thisMethod.content = (content == 1);
+
if (!thisMethod.addFromNode(child, 0, version))
{
String className = converter.prepareClassName(Utils.getNamedAttribute(classNode,
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java b/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
index f3c5bdd935..e78eec112f 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
@@ -36,6 +36,7 @@ public class AmqpMethod implements Printable, NodeAware, VersionConsistencyCheck
public AmqpFlagMap clientMethodFlagMap; // Method called on client (<chassis name="server"> in XML)
public AmqpFlagMap serverMethodFlagMap; // Method called on server (<chassis name="client"> in XML)
public AmqpFlagMap isResponseFlagMap;
+ public boolean content;
public AmqpMethod(String name, LanguageConverter converter)
{
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
index 26b189a1ff..edcf9e09e3 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
@@ -763,14 +763,23 @@ public class CppGenerator extends Generator
AmqpVersionSet versionSet = overloadededParameterMap.get(thisFieldMap);
if (!first) sb.append(cr);
sb.append(indent + "virtual "+returnType+" "+ methodName + "(");
- if (abstractMethodFlag) sb.append("const MethodContext& context");
- boolean leadingComma = abstractMethodFlag;
- int paramIndent = indentSize + (5*tabSize);
- sb.append(generateMethodParameterList(thisFieldMap, paramIndent, leadingComma, true, true));
+
+ if (abstractMethodFlag && isSpecialCase(thisClass.name, method.name)) {
+ sb.append("const MethodContext& context");
+ } else {
+ sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), false, true, true));
+ }
+
+ //if (abstractMethodFlag) sb.append("const MethodContext& context");
+ //boolean leadingComma = abstractMethodFlag;
+ //int paramIndent = indentSize + (5*tabSize);
+ // sb.append(generateMethodParameterList(thisFieldMap, paramIndent, leadingComma, true, true));
+ /*
if (!abstractMethodFlag && method.isResponse(null)) {
if (!thisFieldMap.isEmpty()) sb.append(", \n"+Utils.createSpaces(paramIndent));
sb.append(" RequestId responseTo");
}
+ */
sb.append(" )");
if (abstractMethodFlag)
sb.append(" = 0");
@@ -888,6 +897,7 @@ public class CppGenerator extends Generator
sb.append(indent + "{" + cr);
sb.append(indent + "private:" + cr);
sb.append(indent + tab + "ChannelAdapter& channel;" + cr);
+ sb.append(indent + tab + "RequestId responseTo;" + cr);
sb.append(cr);
sb.append(indent + "public:" + cr);
sb.append(indent + tab + "// Constructors and destructors" + cr);
@@ -897,6 +907,9 @@ public class CppGenerator extends Generator
sb.append(indent + tab + "virtual ~" + className + "() {}" + cr);
sb.append(cr);
sb.append(indent + tab + "static "+className+"& get(" + proxyOuterClassName(serverFlag)+"& proxy) { return proxy.get"+className+"();}\n\n");
+ sb.append(indent + tab + "// set for response correlation" + cr);
+ sb.append(indent + tab + "void setResponseTo(RequestId r) { responseTo = r; }" + cr);
+ sb.append(cr);
sb.append(indent + tab + "// Protocol methods" + cr);
sb.append(cr);
sb.append(generateInnerClassMethods(thisClass, serverFlag, false, indentSize + tabSize, tabSize));
@@ -1004,8 +1017,10 @@ public class CppGenerator extends Generator
methodName + "(");
sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), false, true, true));
if (method.isResponse(null)) {
+ /*
if (!thisFieldMap.isEmpty()) sb.append(", ");
- sb.append("RequestId responseTo");
+ sb.append("RequestId responseTo");
+ */
}
sb.append(")");
if (versionSet.size() != globalVersionSet.size())
@@ -1457,6 +1472,7 @@ public class CppGenerator extends Generator
String indent = Utils.createSpaces(indentSize);
String tab = Utils.createSpaces(tabSize);
StringBuffer sb = new StringBuffer();
+ boolean special = isSpecialCase(thisClass.name, method.name);
if (method.serverMethodFlagMap.size() > 0) // At least one AMQP version defines this method as a server method
{
@@ -1466,14 +1482,18 @@ public class CppGenerator extends Generator
if (bItr.next()) // This is a server operation
{
boolean fieldMapNotEmptyFlag = method.fieldMap.size() > 0;
- sb.append(indent + "void invoke(AMQP_ServerOperations& target, const MethodContext& context)" + cr);
+ sb.append(indent + "void invoke(AMQP_ServerOperations& target, const MethodContext&"
+ + (special ? " context)" : " )") + cr);
sb.append(indent + "{" + cr);
sb.append(indent + tab + "target.get" + thisClass.name + "Handler()->" +
parseForReservedWords(Utils.firstLower(method.name),
- thisClass.name + Utils.firstUpper(method.name) + "Body.invoke()") + "(context");
- if (fieldMapNotEmptyFlag)
+ thisClass.name + Utils.firstUpper(method.name) + "Body.invoke()") + "(");
+ if (special)
+ {
+ sb.append("context");
+ }
+ else if (fieldMapNotEmptyFlag)
{
- sb.append("," + cr);
sb.append(generateFieldList(method.fieldMap, version, false, false, indentSize + 4*tabSize));
sb.append(indent + tab + tab + tab + tab);
}
@@ -1632,4 +1652,9 @@ public class CppGenerator extends Generator
}
return ccn.toString();
}
+
+ private boolean isSpecialCase(String className, String methodName)
+ {
+ return "message".equalsIgnoreCase(className) && ("transfer".equalsIgnoreCase(methodName) || "append".equalsIgnoreCase(methodName));
+ }
}
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/Utils.java b/cpp/gentools/src/org/apache/qpid/gentools/Utils.java
index ade00cd8c7..e1c4661acf 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/Utils.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/Utils.java
@@ -70,6 +70,13 @@ public class Utils
{
return Integer.parseInt(getNamedAttribute(n, attrName));
}
+
+ public static int getNamedIntegerAttribute(Node node, String name, int defaultValue) throws AmqpParseException
+ {
+ NamedNodeMap attributes = node.getAttributes();
+ Attr a = attributes == null ? null : (Attr) attributes.getNamedItem(name);
+ return a == null ? defaultValue : Integer.parseInt(a.getNodeValue());
+ }
// Element functions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index e215272bec..0231a58217 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -151,6 +151,7 @@ libqpidcommon_la_SOURCES = \
qpid/framing/ProtocolVersionException.cpp \
qpid/framing/Requester.cpp \
qpid/framing/Responder.cpp \
+ qpid/framing/SequenceNumber.cpp \
qpid/framing/Correlator.cpp \
qpid/framing/Value.cpp \
qpid/framing/Proxy.cpp \
@@ -360,6 +361,7 @@ nobase_include_HEADERS = \
qpid/framing/Requester.h \
qpid/framing/Responder.h \
qpid/framing/SerializeHandler.h \
+ qpid/framing/SequenceNumber.h \
qpid/framing/Value.h \
qpid/framing/Uuid.h \
qpid/framing/amqp_framing.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 2a0aa9ffee..be43dacb27 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -53,36 +53,33 @@ ProtocolVersion BrokerAdapter::getVersion() const {
return connection.getVersion();
}
-void BrokerAdapter::ChannelHandlerImpl::open(
- const MethodContext& context, const string& /*outOfBand*/){
+void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(
- std::string()/* ID */, context.getRequestId());
+ client.openOk(std::string()/* ID */);//GRS, context.getRequestId());
}
-void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext& context, bool active){
+void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
channel.flow(active);
- client.flowOk(active, context.getRequestId());
+ client.flowOk(active);//GRS, context.getRequestId());
}
-void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){}
-void BrokerAdapter::ChannelHandlerImpl::close(
- const MethodContext& context, uint16_t /*replyCode*/,
+void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/,
const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
- client.closeOk(context.getRequestId());
+ client.closeOk();//GRS context.getRequestId());
// FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::ChannelHandlerImpl::closeOk(){}
-void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
+void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& args){
@@ -107,31 +104,30 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u
}
}
if(!nowait){
- client.declareOk(context.getRequestId());
+ client.declareOk();//GRS context.getRequestId());
}
}
-void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
+void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/,
const string& name, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
if (exchange->isDurable()) broker.getStore().destroy(*exchange);
broker.getExchanges().destroy(name);
- if(!nowait) client.deleteOk(context.getRequestId());
+ if(!nowait) client.deleteOk();//GRS context.getRequestId());
}
-void BrokerAdapter::ExchangeHandlerImpl::query(const MethodContext& context, u_int16_t /*ticket*/, const string& name)
+void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
- client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs(), context.getRequestId());
+ client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());//GRS, context.getRequestId());
} catch (const ChannelException& e) {
- client.queryOk("", false, true, FieldTable(), context.getRequestId());
+ client.queryOk("", false, true, FieldTable());//GRS, context.getRequestId());
}
}
-void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& context,
- u_int16_t /*ticket*/,
+void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
@@ -148,22 +144,22 @@ void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& cont
}
if (!exchange) {
- client.queryOk(true, false, false, false, false, context.getRequestId());
+ client.queryOk(true, false, false, false, false);//GRS, context.getRequestId());
} else if (!queueName.empty() && !queue) {
- client.queryOk(false, true, false, false, false, context.getRequestId());
+ client.queryOk(false, true, false, false, false);//GRS, context.getRequestId());
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- client.queryOk(false, false, false, false, false, context.getRequestId());
+ client.queryOk(false, false, false, false, false);//GRS, context.getRequestId());
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched, context.getRequestId());
+ client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);//GRS, context.getRequestId());
}
}
-void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name,
+void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
@@ -200,12 +196,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint
if (!nowait) {
string queueName = queue->getName();
client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount(),
- context.getRequestId());
+ queueName, queue->getMessageCount(), queue->getConsumerCount());//GRS, context.getRequestId());
}
}
-void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName,
+void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
@@ -219,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_
broker.getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
- if(!nowait) client.bindOk(context.getRequestId());
+ if(!nowait) client.bindOk();//GRS context.getRequestId());
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -227,9 +222,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_
}
void
-BrokerAdapter::QueueHandlerImpl::unbind(
- const MethodContext& context,
- uint16_t /*ticket*/,
+BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
const string& queueName,
const string& exchangeName,
const string& routingKey,
@@ -245,17 +238,17 @@ BrokerAdapter::QueueHandlerImpl::unbind(
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
}
- client.unbindOk(context.getRequestId());
+ client.unbindOk();//GRS context.getRequestId());
}
-void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = getQueue(queueName);
int count = queue->purge();
- if(!nowait) client.purgeOk( count, context.getRequestId());
+ if(!nowait) client.purgeOk( count);//GRS, context.getRequestId());
}
-void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue,
+void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
@@ -277,21 +270,20 @@ void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint
}
if(!nowait)
- client.deleteOk(count, context.getRequestId());
+ client.deleteOk(count);//GRS, context.getRequestId());
}
-void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.qosOk(context.getRequestId());
+ client.qosOk();//GRS context.getRequestId());
}
-void BrokerAdapter::BasicHandlerImpl::consume(
- const MethodContext& context, uint16_t /*ticket*/,
+void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait, const FieldTable& fields)
@@ -308,29 +300,26 @@ void BrokerAdapter::BasicHandlerImpl::consume(
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) client.consumeOk(newTag, context.getRequestId());
+ if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId());
//allow messages to be dispatched if required as there is now a consumer:
queue->requestDispatch();
}
-void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
+ if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId());
}
-void BrokerAdapter::BasicHandlerImpl::publish(
- const MethodContext& context, uint16_t /*ticket*/,
+void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate)
{
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- BasicMessage* msg = new BasicMessage(
- &connection, exchangeName, routingKey, mandatory, immediate,
- context.methodBody);
+ BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
channel.handlePublish(msg);
}else{
throw ChannelException(
@@ -338,45 +327,47 @@ void BrokerAdapter::BasicHandlerImpl::publish(
}
}
-void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = getQueue(queueName);
GetAdapter out(adapter, queue, "", connection.getFrameMax());
if(!channel.get(out, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- client.getEmpty(clusterId, context.getRequestId());
+ client.getEmpty(clusterId);//GRS, context.getRequestId());
}
}
-void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){
+void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
channel.ack(deliveryTag, multiple);
}
-void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
+{
channel.recover(requeue);
}
-void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::select()
+{
channel.startTx();
- client.selectOk(context.getRequestId());
+ client.selectOk();//GRS context.getRequestId());
}
-void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::commit()
+{
channel.commit();
- client.commitOk(context.getRequestId());
+ client.commitOk();//GRS context.getRequestId());
}
-void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
-
+void BrokerAdapter::TxHandlerImpl::rollback()
+{
channel.rollback();
- client.rollbackOk(context.getRequestId());
+ client.rollbackOk();//GRS context.getRequestId());
channel.recover(false);
}
-void
-BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
+void BrokerAdapter::ChannelHandlerImpl::ok()
{
//no specific action required, generic response handling should be sufficient
}
@@ -385,27 +376,36 @@ BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
//
// Message class method handlers
//
-void
-BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
+void BrokerAdapter::ChannelHandlerImpl::ping()
{
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
client.pong();
}
void
-BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::pong()
{
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
-void
-BrokerAdapter::ChannelHandlerImpl::resume(
- const MethodContext&,
- const string& /*channel*/ )
+void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/)
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
+void BrokerAdapter::setResponseTo(RequestId r)
+{
+ basicHandler.client.setResponseTo(r);
+ channelHandler.client.setResponseTo(r);
+ exchangeHandler.client.setResponseTo(r);
+ bindingHandler.client.setResponseTo(r);
+ messageHandler.client.setResponseTo(r);
+ queueHandler.client.setResponseTo(r);
+ txHandler.client.setResponseTo(r);
+ dtxHandler.setResponseTo(r);
+}
+
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 795744aa9a..41013e8bf6 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -85,6 +85,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
}
framing::AMQP_ClientProxy& getProxy() { return proxy; }
+ void setResponseTo(framing::RequestId r);
private:
@@ -95,16 +96,16 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void open(const framing::MethodContext& context, const std::string& outOfBand);
- void flow(const framing::MethodContext& context, bool active);
- void flowOk(const framing::MethodContext& context, bool active);
- void ok( const framing::MethodContext& context );
- void ping( const framing::MethodContext& context );
- void pong( const framing::MethodContext& context );
- void resume( const framing::MethodContext& context, const std::string& channelId );
- void close(const framing::MethodContext& context, uint16_t replyCode, const
+ void open(const std::string& outOfBand);
+ void flow(bool active);
+ void flowOk(bool active);
+ void ok( );
+ void ping( );
+ void pong( );
+ void resume( const std::string& channelId );
+ void close(uint16_t replyCode, const
std::string& replyText, uint16_t classId, uint16_t methodId);
- void closeOk(const framing::MethodContext& context);
+ void closeOk();
};
class ExchangeHandlerImpl :
@@ -114,16 +115,14 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void declare(const framing::MethodContext& context, uint16_t ticket,
+ void declare(uint16_t ticket,
const std::string& exchange, const std::string& type,
bool passive, bool durable, bool autoDelete,
bool internal, bool nowait,
const qpid::framing::FieldTable& arguments);
- void delete_(const framing::MethodContext& context, uint16_t ticket,
+ void delete_(uint16_t ticket,
const std::string& exchange, bool ifUnused, bool nowait);
- void query(const framing::MethodContext& context,
- u_int16_t ticket,
- const string& name);
+ void query(u_int16_t ticket, const string& name);
};
class BindingHandlerImpl :
@@ -133,8 +132,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void query(const framing::MethodContext& context,
- u_int16_t ticket,
+ void query(u_int16_t ticket,
const std::string& exchange,
const std::string& queue,
const std::string& routingKey,
@@ -148,22 +146,21 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void declare(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ void declare(uint16_t ticket, const std::string& queue,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait,
const qpid::framing::FieldTable& arguments);
- void bind(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ void bind(uint16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(const framing::MethodContext& context,
- uint16_t ticket,
+ void unbind(uint16_t ticket,
const std::string& queue,
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- void purge(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ void purge(uint16_t ticket, const std::string& queue,
bool nowait);
- void delete_(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ void delete_(uint16_t ticket, const std::string& queue,
bool ifUnused, bool ifEmpty,
bool nowait);
};
@@ -177,23 +174,23 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {}
- void qos(const framing::MethodContext& context, uint32_t prefetchSize,
+ void qos(uint32_t prefetchSize,
uint16_t prefetchCount, bool global);
void consume(
- const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ uint16_t ticket, const std::string& queue,
const std::string& consumerTag, bool noLocal, bool noAck,
bool exclusive, bool nowait,
const qpid::framing::FieldTable& fields);
- void cancel(const framing::MethodContext& context, const std::string& consumerTag,
+ void cancel(const std::string& consumerTag,
bool nowait);
- void publish(const framing::MethodContext& context, uint16_t ticket,
+ void publish(uint16_t ticket,
const std::string& exchange, const std::string& routingKey,
bool mandatory, bool immediate);
- void get(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ void get(uint16_t ticket, const std::string& queue,
bool noAck);
- void ack(const framing::MethodContext& context, uint64_t deliveryTag, bool multiple);
- void reject(const framing::MethodContext& context, uint64_t deliveryTag, bool requeue);
- void recover(const framing::MethodContext& context, bool requeue);
+ void ack(uint64_t deliveryTag, bool multiple);
+ void reject(uint64_t deliveryTag, bool requeue);
+ void recover(bool requeue);
};
class TxHandlerImpl :
@@ -203,9 +200,9 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void select(const framing::MethodContext& context);
- void commit(const framing::MethodContext& context);
- void rollback(const framing::MethodContext& context);
+ void select();
+ void commit();
+ void rollback();
};
Connection& connection;
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index 83b6f2bf18..21b56869d4 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -29,6 +29,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/BasicPublishBody.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -44,10 +45,10 @@ using namespace qpid::sys;
BasicMessage::BasicMessage(
const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
+ bool _mandatory, bool _immediate
) :
Message(_publisher, _exchange, _routingKey, _mandatory,
- _immediate, respondTo),
+ _immediate, framing::AMQMethodBody::shared_ptr(new BasicPublishBody(ProtocolVersion(0,9)))),
size(0)
{}
@@ -84,19 +85,20 @@ void BasicMessage::deliver(ChannelAdapter& channel,
sendContent(channel, framesize);
}
-void BasicMessage::sendGetOk(const MethodContext& context,
- const std::string& /*destination*/,
+void BasicMessage::sendGetOk(ChannelAdapter& channel,
+ const std::string& /*destination*/,
uint32_t messageCount,
+ uint64_t responseTo,
uint64_t deliveryTag,
uint32_t framesize)
{
- context.channel->send(
+ channel.send(
new BasicGetOkBody(
- context.channel->getVersion(),
- context.methodBody->getRequestId(),
+ channel.getVersion(),
+ responseTo,
deliveryTag, getRedelivered(), getExchange(),
getRoutingKey(), messageCount));
- sendContent(*context.channel, framesize);
+ sendContent(channel, framesize);
}
void BasicMessage::sendContent(
diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h
index aee5092dab..2e031d0bb2 100644
--- a/cpp/src/qpid/broker/BrokerMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessage.h
@@ -63,8 +63,7 @@ class BasicMessage : public Message {
BasicMessage(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
- bool mandatory, bool immediate,
- boost::shared_ptr<framing::AMQMethodBody> respondTo);
+ bool mandatory, bool immediate);
BasicMessage();
~BasicMessage();
void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
@@ -76,9 +75,10 @@ class BasicMessage : public Message {
uint64_t deliveryTag,
uint32_t framesize);
- void sendGetOk(const framing::MethodContext&,
- const std::string& destination,
+ void sendGetOk(framing::ChannelAdapter& channel,
+ const std::string& destination,
uint32_t messageCount,
+ uint64_t responseTo,
uint64_t deliveryTag,
uint32_t framesize);
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index 048b1c80e2..a6b039cd4d 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -85,7 +85,6 @@ class Message : public PersistableMessage{
const std::string& getExchange() const { return exchange; }
uint64_t getPersistenceId() const { return persistenceId; }
bool getRedelivered() const { return redelivered; }
- AMQMethodBodyPtr getRespondTo() const { return respondTo; }
void setRouting(const std::string& _exchange, const std::string& _routingKey)
{ exchange = _exchange; routingKey = _routingKey; }
@@ -102,9 +101,10 @@ class Message : public PersistableMessage{
/**
* Used to return a message in response to a get from a queue
*/
- virtual void sendGetOk(const framing::MethodContext& context,
+ virtual void sendGetOk(framing::ChannelAdapter& channel,
const std::string& destination,
uint32_t messageCount,
+ uint64_t responseTo,
uint64_t deliveryTag,
uint32_t framesize) = 0;
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 17ccdc03b8..b23ebaf50b 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -185,14 +185,14 @@ void MessageMessage::deliver(
}
void MessageMessage::sendGetOk(
- const framing::MethodContext& context,
+ framing::ChannelAdapter& channel,
const std::string& destination,
uint32_t /*messageCount*/,
+ uint64_t /*responseTo*/,
uint64_t /*deliveryTag*/,
uint32_t framesize)
{
- framing::ChannelAdapter* channel = context.channel;
- transferMessage(*channel, destination, framesize);
+ transferMessage(channel, destination, framesize);
}
bool MessageMessage::isComplete()
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h
index e64e08fcef..c2d4b7f20b 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.h
@@ -58,9 +58,10 @@ class MessageMessage: public Message{
uint64_t deliveryTag,
uint32_t framesize);
- void sendGetOk(const framing::MethodContext& context,
- const std::string& destination,
+ void sendGetOk(framing::ChannelAdapter& channel,
+ const std::string& destination,
uint32_t messageCount,
+ uint64_t responseTo,
uint64_t deliveryTag,
uint32_t framesize);
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index 8a4450c881..bb2a66bfdb 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -50,10 +50,14 @@ void ConnectionAdapter::handleMethodInContext(
)
{
try{
+ handler->client.setResponseTo(context.getRequestId());
method->invoke(*this, context);
+ handler->client.setResponseTo(0);
}catch(ConnectionException& e){
+ handler->client.setResponseTo(0);
handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
+ handler->client.setResponseTo(0);
handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
}
@@ -82,43 +86,38 @@ Handler::Handler(Connection& c, ConnectionAdapter& a) :
proxy(a), client(proxy.getConnection()), connection(c) {}
-void Handler::startOk(
- const MethodContext&, const FieldTable& /*clientProperties*/,
+void Handler::startOk(const FieldTable& /*clientProperties*/,
const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/)
{
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
}
-void Handler::secureOk(
- const MethodContext&, const string& /*response*/){}
+void Handler::secureOk(const string& /*response*/){}
-void Handler::tuneOk(
- const MethodContext&, uint16_t /*channelmax*/,
+void Handler::tuneOk(uint16_t /*channelmax*/,
uint32_t framemax, uint16_t heartbeat)
{
connection.setFrameMax(framemax);
connection.setHeartbeat(heartbeat);
}
-void Handler::open(
- const MethodContext& context, const string& /*virtualHost*/,
+void Handler::open(const string& /*virtualHost*/,
const string& /*capabilities*/, bool /*insist*/)
{
string knownhosts;
client.openOk(
- knownhosts, context.getRequestId());
+ knownhosts);//GRS, context.getRequestId());
}
-void Handler::close(
- const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/,
+void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
- client.closeOk(context.getRequestId());
+ client.closeOk();//GRS context.getRequestId());
connection.getOutput().close();
}
-void Handler::closeOk(const MethodContext&){
+void Handler::closeOk(){
connection.getOutput().close();
}
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 383fbf84c0..2e27abd333 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -81,22 +81,16 @@ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
Connection& connection;
Handler(Connection& connection, ConnectionAdapter& adapter);
- void startOk(const framing::MethodContext& context,
- const qpid::framing::FieldTable& clientProperties,
+ void startOk(const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
const std::string& locale);
- void secureOk(const framing::MethodContext& context,
- const std::string& response);
- void tuneOk(const framing::MethodContext& context,
- uint16_t channelMax,
- uint32_t frameMax, uint16_t heartbeat);
- void open(const framing::MethodContext& context,
- const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(const framing::MethodContext& context, uint16_t replyCode,
- const std::string& replyText,
+ void secureOk(const std::string& response);
+ void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
+ void open(const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(uint16_t replyCode, const std::string& replyText,
uint16_t classId, uint16_t methodId);
- void closeOk(const framing::MethodContext& context);
+ void closeOk();
};
}}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index aeb1c3014c..2aa53e66bd 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -49,14 +49,13 @@ const int XA_OK(8);
// DtxDemarcationHandler:
-void DtxHandlerImpl::select(const MethodContext& context )
+void DtxHandlerImpl::select()
{
channel.selectDtx();
- dClient.selectOk(context.getRequestId());
+ dClient.selectOk();//GRS context.getRequestId());
}
-void DtxHandlerImpl::end(const MethodContext& context,
- u_int16_t /*ticket*/,
+void DtxHandlerImpl::end(u_int16_t /*ticket*/,
const string& xid,
bool fail,
bool suspend)
@@ -67,7 +66,7 @@ void DtxHandlerImpl::end(const MethodContext& context,
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
- dClient.endOk(XA_RBROLLBACK, context.getRequestId());
+ dClient.endOk(XA_RBROLLBACK);//GRS, context.getRequestId());
}
} else {
if (suspend) {
@@ -75,15 +74,14 @@ void DtxHandlerImpl::end(const MethodContext& context,
} else {
channel.endDtx(xid, false);
}
- dClient.endOk(XA_OK, context.getRequestId());
+ dClient.endOk(XA_OK);//GRS, context.getRequestId());
}
} catch (const DtxTimeoutException& e) {
- dClient.endOk(XA_RBTIMEOUT, context.getRequestId());
+ dClient.endOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
}
}
-void DtxHandlerImpl::start(const MethodContext& context,
- u_int16_t /*ticket*/,
+void DtxHandlerImpl::start(u_int16_t /*ticket*/,
const string& xid,
bool join,
bool resume)
@@ -97,54 +95,50 @@ void DtxHandlerImpl::start(const MethodContext& context,
} else {
channel.startDtx(xid, broker.getDtxManager(), join);
}
- dClient.startOk(XA_OK, context.getRequestId());
+ dClient.startOk(XA_OK);//GRS, context.getRequestId());
} catch (const DtxTimeoutException& e) {
- dClient.startOk(XA_RBTIMEOUT, context.getRequestId());
+ dClient.startOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
}
}
// DtxCoordinationHandler:
-void DtxHandlerImpl::prepare(const MethodContext& context,
- u_int16_t /*ticket*/,
+void DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
const string& xid)
{
try {
bool ok = broker.getDtxManager().prepare(xid);
- cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId());
} catch (const DtxTimeoutException& e) {
- cClient.prepareOk(XA_RBTIMEOUT, context.getRequestId());
+ cClient.prepareOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
}
}
-void DtxHandlerImpl::commit(const MethodContext& context,
- u_int16_t /*ticket*/,
+void DtxHandlerImpl::commit(u_int16_t /*ticket*/,
const string& xid,
bool onePhase)
{
try {
bool ok = broker.getDtxManager().commit(xid, onePhase);
- cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId());
} catch (const DtxTimeoutException& e) {
- cClient.commitOk(XA_RBTIMEOUT, context.getRequestId());
+ cClient.commitOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
}
}
-void DtxHandlerImpl::rollback(const MethodContext& context,
- u_int16_t /*ticket*/,
+void DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
const string& xid )
{
try {
broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(XA_OK, context.getRequestId());
+ cClient.rollbackOk(XA_OK);//GRS, context.getRequestId());
} catch (const DtxTimeoutException& e) {
- cClient.rollbackOk(XA_RBTIMEOUT, context.getRequestId());
+ cClient.rollbackOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
}
}
-void DtxHandlerImpl::recover(const MethodContext& context,
- u_int16_t /*ticket*/,
+void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
bool /*startscan*/,
u_int32_t /*endscan*/ )
{
@@ -177,33 +171,34 @@ void DtxHandlerImpl::recover(const MethodContext& context,
FieldTable response;
response.setString("xids", data);
- cClient.recoverOk(response, context.getRequestId());
+ cClient.recoverOk(response);//GRS, context.getRequestId());
}
-void DtxHandlerImpl::forget(const MethodContext& /*context*/,
- u_int16_t /*ticket*/,
+void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
const string& xid)
{
//Currently no heuristic completion is supported, so this should never be used.
throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
}
-void DtxHandlerImpl::getTimeout(const MethodContext& context,
- const string& xid)
+void DtxHandlerImpl::getTimeout(const string& xid)
{
uint32_t timeout = broker.getDtxManager().getTimeout(xid);
- cClient.getTimeoutOk(timeout, context.getRequestId());
+ cClient.getTimeoutOk(timeout);//GRS, context.getRequestId());
}
-void DtxHandlerImpl::setTimeout(const MethodContext& context,
- u_int16_t /*ticket*/,
+void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/,
const string& xid,
u_int32_t timeout)
{
broker.getDtxManager().setTimeout(xid, timeout);
- cClient.setTimeoutOk(context.getRequestId());
+ cClient.setTimeoutOk();//GRS context.getRequestId());
}
-
+void DtxHandlerImpl::setResponseTo(framing::RequestId r)
+{
+ dClient.setResponseTo(r);
+ cClient.setResponseTo(r);
+}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
index eda9e83a91..e18d3c153d 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -36,53 +36,31 @@ class DtxHandlerImpl
public:
DtxHandlerImpl(CoreRefs& parent);
+ void setResponseTo(framing::RequestId r);
+
// DtxCoordinationHandler:
- void commit(const framing::MethodContext& context,
- u_int16_t ticket,
- const std::string& xid,
- bool onePhase );
+ void commit(u_int16_t ticket, const std::string& xid, bool onePhase);
- void forget(const framing::MethodContext& context,
- u_int16_t ticket,
- const std::string& xid );
+ void forget(u_int16_t ticket, const std::string& xid);
- void getTimeout(const framing::MethodContext& context,
- const std::string& xid );
+ void getTimeout(const std::string& xid);
- void prepare(const framing::MethodContext& context,
- u_int16_t ticket,
- const std::string& xid );
+ void prepare(u_int16_t ticket, const std::string& xid);
- void recover(const framing::MethodContext& context,
- u_int16_t ticket,
- bool startscan,
- u_int32_t endscan );
+ void recover(u_int16_t ticket, bool startscan, u_int32_t endscan);
- void rollback(const framing::MethodContext& context,
- u_int16_t ticket,
- const std::string& xid );
+ void rollback(u_int16_t ticket, const std::string& xid);
- void setTimeout(const framing::MethodContext& context,
- u_int16_t ticket,
- const std::string& xid,
- u_int32_t timeout );
+ void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout);
// DtxDemarcationHandler:
- void end(const framing::MethodContext& context,
- u_int16_t ticket,
- const std::string& xid,
- bool fail,
- bool suspend );
+ void end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
- void select(const framing::MethodContext& context );
+ void select();
- void start(const framing::MethodContext& context,
- u_int16_t ticket,
- const std::string& xid,
- bool join,
- bool resume );
+ void start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
};
diff --git a/cpp/src/qpid/broker/GetAdapter.cpp b/cpp/src/qpid/broker/GetAdapter.cpp
index 4a2f6d34d4..bbffade712 100644
--- a/cpp/src/qpid/broker/GetAdapter.cpp
+++ b/cpp/src/qpid/broker/GetAdapter.cpp
@@ -36,6 +36,5 @@ RequestId GetAdapter::getNextDeliveryTag()
void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag)
{
- msg->sendGetOk(MethodContext(&adapter, msg->getRespondTo()), destination,
- queue->getMessageCount(), deliveryTag, framesize);
+ msg->sendGetOk(adapter, destination, queue->getMessageCount(), 1, deliveryTag, framesize);
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 252b465cc5..c9fbc2b95d 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -42,72 +42,64 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
//
void
-MessageHandlerImpl::cancel(const MethodContext& context,
- const string& destination )
+MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::open(const MethodContext& context,
- const string& reference)
+MessageHandlerImpl::open(const string& reference)
{
references.open(reference);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::append(const MethodContext& context,
- const string& reference,
- const string& /*bytes*/ )
+MessageHandlerImpl::append(const framing::MethodContext& context)
{
- references.get(reference)->append(
- boost::shared_polymorphic_downcast<MessageAppendBody>(
- context.methodBody));
- client.ok(context.getRequestId());
+ MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
+ references.get(body->getReference())->append(body);
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::close(const MethodContext& context,
- const string& reference)
+MessageHandlerImpl::close(const string& reference)
{
Reference::shared_ptr ref = references.get(reference);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
// Send any transfer messages to their correct exchanges and okay them
const Reference::Messages& msgs = ref->getMessages();
for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) {
channel.handleInlineTransfer(*m);
- client.ok((*m)->getRequestId());
+ client.setResponseTo((*m)->getRequestId());
+ client.ok();
}
ref->close();
}
void
-MessageHandlerImpl::checkpoint(const MethodContext& context,
- const string& /*reference*/,
+MessageHandlerImpl::checkpoint(const string& /*reference*/,
const string& /*identifier*/ )
{
// Initial implementation (which is conforming) is to do nothing here
// and return offset zero for the resume
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::resume(const MethodContext& context,
- const string& reference,
+MessageHandlerImpl::resume(const string& reference,
const string& /*identifier*/ )
{
// Initial (null) implementation
// open reference and return 0 offset
references.open(reference);
- client.offset(0, context.getRequestId());
+ client.offset(0);//GRS, );//GRS, context.getRequestId());
}
void
-MessageHandlerImpl::offset(const MethodContext&,
- uint64_t /*value*/ )
+MessageHandlerImpl::offset(uint64_t /*value*/ )
{
// Shouldn't ever receive this as it is reponse to resume
// which is never sent
@@ -116,8 +108,7 @@ MessageHandlerImpl::offset(const MethodContext&,
}
void
-MessageHandlerImpl::consume(const MethodContext& context,
- uint16_t /*ticket*/,
+MessageHandlerImpl::consume(uint16_t /*ticket*/,
const string& queueName,
const string& destination,
bool noLocal,
@@ -132,14 +123,13 @@ MessageHandlerImpl::consume(const MethodContext& context,
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
void
-MessageHandlerImpl::get( const MethodContext& context,
- uint16_t /*ticket*/,
+MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& queueName,
const string& destination,
bool noAck )
@@ -148,13 +138,13 @@ MessageHandlerImpl::get( const MethodContext& context,
GetAdapter out(adapter, queue, destination, connection.getFrameMax());
if(channel.get(out, queue, !noAck))
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
else
- client.empty(context.getRequestId());
+ client.empty();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::empty( const MethodContext& )
+MessageHandlerImpl::empty()
{
// Shouldn't ever receive this as it is a response to get
// which is never sent
@@ -163,34 +153,31 @@ MessageHandlerImpl::empty( const MethodContext& )
}
void
-MessageHandlerImpl::ok(const MethodContext& /*context*/)
+MessageHandlerImpl::ok()
{
channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
}
void
-MessageHandlerImpl::qos(const MethodContext& context,
- uint32_t prefetchSize,
+MessageHandlerImpl::qos(uint32_t prefetchSize,
uint16_t prefetchCount,
bool /*global*/ )
{
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::recover(const MethodContext& context,
- bool requeue)
+MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::reject(const MethodContext& /*context*/,
- uint16_t /*code*/,
+MessageHandlerImpl::reject(uint16_t /*code*/,
const string& /*text*/ )
{
//channel.ack();
@@ -198,45 +185,20 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/,
}
void
-MessageHandlerImpl::transfer(const MethodContext& context,
- uint16_t /*ticket*/,
- const string& /* destination */,
- bool /*redelivered*/,
- bool /*immediate*/,
- uint64_t /*ttl*/,
- uint8_t /*priority*/,
- uint64_t /*timestamp*/,
- uint8_t /*deliveryMode*/,
- uint64_t /*expiration*/,
- const string& /*exchangeName*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const framing::FieldTable& /*applicationHeaders*/,
- const framing::Content& body,
- bool /*mandatory*/)
+MessageHandlerImpl::transfer(const framing::MethodContext& context)
{
MessageTransferBody::shared_ptr transfer(
boost::shared_polymorphic_downcast<MessageTransferBody>(
context.methodBody));
RequestId requestId = context.getRequestId();
- if (body.isInline()) {
- MessageMessage::shared_ptr message(
- new MessageMessage(&connection, requestId, transfer));
+ if (transfer->getBody().isInline()) {
+ MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer));
channel.handleInlineTransfer(message);
- client.ok(requestId);
+ client.ok();
} else {
- Reference::shared_ptr ref(references.get(body.getValue()));
- MessageMessage::shared_ptr message(
- new MessageMessage(&connection, requestId, transfer, ref));
+ Reference::shared_ptr ref(references.get(transfer->getBody().getValue()));
+ MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref));
ref->addMessage(message);
}
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
index 47c8cf0ce0..86c54023b5 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.h
@@ -40,22 +40,16 @@ class MessageHandlerImpl :
public:
MessageHandlerImpl(CoreRefs& parent);
- void append(const framing::MethodContext&,
- const std::string& reference,
- const std::string& bytes );
+ void append(const framing::MethodContext& context);
- void cancel(const framing::MethodContext&,
- const std::string& destination );
+ void cancel(const std::string& destination );
- void checkpoint(const framing::MethodContext&,
- const std::string& reference,
+ void checkpoint(const std::string& reference,
const std::string& identifier );
- void close(const framing::MethodContext&,
- const std::string& reference );
+ void close(const std::string& reference );
- void consume(const framing::MethodContext&,
- uint16_t ticket,
+ void consume(uint16_t ticket,
const std::string& queue,
const std::string& destination,
bool noLocal,
@@ -63,62 +57,32 @@ class MessageHandlerImpl :
bool exclusive,
const framing::FieldTable& filter );
- void empty( const framing::MethodContext& );
+ void empty();
- void get(const framing::MethodContext&,
- uint16_t ticket,
+ void get(uint16_t ticket,
const std::string& queue,
const std::string& destination,
bool noAck );
- void offset(const framing::MethodContext&,
- uint64_t value );
+ void offset(uint64_t value);
- void ok( const framing::MethodContext& );
+ void ok();
- void open(const framing::MethodContext&,
- const std::string& reference );
+ void open(const std::string& reference );
- void qos(const framing::MethodContext&,
- uint32_t prefetchSize,
+ void qos(uint32_t prefetchSize,
uint16_t prefetchCount,
bool global );
- void recover(const framing::MethodContext&,
- bool requeue );
+ void recover(bool requeue );
- void reject(const framing::MethodContext&,
- uint16_t code,
+ void reject(uint16_t code,
const std::string& text );
- void resume(const framing::MethodContext&,
- const std::string& reference,
+ void resume(const std::string& reference,
const std::string& identifier );
- void transfer(const framing::MethodContext&,
- uint16_t ticket,
- const std::string& destination,
- bool redelivered,
- bool immediate,
- uint64_t ttl,
- uint8_t priority,
- uint64_t timestamp,
- uint8_t deliveryMode,
- uint64_t expiration,
- const std::string& exchange,
- const std::string& routingKey,
- const std::string& messageId,
- const std::string& correlationId,
- const std::string& replyTo,
- const std::string& contentType,
- const std::string& contentEncoding,
- const std::string& userId,
- const std::string& appId,
- const std::string& transactionId,
- const std::string& securityToken,
- const framing::FieldTable& applicationHeaders,
- const framing::Content& body,
- bool mandatory );
+ void transfer(const framing::MethodContext& context);
private:
ReferenceRegistry references;
};
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index df92f74b14..a96a8c5cde 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -37,7 +37,29 @@ SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
void SemanticHandler::handle(framing::AMQFrame& frame)
{
+ //TODO: assembly etc when move to 0-10 framing
+ //
+ //have potentially three separate tracks at this point:
+ //
+ // (1) execution controls
+ // (2) commands
+ // (3) data i.e. content-bearing commands
+ //
+ //framesets on each can be interleaved. framesets on the latter
+ //two share a command-id sequence.
+ //
+ //need to decide what to do if a frame on the command track
+ //arrives while a frameset on the data track is still
+ //open. execute it (i.e. out-of order execution with respect to
+ //the command id sequence) or queue it up.
+
+ //if ready to execute (i.e. if segment is complete or frame is
+ //message content):
handleBody(frame.getBody());
+ //if the frameset is complete, we can move the execution-mark
+ //forward (not for execution controls)
+ //note: need to be more sophisticated than this if we execute
+ //commands that arrive within an active message frameset
}
//ChannelAdapter virtual methods:
@@ -52,9 +74,12 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ
throw ConnectionException(504, out.str());
}
} else {
+ adapter->setResponseTo(context.getRequestId());
method->invoke(*adapter, context);
+ adapter->setResponseTo(0);
}
}catch(ChannelException& e){
+ adapter->setResponseTo(0);
adapter->getProxy().getChannel().close(
e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index a179969ece..eecf61466b 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -26,6 +26,7 @@
#include "Connection.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SequenceNumber.h"
namespace qpid {
namespace broker {
@@ -37,6 +38,7 @@ class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHa
Connection& connection;
Channel channel;
std::auto_ptr<BrokerAdapter> adapter;
+ framing::SequenceNumber executionMark;
//ChannelAdapter virtual methods:
void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
diff --git a/cpp/src/qpid/framing/FramingContent.h b/cpp/src/qpid/framing/FramingContent.h
index 876e90c905..c813408cf3 100644
--- a/cpp/src/qpid/framing/FramingContent.h
+++ b/cpp/src/qpid/framing/FramingContent.h
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
#ifndef _framing_FramingContent_h
#define _framing_FramingContent_h
diff --git a/cpp/src/qpid/framing/MethodContext.cpp b/cpp/src/qpid/framing/MethodContext.cpp
index 73af73f8e5..9dc42dcfd7 100644
--- a/cpp/src/qpid/framing/MethodContext.cpp
+++ b/cpp/src/qpid/framing/MethodContext.cpp
@@ -24,8 +24,11 @@ namespace qpid {
namespace framing {
RequestId MethodContext::getRequestId() const {
- return boost::shared_polymorphic_downcast<AMQRequestBody>(methodBody)
- ->getRequestId();
+ if (methodBody->type() == REQUEST_BODY) {
+ return boost::shared_polymorphic_cast<AMQRequestBody>(methodBody)->getRequestId();
+ } else {
+ return 0;
+ }
}
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Proxy.h b/cpp/src/qpid/framing/Proxy.h
index 8ed46ed748..4af496387b 100644
--- a/cpp/src/qpid/framing/Proxy.h
+++ b/cpp/src/qpid/framing/Proxy.h
@@ -20,6 +20,7 @@
*/
#include "ProtocolVersion.h"
+#include "amqp_types.h"
namespace qpid {
namespace framing {
diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp
new file mode 100644
index 0000000000..9bba67d4ae
--- /dev/null
+++ b/cpp/src/qpid/framing/SequenceNumber.cpp
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SequenceNumber.h"
+
+using qpid::framing::SequenceNumber;
+
+SequenceNumber::SequenceNumber() : value(0) {}
+
+SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}
+
+bool SequenceNumber::operator==(const SequenceNumber& other) const
+{
+ return value == other.value;
+}
+
+bool SequenceNumber::operator!=(const SequenceNumber& other) const
+{
+ return !(value == other.value);
+}
+
+
+SequenceNumber& SequenceNumber::operator++()
+{
+ value = value + 1;
+ return *this;
+}
+
+const SequenceNumber SequenceNumber::operator++(int)
+{
+ SequenceNumber old(value);
+ value = value + 1;
+ return old;
+}
+
+bool SequenceNumber::operator<(const SequenceNumber& other) const
+{
+ return (value - other.value) < 0;
+}
+
+bool SequenceNumber::operator>(const SequenceNumber& other) const
+{
+ return other < *this;
+}
diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h
new file mode 100644
index 0000000000..6d38084a25
--- /dev/null
+++ b/cpp/src/qpid/framing/SequenceNumber.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _framing_SequenceNumber_h
+#define _framing_SequenceNumber_h
+
+#include "amqp_types.h"
+
+namespace qpid {
+namespace framing {
+
+/**
+ * 4-byte sequence number that 'wraps around'.
+ */
+class SequenceNumber
+{
+ int32_t value;
+ public:
+ SequenceNumber();
+ SequenceNumber(uint32_t v);
+
+ SequenceNumber& operator++();//prefix ++
+ const SequenceNumber operator++(int);//postfix ++
+ bool operator==(const SequenceNumber& other) const;
+ bool operator!=(const SequenceNumber& other) const;
+ bool operator<(const SequenceNumber& other) const;
+ bool operator>(const SequenceNumber& other) const;
+ uint32_t getValue() const { return (uint32_t) value; }
+};
+
+}} // namespace qpid::framing
+
+
+#endif
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index acbe867cfa..05bdb7b3f0 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -225,9 +225,7 @@ class BrokerChannelTest : public CppUnit::TestCase
Channel channel(connection, 1, &store);
const string data[] = {"abcde", "fghij", "klmno"};
- Message* msg = new BasicMessage(
- 0, "my_exchange", "my_routing_key", false, false,
- MockChannel::basicGetBody());
+ Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
store.expect();
store.stage(*msg);
@@ -347,8 +345,7 @@ class BrokerChannelTest : public CppUnit::TestCase
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
{
BasicMessage* msg = new BasicMessage(
- 0, exchange, routingKey, false, false,
- MockChannel::basicGetBody());
+ 0, exchange, routingKey, false, false);
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(contentSize);
msg->setHeader(header);
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 595b025e86..ef2646519d 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -63,11 +63,7 @@ class ExchangeTest : public CppUnit::TestCase
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(
- new BasicMessage(
- 0, "e", "A", true, true,
- AMQMethodBody::shared_ptr(
- new BasicGetBody(ProtocolVersion()))));
+ Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 69d1d8def1..255f6348b9 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -84,7 +84,8 @@ broker_unit_tests = \
framing_unit_tests = \
FieldTableTest \
FramingTest \
- HeaderTest
+ HeaderTest \
+ SequenceNumberTest
misc_unit_tests = \
ProducerConsumerTest
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index 526db85c31..3d2ee1aaea 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -118,8 +118,7 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(
- 0, "test", "my_routing_key", false, false,
- MockChannel::basicGetBody()));
+ 0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(0);
@@ -137,8 +136,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false,
- MockChannel::basicGetBody()));
+ new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(7);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -160,8 +158,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data2("hijklmn");
Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false,
- MockChannel::basicGetBody()));
+ new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -191,8 +188,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data2("hijklmn");
Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false,
- MockChannel::basicGetBody()));
+ new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index d3869b552e..1e976312be 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -46,8 +46,7 @@ class MessageTest : public CppUnit::TestCase
string data2("hijklmn");
BasicMessage::shared_ptr msg(
- new BasicMessage(0, exchange, routingKey, false, false,
- MockChannel::basicGetBody()));
+ new BasicMessage(0, exchange, routingKey, false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index ddf2314f8d..eb9d567b82 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -69,8 +69,7 @@ class QueueTest : public CppUnit::TestCase
public:
Message::shared_ptr message(std::string exchange, std::string routingKey) {
return Message::shared_ptr(
- new BasicMessage(0, exchange, routingKey, true, true,
- MockChannel::basicGetBody()));
+ new BasicMessage(0, exchange, routingKey, true, true));
}
void testConsumers(){
diff --git a/cpp/src/tests/SequenceNumberTest.cpp b/cpp/src/tests/SequenceNumberTest.cpp
new file mode 100644
index 0000000000..f42ccfc061
--- /dev/null
+++ b/cpp/src/tests/SequenceNumberTest.cpp
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid_test_plugin.h"
+#include <iostream>
+#include "qpid/framing/SequenceNumber.h"
+
+using namespace qpid::framing;
+
+class SequenceNumberTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(SequenceNumberTest);
+ CPPUNIT_TEST(testIncrementPostfix);
+ CPPUNIT_TEST(testIncrementPrefix);
+ CPPUNIT_TEST(testWrapAround);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void testIncrementPostfix()
+ {
+ SequenceNumber a;
+ SequenceNumber b;
+ CPPUNIT_ASSERT(!(a > b));
+ CPPUNIT_ASSERT(!(b < a));
+ CPPUNIT_ASSERT(a == b);
+
+ SequenceNumber c = a++;
+ CPPUNIT_ASSERT(a > b);
+ CPPUNIT_ASSERT(b < a);
+ CPPUNIT_ASSERT(a != b);
+ CPPUNIT_ASSERT(c < a);
+ CPPUNIT_ASSERT(a != c);
+
+ b++;
+ CPPUNIT_ASSERT(!(a > b));
+ CPPUNIT_ASSERT(!(b < a));
+ CPPUNIT_ASSERT(a == b);
+ CPPUNIT_ASSERT(c < b);
+ CPPUNIT_ASSERT(b != c);
+ }
+
+ void testIncrementPrefix()
+ {
+ SequenceNumber a;
+ SequenceNumber b;
+ CPPUNIT_ASSERT(!(a > b));
+ CPPUNIT_ASSERT(!(b < a));
+ CPPUNIT_ASSERT(a == b);
+
+ SequenceNumber c = ++a;
+ CPPUNIT_ASSERT(a > b);
+ CPPUNIT_ASSERT(b < a);
+ CPPUNIT_ASSERT(a != b);
+ CPPUNIT_ASSERT(a == c);
+
+ ++b;
+ CPPUNIT_ASSERT(!(a > b));
+ CPPUNIT_ASSERT(!(b < a));
+ CPPUNIT_ASSERT(a == b);
+ }
+
+ void testWrapAround()
+ {
+ const uint32_t max = 0xFFFFFFFF;
+ SequenceNumber a(max - 10);
+ SequenceNumber b(max - 5);
+
+ //increment until b wraps around
+ for (int i = 0; i < 6; i++) {
+ CPPUNIT_ASSERT(++a < ++b);//test prefix
+ }
+ //verify we have wrapped around
+ CPPUNIT_ASSERT(a.getValue() > b.getValue());
+ //keep incrementing until a also wraps around
+ for (int i = 0; i < 6; i++) {
+ CPPUNIT_ASSERT(a++ < b++);//test postfix
+ }
+ //let a 'catch up'
+ for (int i = 0; i < 5; i++) {
+ a++;
+ }
+ CPPUNIT_ASSERT(a == b);
+ CPPUNIT_ASSERT(++a > b);
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(SequenceNumberTest);
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index 3b9b1db308..df9fa89501 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -69,8 +69,7 @@ public:
{
for(int i = 0; i < 10; i++){
Message::shared_ptr msg(
- new BasicMessage(0, "exchange", "routing_key", false, false,
- MockChannel::basicGetBody()));
+ new BasicMessage(0, "exchange", "routing_key", false, false));
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index 8c3c7afe31..05c03754f9 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -69,8 +69,7 @@ public:
TxPublishTest() :
queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(new BasicMessage(0, "exchange", "routing_key", false, false,
- MockChannel::basicGetBody())),
+ msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
op(msg)
{
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));