From ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 15 Jan 2007 21:56:23 +0000 Subject: * Client & broker using Requester/Responder to manage request/response IDs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496511 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/lib/common/framing/AMQFrame.h | 9 ++++++++- cpp/lib/common/framing/AMQRequestBody.cpp | 6 ++++++ cpp/lib/common/framing/AMQRequestBody.h | 6 ++++++ cpp/lib/common/framing/AMQResponseBody.cpp | 5 +++++ cpp/lib/common/framing/AMQResponseBody.h | 7 +++++++ cpp/lib/common/framing/ProtocolVersionException.h | 11 ++++------- cpp/lib/common/framing/Requester.h | 2 +- 7 files changed, 37 insertions(+), 9 deletions(-) (limited to 'cpp/lib/common') diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index a927481e77..c27de70e5a 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -21,7 +21,8 @@ * under the License. * */ -/*#include */ +#include + #include #include #include @@ -50,6 +51,12 @@ class AMQFrame : virtual public AMQDataBlock u_int16_t getChannel(); AMQBody::shared_ptr getBody(); + /** Convenience template to cast the body to an expected type */ + template boost::shared_ptr castBody() { + assert(dynamic_cast(getBody().get())); + boost::static_pointer_cast(getBody()); + } + u_int32_t decodeHead(Buffer& buffer); void decodeBody(Buffer& buffer, uint32_t size); diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp index a5e6ce6974..b18671c711 100644 --- a/cpp/lib/common/framing/AMQRequestBody.cpp +++ b/cpp/lib/common/framing/AMQRequestBody.cpp @@ -55,4 +55,10 @@ AMQRequestBody::create( return AMQRequestBody::shared_ptr(body); } +void AMQRequestBody::printPrefix(std::ostream& out) const { + out << "request(id=" << data.requestId << ",mark=" + << data.responseMark << "): "; +} + }} // namespace qpid::framing + diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h index 74aa398606..1a1d3db0e7 100644 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -42,6 +42,10 @@ class AMQRequestBody : public AMQMethodBody ResponseId responseMark; }; + static Data& getData(const AMQBody::shared_ptr& body) { + return boost::dynamic_pointer_cast(body)->getData(); + } + static shared_ptr create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer); @@ -52,6 +56,7 @@ class AMQRequestBody : public AMQMethodBody u_int8_t type() const { return REQUEST_BODY; } void encode(Buffer& buffer) const; + Data& getData() { return data; } RequestId getRequestId() const { return data.requestId; } void setRequestId(RequestId id) { data.requestId=id; } ResponseId getResponseMark() const { return data.responseMark; } @@ -59,6 +64,7 @@ class AMQRequestBody : public AMQMethodBody protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; } + void printPrefix(std::ostream& out) const; private: Data data; diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp index 49fdea9242..c64b1325d6 100644 --- a/cpp/lib/common/framing/AMQResponseBody.cpp +++ b/cpp/lib/common/framing/AMQResponseBody.cpp @@ -56,5 +56,10 @@ AMQResponseBody::shared_ptr AMQResponseBody::create( return AMQResponseBody::shared_ptr(body); } +void AMQResponseBody::printPrefix(std::ostream& out) const { + out << "response(id=" << data.responseId << ",request=" << data.requestId + << ",batch=" << data.batchOffset << "): "; +} + }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index d7095d3da0..6528613a12 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -46,6 +46,10 @@ class AMQResponseBody : public AMQMethodBody u_int32_t batchOffset; }; + static Data& getData(const AMQBody::shared_ptr& body) { + return boost::dynamic_pointer_cast(body)->getData(); + } + static shared_ptr create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer); @@ -57,12 +61,15 @@ class AMQResponseBody : public AMQMethodBody u_int8_t type() const { return RESPONSE_BODY; } void encode(Buffer& buffer) const; + Data& getData() { return data; } ResponseId getResponseId() { return data.responseId; } RequestId getRequestId() { return data.requestId; } BatchOffset getBatchOffset() { return data.batchOffset; } protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } + void printPrefix(std::ostream& out) const; + private: Data data; }; diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h index 4494d87064..32b5bc7ef4 100644 --- a/cpp/lib/common/framing/ProtocolVersionException.h +++ b/cpp/lib/common/framing/ProtocolVersionException.h @@ -27,12 +27,10 @@ #include #include -namespace qpid -{ -namespace framing -{ +namespace qpid { +namespace framing { -class ProtocolVersionException : virtual public qpid::Exception +class ProtocolVersionException : public qpid::Exception { protected: ProtocolVersion versionFound; @@ -49,7 +47,6 @@ public: virtual std::string toString() const throw(); }; // class ProtocolVersionException -} // namespace framing -} // namespace qpid +}} // namespace qpid::framing #endif //ifndef _ProtocolVersionException_ diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h index e24848f98a..562ba681c1 100644 --- a/cpp/lib/common/framing/Requester.h +++ b/cpp/lib/common/framing/Requester.h @@ -45,7 +45,7 @@ class Requester /** Called after processing a response. */ void processed(const AMQResponseBody::Data&); - + private: std::set requests; /** Sent but not responded to */ RequestId lastId; -- cgit v1.2.1