diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-19 08:27:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-19 08:27:36 +0000 |
commit | b87a1e9d27755e2f98792567c29a0625b92c8654 (patch) | |
tree | cb1232987efbfa1cc0ef8ec5e62b07b6b6c918b6 | |
parent | dfe8a370b6580446cf970e27562ad0385178922f (diff) | |
download | qpid-python-b87a1e9d27755e2f98792567c29a0625b92c8654.tar.gz |
removed the need to pass MethodContext/RequestId through proxy and handler/adapter interfaces
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557522 13f79535-47bb-0310-9956-ffa450edef68
35 files changed, 570 insertions, 383 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java b/cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java index 2e8bdaf971..1bc9dc456a 100644 --- a/cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java +++ b/cpp/gentools/src/org/apache/qpid/gentools/AmqpClass.java @@ -93,6 +93,9 @@ public class AmqpClass implements Printable, NodeAware thisMethod = new AmqpMethod(methodName, converter); methodMap.put(methodName, thisMethod); } + int content = Utils.getNamedIntegerAttribute(child, Utils.ATTRIBUTE_CONTENT, 0); + thisMethod.content = (content == 1); + if (!thisMethod.addFromNode(child, 0, version)) { String className = converter.prepareClassName(Utils.getNamedAttribute(classNode, diff --git a/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java b/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java index f3c5bdd935..e78eec112f 100644 --- a/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java +++ b/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java @@ -36,6 +36,7 @@ public class AmqpMethod implements Printable, NodeAware, VersionConsistencyCheck public AmqpFlagMap clientMethodFlagMap; // Method called on client (<chassis name="server"> in XML) public AmqpFlagMap serverMethodFlagMap; // Method called on server (<chassis name="client"> in XML) public AmqpFlagMap isResponseFlagMap; + public boolean content; public AmqpMethod(String name, LanguageConverter converter) { diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java index 26b189a1ff..edcf9e09e3 100644 --- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java +++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java @@ -763,14 +763,23 @@ public class CppGenerator extends Generator AmqpVersionSet versionSet = overloadededParameterMap.get(thisFieldMap); if (!first) sb.append(cr); sb.append(indent + "virtual "+returnType+" "+ methodName + "("); - if (abstractMethodFlag) sb.append("const MethodContext& context"); - boolean leadingComma = abstractMethodFlag; - int paramIndent = indentSize + (5*tabSize); - sb.append(generateMethodParameterList(thisFieldMap, paramIndent, leadingComma, true, true)); + + if (abstractMethodFlag && isSpecialCase(thisClass.name, method.name)) { + sb.append("const MethodContext& context"); + } else { + sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), false, true, true)); + } + + //if (abstractMethodFlag) sb.append("const MethodContext& context"); + //boolean leadingComma = abstractMethodFlag; + //int paramIndent = indentSize + (5*tabSize); + // sb.append(generateMethodParameterList(thisFieldMap, paramIndent, leadingComma, true, true)); + /* if (!abstractMethodFlag && method.isResponse(null)) { if (!thisFieldMap.isEmpty()) sb.append(", \n"+Utils.createSpaces(paramIndent)); sb.append(" RequestId responseTo"); } + */ sb.append(" )"); if (abstractMethodFlag) sb.append(" = 0"); @@ -888,6 +897,7 @@ public class CppGenerator extends Generator sb.append(indent + "{" + cr); sb.append(indent + "private:" + cr); sb.append(indent + tab + "ChannelAdapter& channel;" + cr); + sb.append(indent + tab + "RequestId responseTo;" + cr); sb.append(cr); sb.append(indent + "public:" + cr); sb.append(indent + tab + "// Constructors and destructors" + cr); @@ -897,6 +907,9 @@ public class CppGenerator extends Generator sb.append(indent + tab + "virtual ~" + className + "() {}" + cr); sb.append(cr); sb.append(indent + tab + "static "+className+"& get(" + proxyOuterClassName(serverFlag)+"& proxy) { return proxy.get"+className+"();}\n\n"); + sb.append(indent + tab + "// set for response correlation" + cr); + sb.append(indent + tab + "void setResponseTo(RequestId r) { responseTo = r; }" + cr); + sb.append(cr); sb.append(indent + tab + "// Protocol methods" + cr); sb.append(cr); sb.append(generateInnerClassMethods(thisClass, serverFlag, false, indentSize + tabSize, tabSize)); @@ -1004,8 +1017,10 @@ public class CppGenerator extends Generator methodName + "("); sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), false, true, true)); if (method.isResponse(null)) { + /* if (!thisFieldMap.isEmpty()) sb.append(", "); - sb.append("RequestId responseTo"); + sb.append("RequestId responseTo"); + */ } sb.append(")"); if (versionSet.size() != globalVersionSet.size()) @@ -1457,6 +1472,7 @@ public class CppGenerator extends Generator String indent = Utils.createSpaces(indentSize); String tab = Utils.createSpaces(tabSize); StringBuffer sb = new StringBuffer(); + boolean special = isSpecialCase(thisClass.name, method.name); if (method.serverMethodFlagMap.size() > 0) // At least one AMQP version defines this method as a server method { @@ -1466,14 +1482,18 @@ public class CppGenerator extends Generator if (bItr.next()) // This is a server operation { boolean fieldMapNotEmptyFlag = method.fieldMap.size() > 0; - sb.append(indent + "void invoke(AMQP_ServerOperations& target, const MethodContext& context)" + cr); + sb.append(indent + "void invoke(AMQP_ServerOperations& target, const MethodContext&" + + (special ? " context)" : " )") + cr); sb.append(indent + "{" + cr); sb.append(indent + tab + "target.get" + thisClass.name + "Handler()->" + parseForReservedWords(Utils.firstLower(method.name), - thisClass.name + Utils.firstUpper(method.name) + "Body.invoke()") + "(context"); - if (fieldMapNotEmptyFlag) + thisClass.name + Utils.firstUpper(method.name) + "Body.invoke()") + "("); + if (special) + { + sb.append("context"); + } + else if (fieldMapNotEmptyFlag) { - sb.append("," + cr); sb.append(generateFieldList(method.fieldMap, version, false, false, indentSize + 4*tabSize)); sb.append(indent + tab + tab + tab + tab); } @@ -1632,4 +1652,9 @@ public class CppGenerator extends Generator } return ccn.toString(); } + + private boolean isSpecialCase(String className, String methodName) + { + return "message".equalsIgnoreCase(className) && ("transfer".equalsIgnoreCase(methodName) || "append".equalsIgnoreCase(methodName)); + } } diff --git a/cpp/gentools/src/org/apache/qpid/gentools/Utils.java b/cpp/gentools/src/org/apache/qpid/gentools/Utils.java index ade00cd8c7..e1c4661acf 100644 --- a/cpp/gentools/src/org/apache/qpid/gentools/Utils.java +++ b/cpp/gentools/src/org/apache/qpid/gentools/Utils.java @@ -70,6 +70,13 @@ public class Utils { return Integer.parseInt(getNamedAttribute(n, attrName)); } + + public static int getNamedIntegerAttribute(Node node, String name, int defaultValue) throws AmqpParseException + { + NamedNodeMap attributes = node.getAttributes(); + Attr a = attributes == null ? null : (Attr) attributes.getNamedItem(name); + return a == null ? defaultValue : Integer.parseInt(a.getNodeValue()); + } // Element functions diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index e215272bec..0231a58217 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -151,6 +151,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/ProtocolVersionException.cpp \ qpid/framing/Requester.cpp \ qpid/framing/Responder.cpp \ + qpid/framing/SequenceNumber.cpp \ qpid/framing/Correlator.cpp \ qpid/framing/Value.cpp \ qpid/framing/Proxy.cpp \ @@ -360,6 +361,7 @@ nobase_include_HEADERS = \ qpid/framing/Requester.h \ qpid/framing/Responder.h \ qpid/framing/SerializeHandler.h \ + qpid/framing/SequenceNumber.h \ qpid/framing/Value.h \ qpid/framing/Uuid.h \ qpid/framing/amqp_framing.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 2a0aa9ffee..be43dacb27 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -53,36 +53,33 @@ ProtocolVersion BrokerAdapter::getVersion() const { return connection.getVersion(); } -void BrokerAdapter::ChannelHandlerImpl::open( - const MethodContext& context, const string& /*outOfBand*/){ +void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk( - std::string()/* ID */, context.getRequestId()); + client.openOk(std::string()/* ID */);//GRS, context.getRequestId()); } -void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext& context, bool active){ +void BrokerAdapter::ChannelHandlerImpl::flow(bool active){ channel.flow(active); - client.flowOk(active, context.getRequestId()); + client.flowOk(active);//GRS, context.getRequestId()); } -void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){} -void BrokerAdapter::ChannelHandlerImpl::close( - const MethodContext& context, uint16_t /*replyCode*/, +void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { - client.closeOk(context.getRequestId()); + client.closeOk();//GRS context.getRequestId()); // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. connection.closeChannel(channel.getId()); } -void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} +void BrokerAdapter::ChannelHandlerImpl::closeOk(){} -void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type, +void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait, const FieldTable& args){ @@ -107,31 +104,30 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u } } if(!nowait){ - client.declareOk(context.getRequestId()); + client.declareOk();//GRS context.getRequestId()); } } -void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, +void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/, bool nowait){ //TODO: implement unused Exchange::shared_ptr exchange(broker.getExchanges().get(name)); if (exchange->isDurable()) broker.getStore().destroy(*exchange); broker.getExchanges().destroy(name); - if(!nowait) client.deleteOk(context.getRequestId()); + if(!nowait) client.deleteOk();//GRS context.getRequestId()); } -void BrokerAdapter::ExchangeHandlerImpl::query(const MethodContext& context, u_int16_t /*ticket*/, const string& name) +void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { Exchange::shared_ptr exchange(broker.getExchanges().get(name)); - client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs(), context.getRequestId()); + client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());//GRS, context.getRequestId()); } catch (const ChannelException& e) { - client.queryOk("", false, true, FieldTable(), context.getRequestId()); + client.queryOk("", false, true, FieldTable());//GRS, context.getRequestId()); } } -void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& context, - u_int16_t /*ticket*/, +void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, const std::string& exchangeName, const std::string& queueName, const std::string& key, @@ -148,22 +144,22 @@ void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& cont } if (!exchange) { - client.queryOk(true, false, false, false, false, context.getRequestId()); + client.queryOk(true, false, false, false, false);//GRS, context.getRequestId()); } else if (!queueName.empty() && !queue) { - client.queryOk(false, true, false, false, false, context.getRequestId()); + client.queryOk(false, true, false, false, false);//GRS, context.getRequestId()); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - client.queryOk(false, false, false, false, false, context.getRequestId()); + client.queryOk(false, false, false, false, false);//GRS, context.getRequestId()); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched, context.getRequestId()); + client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);//GRS, context.getRequestId()); } } -void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name, +void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; @@ -200,12 +196,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint if (!nowait) { string queueName = queue->getName(); client.declareOk( - queueName, queue->getMessageCount(), queue->getConsumerCount(), - context.getRequestId()); + queueName, queue->getMessageCount(), queue->getConsumerCount());//GRS, context.getRequestId()); } } -void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, +void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ @@ -219,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_ broker.getStore().bind(*exchange, *queue, routingKey, arguments); } } - if(!nowait) client.bindOk(context.getRequestId()); + if(!nowait) client.bindOk();//GRS context.getRequestId()); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -227,9 +222,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_ } void -BrokerAdapter::QueueHandlerImpl::unbind( - const MethodContext& context, - uint16_t /*ticket*/, +BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, @@ -245,17 +238,17 @@ BrokerAdapter::QueueHandlerImpl::unbind( broker.getStore().unbind(*exchange, *queue, routingKey, arguments); } - client.unbindOk(context.getRequestId()); + client.unbindOk();//GRS context.getRequestId()); } -void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = getQueue(queueName); int count = queue->purge(); - if(!nowait) client.purgeOk( count, context.getRequestId()); + if(!nowait) client.purgeOk( count);//GRS, context.getRequestId()); } -void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue, +void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); @@ -277,21 +270,20 @@ void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint } if(!nowait) - client.deleteOk(count, context.getRequestId()); + client.deleteOk(count);//GRS, context.getRequestId()); } -void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.qosOk(context.getRequestId()); + client.qosOk();//GRS context.getRequestId()); } -void BrokerAdapter::BasicHandlerImpl::consume( - const MethodContext& context, uint16_t /*ticket*/, +void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const FieldTable& fields) @@ -308,29 +300,26 @@ void BrokerAdapter::BasicHandlerImpl::consume( channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())), newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) client.consumeOk(newTag, context.getRequestId()); + if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId()); //allow messages to be dispatched if required as there is now a consumer: queue->requestDispatch(); } -void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ +void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) client.cancelOk(consumerTag, context.getRequestId()); + if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId()); } -void BrokerAdapter::BasicHandlerImpl::publish( - const MethodContext& context, uint16_t /*ticket*/, +void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate) { Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - BasicMessage* msg = new BasicMessage( - &connection, exchangeName, routingKey, mandatory, immediate, - context.methodBody); + BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); channel.handlePublish(msg); }else{ throw ChannelException( @@ -338,45 +327,47 @@ void BrokerAdapter::BasicHandlerImpl::publish( } } -void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = getQueue(queueName); GetAdapter out(adapter, queue, "", connection.getFrameMax()); if(!channel.get(out, queue, !noAck)){ string clusterId;//not used, part of an imatix hack - client.getEmpty(clusterId, context.getRequestId()); + client.getEmpty(clusterId);//GRS, context.getRequestId()); } } -void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){ +void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ channel.ack(deliveryTag, multiple); } -void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ +void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) +{ channel.recover(requeue); } -void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::select() +{ channel.startTx(); - client.selectOk(context.getRequestId()); + client.selectOk();//GRS context.getRequestId()); } -void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::commit() +{ channel.commit(); - client.commitOk(context.getRequestId()); + client.commitOk();//GRS context.getRequestId()); } -void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ - +void BrokerAdapter::TxHandlerImpl::rollback() +{ channel.rollback(); - client.rollbackOk(context.getRequestId()); + client.rollbackOk();//GRS context.getRequestId()); channel.recover(false); } -void -BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) +void BrokerAdapter::ChannelHandlerImpl::ok() { //no specific action required, generic response handling should be sufficient } @@ -385,27 +376,36 @@ BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) // // Message class method handlers // -void -BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) +void BrokerAdapter::ChannelHandlerImpl::ping() { - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); client.pong(); } void -BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::pong() { - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } -void -BrokerAdapter::ChannelHandlerImpl::resume( - const MethodContext&, - const string& /*channel*/ ) +void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } +void BrokerAdapter::setResponseTo(RequestId r) +{ + basicHandler.client.setResponseTo(r); + channelHandler.client.setResponseTo(r); + exchangeHandler.client.setResponseTo(r); + bindingHandler.client.setResponseTo(r); + messageHandler.client.setResponseTo(r); + queueHandler.client.setResponseTo(r); + txHandler.client.setResponseTo(r); + dtxHandler.setResponseTo(r); +} + + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 795744aa9a..41013e8bf6 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -85,6 +85,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations } framing::AMQP_ClientProxy& getProxy() { return proxy; } + void setResponseTo(framing::RequestId r); private: @@ -95,16 +96,16 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - void open(const framing::MethodContext& context, const std::string& outOfBand); - void flow(const framing::MethodContext& context, bool active); - void flowOk(const framing::MethodContext& context, bool active); - void ok( const framing::MethodContext& context ); - void ping( const framing::MethodContext& context ); - void pong( const framing::MethodContext& context ); - void resume( const framing::MethodContext& context, const std::string& channelId ); - void close(const framing::MethodContext& context, uint16_t replyCode, const + void open(const std::string& outOfBand); + void flow(bool active); + void flowOk(bool active); + void ok( ); + void ping( ); + void pong( ); + void resume( const std::string& channelId ); + void close(uint16_t replyCode, const std::string& replyText, uint16_t classId, uint16_t methodId); - void closeOk(const framing::MethodContext& context); + void closeOk(); }; class ExchangeHandlerImpl : @@ -114,16 +115,14 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - void declare(const framing::MethodContext& context, uint16_t ticket, + void declare(uint16_t ticket, const std::string& exchange, const std::string& type, bool passive, bool durable, bool autoDelete, bool internal, bool nowait, const qpid::framing::FieldTable& arguments); - void delete_(const framing::MethodContext& context, uint16_t ticket, + void delete_(uint16_t ticket, const std::string& exchange, bool ifUnused, bool nowait); - void query(const framing::MethodContext& context, - u_int16_t ticket, - const string& name); + void query(u_int16_t ticket, const string& name); }; class BindingHandlerImpl : @@ -133,8 +132,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - void query(const framing::MethodContext& context, - u_int16_t ticket, + void query(u_int16_t ticket, const std::string& exchange, const std::string& queue, const std::string& routingKey, @@ -148,22 +146,21 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - void declare(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, + void declare(uint16_t ticket, const std::string& queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); - void bind(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, + void bind(uint16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, bool nowait, const qpid::framing::FieldTable& arguments); - void unbind(const framing::MethodContext& context, - uint16_t ticket, + void unbind(uint16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); - void purge(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, + void purge(uint16_t ticket, const std::string& queue, bool nowait); - void delete_(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, + void delete_(uint16_t ticket, const std::string& queue, bool ifUnused, bool ifEmpty, bool nowait); }; @@ -177,23 +174,23 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {} - void qos(const framing::MethodContext& context, uint32_t prefetchSize, + void qos(uint32_t prefetchSize, uint16_t prefetchCount, bool global); void consume( - const framing::MethodContext& context, uint16_t ticket, const std::string& queue, + uint16_t ticket, const std::string& queue, const std::string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const qpid::framing::FieldTable& fields); - void cancel(const framing::MethodContext& context, const std::string& consumerTag, + void cancel(const std::string& consumerTag, bool nowait); - void publish(const framing::MethodContext& context, uint16_t ticket, + void publish(uint16_t ticket, const std::string& exchange, const std::string& routingKey, bool mandatory, bool immediate); - void get(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, + void get(uint16_t ticket, const std::string& queue, bool noAck); - void ack(const framing::MethodContext& context, uint64_t deliveryTag, bool multiple); - void reject(const framing::MethodContext& context, uint64_t deliveryTag, bool requeue); - void recover(const framing::MethodContext& context, bool requeue); + void ack(uint64_t deliveryTag, bool multiple); + void reject(uint64_t deliveryTag, bool requeue); + void recover(bool requeue); }; class TxHandlerImpl : @@ -203,9 +200,9 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - void select(const framing::MethodContext& context); - void commit(const framing::MethodContext& context); - void rollback(const framing::MethodContext& context); + void select(); + void commit(); + void rollback(); }; Connection& connection; diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp index 83b6f2bf18..21b56869d4 100644 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessage.cpp @@ -29,6 +29,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/BasicGetOkBody.h" +#include "qpid/framing/BasicPublishBody.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQMethodBody.h" @@ -44,10 +45,10 @@ using namespace qpid::sys; BasicMessage::BasicMessage( const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo + bool _mandatory, bool _immediate ) : Message(_publisher, _exchange, _routingKey, _mandatory, - _immediate, respondTo), + _immediate, framing::AMQMethodBody::shared_ptr(new BasicPublishBody(ProtocolVersion(0,9)))), size(0) {} @@ -84,19 +85,20 @@ void BasicMessage::deliver(ChannelAdapter& channel, sendContent(channel, framesize); } -void BasicMessage::sendGetOk(const MethodContext& context, - const std::string& /*destination*/, +void BasicMessage::sendGetOk(ChannelAdapter& channel, + const std::string& /*destination*/, uint32_t messageCount, + uint64_t responseTo, uint64_t deliveryTag, uint32_t framesize) { - context.channel->send( + channel.send( new BasicGetOkBody( - context.channel->getVersion(), - context.methodBody->getRequestId(), + channel.getVersion(), + responseTo, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount)); - sendContent(*context.channel, framesize); + sendContent(channel, framesize); } void BasicMessage::sendContent( diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h index aee5092dab..2e031d0bb2 100644 --- a/cpp/src/qpid/broker/BrokerMessage.h +++ b/cpp/src/qpid/broker/BrokerMessage.h @@ -63,8 +63,7 @@ class BasicMessage : public Message { BasicMessage(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, - bool mandatory, bool immediate, - boost::shared_ptr<framing::AMQMethodBody> respondTo); + bool mandatory, bool immediate); BasicMessage(); ~BasicMessage(); void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header); @@ -76,9 +75,10 @@ class BasicMessage : public Message { uint64_t deliveryTag, uint32_t framesize); - void sendGetOk(const framing::MethodContext&, - const std::string& destination, + void sendGetOk(framing::ChannelAdapter& channel, + const std::string& destination, uint32_t messageCount, + uint64_t responseTo, uint64_t deliveryTag, uint32_t framesize); diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h index 048b1c80e2..a6b039cd4d 100644 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/cpp/src/qpid/broker/BrokerMessageBase.h @@ -85,7 +85,6 @@ class Message : public PersistableMessage{ const std::string& getExchange() const { return exchange; } uint64_t getPersistenceId() const { return persistenceId; } bool getRedelivered() const { return redelivered; } - AMQMethodBodyPtr getRespondTo() const { return respondTo; } void setRouting(const std::string& _exchange, const std::string& _routingKey) { exchange = _exchange; routingKey = _routingKey; } @@ -102,9 +101,10 @@ class Message : public PersistableMessage{ /** * Used to return a message in response to a get from a queue */ - virtual void sendGetOk(const framing::MethodContext& context, + virtual void sendGetOk(framing::ChannelAdapter& channel, const std::string& destination, uint32_t messageCount, + uint64_t responseTo, uint64_t deliveryTag, uint32_t framesize) = 0; diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index 17ccdc03b8..b23ebaf50b 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -185,14 +185,14 @@ void MessageMessage::deliver( } void MessageMessage::sendGetOk( - const framing::MethodContext& context, + framing::ChannelAdapter& channel, const std::string& destination, uint32_t /*messageCount*/, + uint64_t /*responseTo*/, uint64_t /*deliveryTag*/, uint32_t framesize) { - framing::ChannelAdapter* channel = context.channel; - transferMessage(*channel, destination, framesize); + transferMessage(channel, destination, framesize); } bool MessageMessage::isComplete() diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h index e64e08fcef..c2d4b7f20b 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.h +++ b/cpp/src/qpid/broker/BrokerMessageMessage.h @@ -58,9 +58,10 @@ class MessageMessage: public Message{ uint64_t deliveryTag, uint32_t framesize); - void sendGetOk(const framing::MethodContext& context, - const std::string& destination, + void sendGetOk(framing::ChannelAdapter& channel, + const std::string& destination, uint32_t messageCount, + uint64_t responseTo, uint64_t deliveryTag, uint32_t framesize); diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp index 8a4450c881..bb2a66bfdb 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.cpp +++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp @@ -50,10 +50,14 @@ void ConnectionAdapter::handleMethodInContext( ) { try{ + handler->client.setResponseTo(context.getRequestId()); method->invoke(*this, context); + handler->client.setResponseTo(0); }catch(ConnectionException& e){ + handler->client.setResponseTo(0); handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ + handler->client.setResponseTo(0); handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } @@ -82,43 +86,38 @@ Handler::Handler(Connection& c, ConnectionAdapter& a) : proxy(a), client(proxy.getConnection()), connection(c) {} -void Handler::startOk( - const MethodContext&, const FieldTable& /*clientProperties*/, +void Handler::startOk(const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/) { client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); } -void Handler::secureOk( - const MethodContext&, const string& /*response*/){} +void Handler::secureOk(const string& /*response*/){} -void Handler::tuneOk( - const MethodContext&, uint16_t /*channelmax*/, +void Handler::tuneOk(uint16_t /*channelmax*/, uint32_t framemax, uint16_t heartbeat) { connection.setFrameMax(framemax); connection.setHeartbeat(heartbeat); } -void Handler::open( - const MethodContext& context, const string& /*virtualHost*/, +void Handler::open(const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/) { string knownhosts; client.openOk( - knownhosts, context.getRequestId()); + knownhosts);//GRS, context.getRequestId()); } -void Handler::close( - const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/, +void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { - client.closeOk(context.getRequestId()); + client.closeOk();//GRS context.getRequestId()); connection.getOutput().close(); } -void Handler::closeOk(const MethodContext&){ +void Handler::closeOk(){ connection.getOutput().close(); } diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h index 383fbf84c0..2e27abd333 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -81,22 +81,16 @@ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler Connection& connection; Handler(Connection& connection, ConnectionAdapter& adapter); - void startOk(const framing::MethodContext& context, - const qpid::framing::FieldTable& clientProperties, + void startOk(const qpid::framing::FieldTable& clientProperties, const std::string& mechanism, const std::string& response, const std::string& locale); - void secureOk(const framing::MethodContext& context, - const std::string& response); - void tuneOk(const framing::MethodContext& context, - uint16_t channelMax, - uint32_t frameMax, uint16_t heartbeat); - void open(const framing::MethodContext& context, - const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(const framing::MethodContext& context, uint16_t replyCode, - const std::string& replyText, + void secureOk(const std::string& response); + void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); + void open(const std::string& virtualHost, + const std::string& capabilities, bool insist); + void close(uint16_t replyCode, const std::string& replyText, uint16_t classId, uint16_t methodId); - void closeOk(const framing::MethodContext& context); + void closeOk(); }; }} diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index aeb1c3014c..2aa53e66bd 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -49,14 +49,13 @@ const int XA_OK(8); // DtxDemarcationHandler: -void DtxHandlerImpl::select(const MethodContext& context ) +void DtxHandlerImpl::select() { channel.selectDtx(); - dClient.selectOk(context.getRequestId()); + dClient.selectOk();//GRS context.getRequestId()); } -void DtxHandlerImpl::end(const MethodContext& context, - u_int16_t /*ticket*/, +void DtxHandlerImpl::end(u_int16_t /*ticket*/, const string& xid, bool fail, bool suspend) @@ -67,7 +66,7 @@ void DtxHandlerImpl::end(const MethodContext& context, if (suspend) { throw ConnectionException(503, "End and suspend cannot both be set."); } else { - dClient.endOk(XA_RBROLLBACK, context.getRequestId()); + dClient.endOk(XA_RBROLLBACK);//GRS, context.getRequestId()); } } else { if (suspend) { @@ -75,15 +74,14 @@ void DtxHandlerImpl::end(const MethodContext& context, } else { channel.endDtx(xid, false); } - dClient.endOk(XA_OK, context.getRequestId()); + dClient.endOk(XA_OK);//GRS, context.getRequestId()); } } catch (const DtxTimeoutException& e) { - dClient.endOk(XA_RBTIMEOUT, context.getRequestId()); + dClient.endOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); } } -void DtxHandlerImpl::start(const MethodContext& context, - u_int16_t /*ticket*/, +void DtxHandlerImpl::start(u_int16_t /*ticket*/, const string& xid, bool join, bool resume) @@ -97,54 +95,50 @@ void DtxHandlerImpl::start(const MethodContext& context, } else { channel.startDtx(xid, broker.getDtxManager(), join); } - dClient.startOk(XA_OK, context.getRequestId()); + dClient.startOk(XA_OK);//GRS, context.getRequestId()); } catch (const DtxTimeoutException& e) { - dClient.startOk(XA_RBTIMEOUT, context.getRequestId()); + dClient.startOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); } } // DtxCoordinationHandler: -void DtxHandlerImpl::prepare(const MethodContext& context, - u_int16_t /*ticket*/, +void DtxHandlerImpl::prepare(u_int16_t /*ticket*/, const string& xid) { try { bool ok = broker.getDtxManager().prepare(xid); - cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId()); } catch (const DtxTimeoutException& e) { - cClient.prepareOk(XA_RBTIMEOUT, context.getRequestId()); + cClient.prepareOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); } } -void DtxHandlerImpl::commit(const MethodContext& context, - u_int16_t /*ticket*/, +void DtxHandlerImpl::commit(u_int16_t /*ticket*/, const string& xid, bool onePhase) { try { bool ok = broker.getDtxManager().commit(xid, onePhase); - cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId()); } catch (const DtxTimeoutException& e) { - cClient.commitOk(XA_RBTIMEOUT, context.getRequestId()); + cClient.commitOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); } } -void DtxHandlerImpl::rollback(const MethodContext& context, - u_int16_t /*ticket*/, +void DtxHandlerImpl::rollback(u_int16_t /*ticket*/, const string& xid ) { try { broker.getDtxManager().rollback(xid); - cClient.rollbackOk(XA_OK, context.getRequestId()); + cClient.rollbackOk(XA_OK);//GRS, context.getRequestId()); } catch (const DtxTimeoutException& e) { - cClient.rollbackOk(XA_RBTIMEOUT, context.getRequestId()); + cClient.rollbackOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); } } -void DtxHandlerImpl::recover(const MethodContext& context, - u_int16_t /*ticket*/, +void DtxHandlerImpl::recover(u_int16_t /*ticket*/, bool /*startscan*/, u_int32_t /*endscan*/ ) { @@ -177,33 +171,34 @@ void DtxHandlerImpl::recover(const MethodContext& context, FieldTable response; response.setString("xids", data); - cClient.recoverOk(response, context.getRequestId()); + cClient.recoverOk(response);//GRS, context.getRequestId()); } -void DtxHandlerImpl::forget(const MethodContext& /*context*/, - u_int16_t /*ticket*/, +void DtxHandlerImpl::forget(u_int16_t /*ticket*/, const string& xid) { //Currently no heuristic completion is supported, so this should never be used. throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); } -void DtxHandlerImpl::getTimeout(const MethodContext& context, - const string& xid) +void DtxHandlerImpl::getTimeout(const string& xid) { uint32_t timeout = broker.getDtxManager().getTimeout(xid); - cClient.getTimeoutOk(timeout, context.getRequestId()); + cClient.getTimeoutOk(timeout);//GRS, context.getRequestId()); } -void DtxHandlerImpl::setTimeout(const MethodContext& context, - u_int16_t /*ticket*/, +void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/, const string& xid, u_int32_t timeout) { broker.getDtxManager().setTimeout(xid, timeout); - cClient.setTimeoutOk(context.getRequestId()); + cClient.setTimeoutOk();//GRS context.getRequestId()); } - +void DtxHandlerImpl::setResponseTo(framing::RequestId r) +{ + dClient.setResponseTo(r); + cClient.setResponseTo(r); +} diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index eda9e83a91..e18d3c153d 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -36,53 +36,31 @@ class DtxHandlerImpl public: DtxHandlerImpl(CoreRefs& parent); + void setResponseTo(framing::RequestId r); + // DtxCoordinationHandler: - void commit(const framing::MethodContext& context, - u_int16_t ticket, - const std::string& xid, - bool onePhase ); + void commit(u_int16_t ticket, const std::string& xid, bool onePhase); - void forget(const framing::MethodContext& context, - u_int16_t ticket, - const std::string& xid ); + void forget(u_int16_t ticket, const std::string& xid); - void getTimeout(const framing::MethodContext& context, - const std::string& xid ); + void getTimeout(const std::string& xid); - void prepare(const framing::MethodContext& context, - u_int16_t ticket, - const std::string& xid ); + void prepare(u_int16_t ticket, const std::string& xid); - void recover(const framing::MethodContext& context, - u_int16_t ticket, - bool startscan, - u_int32_t endscan ); + void recover(u_int16_t ticket, bool startscan, u_int32_t endscan); - void rollback(const framing::MethodContext& context, - u_int16_t ticket, - const std::string& xid ); + void rollback(u_int16_t ticket, const std::string& xid); - void setTimeout(const framing::MethodContext& context, - u_int16_t ticket, - const std::string& xid, - u_int32_t timeout ); + void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout); // DtxDemarcationHandler: - void end(const framing::MethodContext& context, - u_int16_t ticket, - const std::string& xid, - bool fail, - bool suspend ); + void end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend); - void select(const framing::MethodContext& context ); + void select(); - void start(const framing::MethodContext& context, - u_int16_t ticket, - const std::string& xid, - bool join, - bool resume ); + void start(u_int16_t ticket, const std::string& xid, bool join, bool resume); }; diff --git a/cpp/src/qpid/broker/GetAdapter.cpp b/cpp/src/qpid/broker/GetAdapter.cpp index 4a2f6d34d4..bbffade712 100644 --- a/cpp/src/qpid/broker/GetAdapter.cpp +++ b/cpp/src/qpid/broker/GetAdapter.cpp @@ -36,6 +36,5 @@ RequestId GetAdapter::getNextDeliveryTag() void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag) { - msg->sendGetOk(MethodContext(&adapter, msg->getRespondTo()), destination, - queue->getMessageCount(), deliveryTag, framesize); + msg->sendGetOk(adapter, destination, queue->getMessageCount(), 1, deliveryTag, framesize); } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 252b465cc5..c9fbc2b95d 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -42,72 +42,64 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) // void -MessageHandlerImpl::cancel(const MethodContext& context, - const string& destination ) +MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::open(const MethodContext& context, - const string& reference) +MessageHandlerImpl::open(const string& reference) { references.open(reference); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::append(const MethodContext& context, - const string& reference, - const string& /*bytes*/ ) +MessageHandlerImpl::append(const framing::MethodContext& context) { - references.get(reference)->append( - boost::shared_polymorphic_downcast<MessageAppendBody>( - context.methodBody)); - client.ok(context.getRequestId()); + MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); + references.get(body->getReference())->append(body); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::close(const MethodContext& context, - const string& reference) +MessageHandlerImpl::close(const string& reference) { Reference::shared_ptr ref = references.get(reference); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); // Send any transfer messages to their correct exchanges and okay them const Reference::Messages& msgs = ref->getMessages(); for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) { channel.handleInlineTransfer(*m); - client.ok((*m)->getRequestId()); + client.setResponseTo((*m)->getRequestId()); + client.ok(); } ref->close(); } void -MessageHandlerImpl::checkpoint(const MethodContext& context, - const string& /*reference*/, +MessageHandlerImpl::checkpoint(const string& /*reference*/, const string& /*identifier*/ ) { // Initial implementation (which is conforming) is to do nothing here // and return offset zero for the resume - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::resume(const MethodContext& context, - const string& reference, +MessageHandlerImpl::resume(const string& reference, const string& /*identifier*/ ) { // Initial (null) implementation // open reference and return 0 offset references.open(reference); - client.offset(0, context.getRequestId()); + client.offset(0);//GRS, );//GRS, context.getRequestId()); } void -MessageHandlerImpl::offset(const MethodContext&, - uint64_t /*value*/ ) +MessageHandlerImpl::offset(uint64_t /*value*/ ) { // Shouldn't ever receive this as it is reponse to resume // which is never sent @@ -116,8 +108,7 @@ MessageHandlerImpl::offset(const MethodContext&, } void -MessageHandlerImpl::consume(const MethodContext& context, - uint16_t /*ticket*/, +MessageHandlerImpl::consume(uint16_t /*ticket*/, const string& queueName, const string& destination, bool noLocal, @@ -132,14 +123,13 @@ MessageHandlerImpl::consume(const MethodContext& context, channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } void -MessageHandlerImpl::get( const MethodContext& context, - uint16_t /*ticket*/, +MessageHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, const string& destination, bool noAck ) @@ -148,13 +138,13 @@ MessageHandlerImpl::get( const MethodContext& context, GetAdapter out(adapter, queue, destination, connection.getFrameMax()); if(channel.get(out, queue, !noAck)) - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); else - client.empty(context.getRequestId()); + client.empty();//GRS context.getRequestId()); } void -MessageHandlerImpl::empty( const MethodContext& ) +MessageHandlerImpl::empty() { // Shouldn't ever receive this as it is a response to get // which is never sent @@ -163,34 +153,31 @@ MessageHandlerImpl::empty( const MethodContext& ) } void -MessageHandlerImpl::ok(const MethodContext& /*context*/) +MessageHandlerImpl::ok() { channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest()); } void -MessageHandlerImpl::qos(const MethodContext& context, - uint32_t prefetchSize, +MessageHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/ ) { //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::recover(const MethodContext& context, - bool requeue) +MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::reject(const MethodContext& /*context*/, - uint16_t /*code*/, +MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ ) { //channel.ack(); @@ -198,45 +185,20 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/, } void -MessageHandlerImpl::transfer(const MethodContext& context, - uint16_t /*ticket*/, - const string& /* destination */, - bool /*redelivered*/, - bool /*immediate*/, - uint64_t /*ttl*/, - uint8_t /*priority*/, - uint64_t /*timestamp*/, - uint8_t /*deliveryMode*/, - uint64_t /*expiration*/, - const string& /*exchangeName*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const framing::FieldTable& /*applicationHeaders*/, - const framing::Content& body, - bool /*mandatory*/) +MessageHandlerImpl::transfer(const framing::MethodContext& context) { MessageTransferBody::shared_ptr transfer( boost::shared_polymorphic_downcast<MessageTransferBody>( context.methodBody)); RequestId requestId = context.getRequestId(); - if (body.isInline()) { - MessageMessage::shared_ptr message( - new MessageMessage(&connection, requestId, transfer)); + if (transfer->getBody().isInline()) { + MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - client.ok(requestId); + client.ok(); } else { - Reference::shared_ptr ref(references.get(body.getValue())); - MessageMessage::shared_ptr message( - new MessageMessage(&connection, requestId, transfer, ref)); + Reference::shared_ptr ref(references.get(transfer->getBody().getValue())); + MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref)); ref->addMessage(message); } } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h index 47c8cf0ce0..86c54023b5 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -40,22 +40,16 @@ class MessageHandlerImpl : public: MessageHandlerImpl(CoreRefs& parent); - void append(const framing::MethodContext&, - const std::string& reference, - const std::string& bytes ); + void append(const framing::MethodContext& context); - void cancel(const framing::MethodContext&, - const std::string& destination ); + void cancel(const std::string& destination ); - void checkpoint(const framing::MethodContext&, - const std::string& reference, + void checkpoint(const std::string& reference, const std::string& identifier ); - void close(const framing::MethodContext&, - const std::string& reference ); + void close(const std::string& reference ); - void consume(const framing::MethodContext&, - uint16_t ticket, + void consume(uint16_t ticket, const std::string& queue, const std::string& destination, bool noLocal, @@ -63,62 +57,32 @@ class MessageHandlerImpl : bool exclusive, const framing::FieldTable& filter ); - void empty( const framing::MethodContext& ); + void empty(); - void get(const framing::MethodContext&, - uint16_t ticket, + void get(uint16_t ticket, const std::string& queue, const std::string& destination, bool noAck ); - void offset(const framing::MethodContext&, - uint64_t value ); + void offset(uint64_t value); - void ok( const framing::MethodContext& ); + void ok(); - void open(const framing::MethodContext&, - const std::string& reference ); + void open(const std::string& reference ); - void qos(const framing::MethodContext&, - uint32_t prefetchSize, + void qos(uint32_t prefetchSize, uint16_t prefetchCount, bool global ); - void recover(const framing::MethodContext&, - bool requeue ); + void recover(bool requeue ); - void reject(const framing::MethodContext&, - uint16_t code, + void reject(uint16_t code, const std::string& text ); - void resume(const framing::MethodContext&, - const std::string& reference, + void resume(const std::string& reference, const std::string& identifier ); - void transfer(const framing::MethodContext&, - uint16_t ticket, - const std::string& destination, - bool redelivered, - bool immediate, - uint64_t ttl, - uint8_t priority, - uint64_t timestamp, - uint8_t deliveryMode, - uint64_t expiration, - const std::string& exchange, - const std::string& routingKey, - const std::string& messageId, - const std::string& correlationId, - const std::string& replyTo, - const std::string& contentType, - const std::string& contentEncoding, - const std::string& userId, - const std::string& appId, - const std::string& transactionId, - const std::string& securityToken, - const framing::FieldTable& applicationHeaders, - const framing::Content& body, - bool mandatory ); + void transfer(const framing::MethodContext& context); private: ReferenceRegistry references; }; diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index df92f74b14..a96a8c5cde 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -37,7 +37,29 @@ SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : void SemanticHandler::handle(framing::AMQFrame& frame) { + //TODO: assembly etc when move to 0-10 framing + // + //have potentially three separate tracks at this point: + // + // (1) execution controls + // (2) commands + // (3) data i.e. content-bearing commands + // + //framesets on each can be interleaved. framesets on the latter + //two share a command-id sequence. + // + //need to decide what to do if a frame on the command track + //arrives while a frameset on the data track is still + //open. execute it (i.e. out-of order execution with respect to + //the command id sequence) or queue it up. + + //if ready to execute (i.e. if segment is complete or frame is + //message content): handleBody(frame.getBody()); + //if the frameset is complete, we can move the execution-mark + //forward (not for execution controls) + //note: need to be more sophisticated than this if we execute + //commands that arrive within an active message frameset } //ChannelAdapter virtual methods: @@ -52,9 +74,12 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ throw ConnectionException(504, out.str()); } } else { + adapter->setResponseTo(context.getRequestId()); method->invoke(*adapter, context); + adapter->setResponseTo(0); } }catch(ChannelException& e){ + adapter->setResponseTo(0); adapter->getProxy().getChannel().close( e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index a179969ece..eecf61466b 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -26,6 +26,7 @@ #include "Connection.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SequenceNumber.h" namespace qpid { namespace broker { @@ -37,6 +38,7 @@ class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHa Connection& connection; Channel channel; std::auto_ptr<BrokerAdapter> adapter; + framing::SequenceNumber executionMark; //ChannelAdapter virtual methods: void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, diff --git a/cpp/src/qpid/framing/FramingContent.h b/cpp/src/qpid/framing/FramingContent.h index 876e90c905..c813408cf3 100644 --- a/cpp/src/qpid/framing/FramingContent.h +++ b/cpp/src/qpid/framing/FramingContent.h @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ #ifndef _framing_FramingContent_h #define _framing_FramingContent_h diff --git a/cpp/src/qpid/framing/MethodContext.cpp b/cpp/src/qpid/framing/MethodContext.cpp index 73af73f8e5..9dc42dcfd7 100644 --- a/cpp/src/qpid/framing/MethodContext.cpp +++ b/cpp/src/qpid/framing/MethodContext.cpp @@ -24,8 +24,11 @@ namespace qpid { namespace framing { RequestId MethodContext::getRequestId() const { - return boost::shared_polymorphic_downcast<AMQRequestBody>(methodBody) - ->getRequestId(); + if (methodBody->type() == REQUEST_BODY) { + return boost::shared_polymorphic_cast<AMQRequestBody>(methodBody)->getRequestId(); + } else { + return 0; + } } }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Proxy.h b/cpp/src/qpid/framing/Proxy.h index 8ed46ed748..4af496387b 100644 --- a/cpp/src/qpid/framing/Proxy.h +++ b/cpp/src/qpid/framing/Proxy.h @@ -20,6 +20,7 @@ */ #include "ProtocolVersion.h" +#include "amqp_types.h" namespace qpid { namespace framing { diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp new file mode 100644 index 0000000000..9bba67d4ae --- /dev/null +++ b/cpp/src/qpid/framing/SequenceNumber.cpp @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SequenceNumber.h" + +using qpid::framing::SequenceNumber; + +SequenceNumber::SequenceNumber() : value(0) {} + +SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {} + +bool SequenceNumber::operator==(const SequenceNumber& other) const +{ + return value == other.value; +} + +bool SequenceNumber::operator!=(const SequenceNumber& other) const +{ + return !(value == other.value); +} + + +SequenceNumber& SequenceNumber::operator++() +{ + value = value + 1; + return *this; +} + +const SequenceNumber SequenceNumber::operator++(int) +{ + SequenceNumber old(value); + value = value + 1; + return old; +} + +bool SequenceNumber::operator<(const SequenceNumber& other) const +{ + return (value - other.value) < 0; +} + +bool SequenceNumber::operator>(const SequenceNumber& other) const +{ + return other < *this; +} diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h new file mode 100644 index 0000000000..6d38084a25 --- /dev/null +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _framing_SequenceNumber_h +#define _framing_SequenceNumber_h + +#include "amqp_types.h" + +namespace qpid { +namespace framing { + +/** + * 4-byte sequence number that 'wraps around'. + */ +class SequenceNumber +{ + int32_t value; + public: + SequenceNumber(); + SequenceNumber(uint32_t v); + + SequenceNumber& operator++();//prefix ++ + const SequenceNumber operator++(int);//postfix ++ + bool operator==(const SequenceNumber& other) const; + bool operator!=(const SequenceNumber& other) const; + bool operator<(const SequenceNumber& other) const; + bool operator>(const SequenceNumber& other) const; + uint32_t getValue() const { return (uint32_t) value; } +}; + +}} // namespace qpid::framing + + +#endif diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index acbe867cfa..05bdb7b3f0 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -225,9 +225,7 @@ class BrokerChannelTest : public CppUnit::TestCase Channel channel(connection, 1, &store); const string data[] = {"abcde", "fghij", "klmno"}; - Message* msg = new BasicMessage( - 0, "my_exchange", "my_routing_key", false, false, - MockChannel::basicGetBody()); + Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); store.expect(); store.stage(*msg); @@ -347,8 +345,7 @@ class BrokerChannelTest : public CppUnit::TestCase Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) { BasicMessage* msg = new BasicMessage( - 0, exchange, routingKey, false, false, - MockChannel::basicGetBody()); + 0, exchange, routingKey, false, false); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(contentSize); msg->setHeader(header); diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 595b025e86..ef2646519d 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -63,11 +63,7 @@ class ExchangeTest : public CppUnit::TestCase queue.reset(); queue2.reset(); - Message::shared_ptr msgPtr( - new BasicMessage( - 0, "e", "A", true, true, - AMQMethodBody::shared_ptr( - new BasicGetBody(ProtocolVersion())))); + Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true)); DeliverableMessage msg(msgPtr); topic.route(msg, "abc", 0); direct.route(msg, "abc", 0); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 69d1d8def1..255f6348b9 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -84,7 +84,8 @@ broker_unit_tests = \ framing_unit_tests = \ FieldTableTest \ FramingTest \ - HeaderTest + HeaderTest \ + SequenceNumberTest misc_unit_tests = \ ProducerConsumerTest diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index 526db85c31..3d2ee1aaea 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -118,8 +118,7 @@ class MessageBuilderTest : public CppUnit::TestCase Message::shared_ptr message( new BasicMessage( - 0, "test", "my_routing_key", false, false, - MockChannel::basicGetBody())); + 0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(0); @@ -137,8 +136,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false, - MockChannel::basicGetBody())); + new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(7); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -160,8 +158,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data2("hijklmn"); Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false, - MockChannel::basicGetBody())); + new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -191,8 +188,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data2("hijklmn"); Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false, - MockChannel::basicGetBody())); + new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp index d3869b552e..1e976312be 100644 --- a/cpp/src/tests/MessageTest.cpp +++ b/cpp/src/tests/MessageTest.cpp @@ -46,8 +46,7 @@ class MessageTest : public CppUnit::TestCase string data2("hijklmn"); BasicMessage::shared_ptr msg( - new BasicMessage(0, exchange, routingKey, false, false, - MockChannel::basicGetBody())); + new BasicMessage(0, exchange, routingKey, false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index ddf2314f8d..eb9d567b82 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -69,8 +69,7 @@ class QueueTest : public CppUnit::TestCase public: Message::shared_ptr message(std::string exchange, std::string routingKey) { return Message::shared_ptr( - new BasicMessage(0, exchange, routingKey, true, true, - MockChannel::basicGetBody())); + new BasicMessage(0, exchange, routingKey, true, true)); } void testConsumers(){ diff --git a/cpp/src/tests/SequenceNumberTest.cpp b/cpp/src/tests/SequenceNumberTest.cpp new file mode 100644 index 0000000000..f42ccfc061 --- /dev/null +++ b/cpp/src/tests/SequenceNumberTest.cpp @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid_test_plugin.h" +#include <iostream> +#include "qpid/framing/SequenceNumber.h" + +using namespace qpid::framing; + +class SequenceNumberTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(SequenceNumberTest); + CPPUNIT_TEST(testIncrementPostfix); + CPPUNIT_TEST(testIncrementPrefix); + CPPUNIT_TEST(testWrapAround); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testIncrementPostfix() + { + SequenceNumber a; + SequenceNumber b; + CPPUNIT_ASSERT(!(a > b)); + CPPUNIT_ASSERT(!(b < a)); + CPPUNIT_ASSERT(a == b); + + SequenceNumber c = a++; + CPPUNIT_ASSERT(a > b); + CPPUNIT_ASSERT(b < a); + CPPUNIT_ASSERT(a != b); + CPPUNIT_ASSERT(c < a); + CPPUNIT_ASSERT(a != c); + + b++; + CPPUNIT_ASSERT(!(a > b)); + CPPUNIT_ASSERT(!(b < a)); + CPPUNIT_ASSERT(a == b); + CPPUNIT_ASSERT(c < b); + CPPUNIT_ASSERT(b != c); + } + + void testIncrementPrefix() + { + SequenceNumber a; + SequenceNumber b; + CPPUNIT_ASSERT(!(a > b)); + CPPUNIT_ASSERT(!(b < a)); + CPPUNIT_ASSERT(a == b); + + SequenceNumber c = ++a; + CPPUNIT_ASSERT(a > b); + CPPUNIT_ASSERT(b < a); + CPPUNIT_ASSERT(a != b); + CPPUNIT_ASSERT(a == c); + + ++b; + CPPUNIT_ASSERT(!(a > b)); + CPPUNIT_ASSERT(!(b < a)); + CPPUNIT_ASSERT(a == b); + } + + void testWrapAround() + { + const uint32_t max = 0xFFFFFFFF; + SequenceNumber a(max - 10); + SequenceNumber b(max - 5); + + //increment until b wraps around + for (int i = 0; i < 6; i++) { + CPPUNIT_ASSERT(++a < ++b);//test prefix + } + //verify we have wrapped around + CPPUNIT_ASSERT(a.getValue() > b.getValue()); + //keep incrementing until a also wraps around + for (int i = 0; i < 6; i++) { + CPPUNIT_ASSERT(a++ < b++);//test postfix + } + //let a 'catch up' + for (int i = 0; i < 5; i++) { + a++; + } + CPPUNIT_ASSERT(a == b); + CPPUNIT_ASSERT(++a > b); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(SequenceNumberTest); diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index 3b9b1db308..df9fa89501 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -69,8 +69,7 @@ public: { for(int i = 0; i < 10; i++){ Message::shared_ptr msg( - new BasicMessage(0, "exchange", "routing_key", false, false, - MockChannel::basicGetBody())); + new BasicMessage(0, "exchange", "routing_key", false, false)); msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); messages.push_back(msg); diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index 8c3c7afe31..05c03754f9 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -69,8 +69,7 @@ public: TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), - msg(new BasicMessage(0, "exchange", "routing_key", false, false, - MockChannel::basicGetBody())), + msg(new BasicMessage(0, "exchange", "routing_key", false, false)), op(msg) { msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); |