diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-29 16:13:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-29 16:13:24 +0000 |
| commit | 5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch) | |
| tree | f9a982b65400154a86edd02faf75da143a96404c /cpp/lib/common | |
| parent | 5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff) | |
| download | qpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz | |
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side.
* Improved various exception messages with boost::format messages.
* Removed unnecssary virtual inheritance.
* Widespread: fixed incorrect non-const ProtocolVersion& parameters.
* Client API: pass channels by reference, not pointer.
* codegen:
- MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template.
- Various: fixed non-const ProtocolVersion& parameters.
* cpp/bootstrap: Allow config arguments with -build.
* cpp/gen/Makefile.am: Merged codegen fixes from trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/common')
| -rw-r--r-- | cpp/lib/common/framing/AMQFrame.cpp | 13 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQFrame.h | 2 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.cpp | 8 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.h | 14 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.cpp | 2 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.cpp | 2 | ||||
| -rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.cpp | 43 | ||||
| -rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.h | 41 | ||||
| -rw-r--r-- | cpp/lib/common/framing/ProtocolInitiation.h | 2 | ||||
| -rw-r--r-- | cpp/lib/common/framing/amqp_types.h | 3 | ||||
| -rw-r--r-- | cpp/lib/common/sys/apr/APRSocket.cpp | 2 | ||||
| -rw-r--r-- | cpp/lib/common/sys/apr/APRSocket.h | 2 |
12 files changed, 93 insertions, 41 deletions
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index c6837af668..f1461b6bb3 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -19,6 +19,8 @@ * under the License. * */ +#include <boost/format.hpp> + #include <AMQFrame.h> #include <QpidError.h> #include "AMQRequestBody.h" @@ -67,10 +69,10 @@ u_int32_t AMQFrame::size() const{ bool AMQFrame::decode(Buffer& buffer) { - if(buffer.available() < 7) return false; + if(buffer.available() < 7) + return false; buffer.record(); u_int32_t frameSize = decodeHead(buffer); - if(buffer.available() < frameSize + 1){ buffer.restore(); return false; @@ -110,10 +112,9 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) body = AMQBody::shared_ptr(new AMQHeartbeatBody()); break; default: - assert(0); - string msg("Unknown body type: "); - msg += type; - THROW_QPID_ERROR(FRAMING_ERROR, msg); + THROW_QPID_ERROR( + FRAMING_ERROR, + (boost::format("Unknown frame type %d")%type).str()); } body->decode(buffer, size); } diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index f3c3232d56..15b294a373 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -38,7 +38,7 @@ namespace qpid { namespace framing { -class AMQFrame : virtual public AMQDataBlock +class AMQFrame : public AMQDataBlock { public: AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion); diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp index 73b729b945..17138a401e 100644 --- a/cpp/lib/common/framing/AMQMethodBody.cpp +++ b/cpp/lib/common/framing/AMQMethodBody.cpp @@ -31,10 +31,6 @@ void AMQMethodBody::encodeId(Buffer& buffer) const{ buffer.putShort(amqpMethodId()); } -bool AMQMethodBody::match(AMQMethodBody* other) const{ - return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId(); -} - void AMQMethodBody::invoke(AMQP_ServerOperations&, const MethodContext&){ assert(0); THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); @@ -44,14 +40,14 @@ AMQMethodBody::shared_ptr AMQMethodBody::create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer) { - MethodId id; + ClassMethodId id; id.decode(buffer); return AMQMethodBody::shared_ptr( versionMap.createMethodBody( id.classId, id.methodId, version.getMajor(), version.getMinor())); } -void AMQMethodBody::MethodId::decode(Buffer& buffer) { +void AMQMethodBody::ClassMethodId::decode(Buffer& buffer) { classId = buffer.getShort(); methodId = buffer.getShort(); } diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index ff09ee60e1..d9f028c786 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -48,11 +48,16 @@ class AMQMethodBody : public AMQBody virtual ~AMQMethodBody() {} void decode(Buffer&, u_int32_t); - virtual u_int16_t amqpMethodId() const = 0; - virtual u_int16_t amqpClassId() const = 0; + virtual MethodId amqpMethodId() const = 0; + virtual ClassId amqpClassId() const = 0; + virtual void invoke(AMQP_ServerOperations&, const MethodContext&); - bool match(AMQMethodBody* other) const; + // FIXME aconway 2007-01-24: remove match, use isA + bool match(AMQMethodBody* other) const; + template <class T> bool isA() { + return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID; + } /** * Wrap this method in a frame and send using the current context. @@ -63,7 +68,7 @@ class AMQMethodBody : public AMQBody protected: static u_int32_t baseSize() { return 4; } - struct MethodId { + struct ClassMethodId { u_int16_t classId; u_int16_t methodId; void decode(Buffer& b); @@ -74,6 +79,7 @@ class AMQMethodBody : public AMQBody virtual void decodeContent(Buffer& buffer) = 0; }; + }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp index e711d4c0a1..54e1c11863 100644 --- a/cpp/lib/common/framing/AMQRequestBody.cpp +++ b/cpp/lib/common/framing/AMQRequestBody.cpp @@ -45,7 +45,7 @@ AMQRequestBody::create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer) { - MethodId id; + ClassMethodId id; Data data; data.decode(buffer); id.decode(buffer); diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp index dffbb62aca..da70e24cd4 100644 --- a/cpp/lib/common/framing/AMQResponseBody.cpp +++ b/cpp/lib/common/framing/AMQResponseBody.cpp @@ -45,7 +45,7 @@ AMQResponseBody::shared_ptr AMQResponseBody::create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer) { - MethodId id; + ClassMethodId id; Data data; data.decode(buffer); id.decode(buffer); diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 7685240826..59dc93c287 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -22,7 +22,17 @@ namespace qpid { namespace framing { +void ChannelAdapter::init( + ChannelId i, OutputHandler& o, const ProtocolVersion& v) +{ + assertChannelNotOpen(); + id = i; + out = &o; + version = v; +} + void ChannelAdapter::send(AMQFrame* frame) { + assertChannelOpen(); AMQBody::shared_ptr body = frame->getBody(); switch (body->type()) { case REQUEST_BODY: { @@ -38,33 +48,52 @@ void ChannelAdapter::send(AMQFrame* frame) { break; } } - out.send(frame); + out->send(frame); +} + +void ChannelAdapter::send(AMQBody::shared_ptr body) { + send(new AMQFrame(getVersion(), getId(), body)); } void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { + assertMethodOk(*request); responder.received(request->getData()); MethodContext context(id, this, request->getRequestId()); handleMethodInContext(request, context); } void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { + assertMethodOk(*response); handleMethod(response); requester.processed(response->getData()); } void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { + assertMethodOk(*method); MethodContext context(id, this); handleMethodInContext(method, context); } -void ChannelAdapter::assertChannelZero(u_int16_t id) { - if (id != 0) - throw ConnectionException(504, "Invalid channel id, not 0"); +void ChannelAdapter::assertMethodOk(AMQMethodBody& /*method*/) const { + // No connection methods allowed on a non-zero channel + // Subclass ChannelZero overrides for 0 channels. + // FIXME aconway 2007-01-25: with ctors +// assertChannelOpen(); +// if (method.amqpClassId() == ConnectionOpenBody::CLASS_ID) +// throw ConnectionException( +// 504, "Connection method on non-0 channel."); +} + +void ChannelAdapter::assertChannelOpen() const { + // FIXME aconway 2007-01-25: with ctors +// if (!isOpen()) +// throw ConnectionException(504, "Channel is not open"); } -void ChannelAdapter::assertChannelNonZero(u_int16_t id) { - if (id == 0) - throw ConnectionException(504, "Invalid channel id 0"); +void ChannelAdapter::assertChannelNotOpen() const { + // FIXME aconway 2007-01-25: with ctors +// if (isOpen()) +// throw ConnectionException(504, "Channel is already open"); } }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index 0652cc41bb..60615740ad 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -38,22 +38,29 @@ class MethodContext; * Base class for client and broker channel adapters. * * As BodyHandler: - * - Creates MethodContext and dispatches methods+context to derived class. - * - Updates request/response ID data. + * - receives frame bodies from the network. + * - Updates request/response data. + * - Dispatches requests with a MethodContext for responses. * * As OutputHandler: * - Updates request/resposne ID data. - * + * - Forwards frame to the peer. + * + * Thread safety: OBJECT UNSAFE. Instances must not be called + * concurrently. AMQP defines channels to be serialized. */ class ChannelAdapter : public BodyHandler, public OutputHandler { public: /** *@param output Processed frames are forwarded to this handler. */ - ChannelAdapter(OutputHandler& output, ChannelId channelId) - : id(channelId), out(output) {} + ChannelAdapter() : id(0), out(0) {} - ChannelId getId() { return id; } + /** Initialize the channel adapter. */ + void init(ChannelId, OutputHandler&, const ProtocolVersion&); + + ChannelId getId() const { return id; } + const ProtocolVersion& getVersion() const { return version; } /** * Do request/response-id processing and then forward to @@ -61,27 +68,37 @@ class ChannelAdapter : public BodyHandler, public OutputHandler { * have their request-id set before calling send. */ void send(AMQFrame* frame); + /** + * Wrap body in a frame and send the frame. + * Takes ownership of body. + */ + void send(AMQBody::shared_ptr body); + void send(AMQBody* body) { send(AMQBody::shared_ptr(body)); } void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>); void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>); + virtual bool isOpen() const = 0; + protected: - /** Throw protocol exception if this is not channel 0. */ - static void assertChannelZero(u_int16_t id); - /** Throw protocol exception if this is channel 0. */ - static void assertChannelNonZero(u_int16_t id); + void assertMethodOk(AMQMethodBody& method) const; + void assertChannelOpen() const; + void assertChannelNotOpen() const; virtual void handleMethodInContext( boost::shared_ptr<qpid::framing::AMQMethodBody> method, const MethodContext& context) = 0; - ChannelId id; + RequestId getRequestInProgress() { return requestInProgress; } private: + ChannelId id; + OutputHandler* out; + ProtocolVersion version; Requester requester; Responder responder; - OutputHandler& out; + RequestId requestInProgress; // TODO aconway 2007-01-24: use it. }; }} diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h index 003c3bba81..6b3dbac88d 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.h +++ b/cpp/lib/common/framing/ProtocolInitiation.h @@ -29,7 +29,7 @@ namespace qpid { namespace framing { -class ProtocolInitiation : virtual public AMQDataBlock +class ProtocolInitiation : public AMQDataBlock { private: ProtocolVersion version; diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h index 4fac13e93b..777d9e7bc5 100644 --- a/cpp/lib/common/framing/amqp_types.h +++ b/cpp/lib/common/framing/amqp_types.h @@ -40,6 +40,9 @@ typedef u_int16_t ChannelId; typedef u_int64_t RequestId; typedef u_int64_t ResponseId; typedef u_int32_t BatchOffset; +typedef u_int16_t ClassId; +typedef u_int16_t MethodId; +typedef u_int16_t ReplyCode; }} // namespace qpid::framing #endif diff --git a/cpp/lib/common/sys/apr/APRSocket.cpp b/cpp/lib/common/sys/apr/APRSocket.cpp index 4917803370..621a66b919 100644 --- a/cpp/lib/common/sys/apr/APRSocket.cpp +++ b/cpp/lib/common/sys/apr/APRSocket.cpp @@ -59,7 +59,7 @@ void APRSocket::close(){ } } -bool APRSocket::isOpen(){ +bool APRSocket::isOpen() const { return !closed; } diff --git a/cpp/lib/common/sys/apr/APRSocket.h b/cpp/lib/common/sys/apr/APRSocket.h index 53f1055c6a..d4274300f2 100644 --- a/cpp/lib/common/sys/apr/APRSocket.h +++ b/cpp/lib/common/sys/apr/APRSocket.h @@ -36,7 +36,7 @@ namespace sys { void read(qpid::framing::Buffer& b); void write(qpid::framing::Buffer& b); void close(); - bool isOpen(); + bool isOpen() const; u_int8_t read(); ~APRSocket(); }; |
