summaryrefslogtreecommitdiff
path: root/cpp/lib/common
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
committerAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
commit5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch)
treef9a982b65400154a86edd02faf75da143a96404c /cpp/lib/common
parent5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff)
downloadqpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side. * Improved various exception messages with boost::format messages. * Removed unnecssary virtual inheritance. * Widespread: fixed incorrect non-const ProtocolVersion& parameters. * Client API: pass channels by reference, not pointer. * codegen: - MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template. - Various: fixed non-const ProtocolVersion& parameters. * cpp/bootstrap: Allow config arguments with -build. * cpp/gen/Makefile.am: Merged codegen fixes from trunk. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/common')
-rw-r--r--cpp/lib/common/framing/AMQFrame.cpp13
-rw-r--r--cpp/lib/common/framing/AMQFrame.h2
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.cpp8
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.h14
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.cpp2
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.cpp2
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp43
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h41
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.h2
-rw-r--r--cpp/lib/common/framing/amqp_types.h3
-rw-r--r--cpp/lib/common/sys/apr/APRSocket.cpp2
-rw-r--r--cpp/lib/common/sys/apr/APRSocket.h2
12 files changed, 93 insertions, 41 deletions
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp
index c6837af668..f1461b6bb3 100644
--- a/cpp/lib/common/framing/AMQFrame.cpp
+++ b/cpp/lib/common/framing/AMQFrame.cpp
@@ -19,6 +19,8 @@
* under the License.
*
*/
+#include <boost/format.hpp>
+
#include <AMQFrame.h>
#include <QpidError.h>
#include "AMQRequestBody.h"
@@ -67,10 +69,10 @@ u_int32_t AMQFrame::size() const{
bool AMQFrame::decode(Buffer& buffer)
{
- if(buffer.available() < 7) return false;
+ if(buffer.available() < 7)
+ return false;
buffer.record();
u_int32_t frameSize = decodeHead(buffer);
-
if(buffer.available() < frameSize + 1){
buffer.restore();
return false;
@@ -110,10 +112,9 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
body = AMQBody::shared_ptr(new AMQHeartbeatBody());
break;
default:
- assert(0);
- string msg("Unknown body type: ");
- msg += type;
- THROW_QPID_ERROR(FRAMING_ERROR, msg);
+ THROW_QPID_ERROR(
+ FRAMING_ERROR,
+ (boost::format("Unknown frame type %d")%type).str());
}
body->decode(buffer, size);
}
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index f3c3232d56..15b294a373 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -38,7 +38,7 @@ namespace qpid {
namespace framing {
-class AMQFrame : virtual public AMQDataBlock
+class AMQFrame : public AMQDataBlock
{
public:
AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp
index 73b729b945..17138a401e 100644
--- a/cpp/lib/common/framing/AMQMethodBody.cpp
+++ b/cpp/lib/common/framing/AMQMethodBody.cpp
@@ -31,10 +31,6 @@ void AMQMethodBody::encodeId(Buffer& buffer) const{
buffer.putShort(amqpMethodId());
}
-bool AMQMethodBody::match(AMQMethodBody* other) const{
- return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId();
-}
-
void AMQMethodBody::invoke(AMQP_ServerOperations&, const MethodContext&){
assert(0);
THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
@@ -44,14 +40,14 @@ AMQMethodBody::shared_ptr AMQMethodBody::create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer)
{
- MethodId id;
+ ClassMethodId id;
id.decode(buffer);
return AMQMethodBody::shared_ptr(
versionMap.createMethodBody(
id.classId, id.methodId, version.getMajor(), version.getMinor()));
}
-void AMQMethodBody::MethodId::decode(Buffer& buffer) {
+void AMQMethodBody::ClassMethodId::decode(Buffer& buffer) {
classId = buffer.getShort();
methodId = buffer.getShort();
}
diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h
index ff09ee60e1..d9f028c786 100644
--- a/cpp/lib/common/framing/AMQMethodBody.h
+++ b/cpp/lib/common/framing/AMQMethodBody.h
@@ -48,11 +48,16 @@ class AMQMethodBody : public AMQBody
virtual ~AMQMethodBody() {}
void decode(Buffer&, u_int32_t);
- virtual u_int16_t amqpMethodId() const = 0;
- virtual u_int16_t amqpClassId() const = 0;
+ virtual MethodId amqpMethodId() const = 0;
+ virtual ClassId amqpClassId() const = 0;
+
virtual void invoke(AMQP_ServerOperations&, const MethodContext&);
- bool match(AMQMethodBody* other) const;
+ // FIXME aconway 2007-01-24: remove match, use isA
+ bool match(AMQMethodBody* other) const;
+ template <class T> bool isA() {
+ return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID;
+ }
/**
* Wrap this method in a frame and send using the current context.
@@ -63,7 +68,7 @@ class AMQMethodBody : public AMQBody
protected:
static u_int32_t baseSize() { return 4; }
- struct MethodId {
+ struct ClassMethodId {
u_int16_t classId;
u_int16_t methodId;
void decode(Buffer& b);
@@ -74,6 +79,7 @@ class AMQMethodBody : public AMQBody
virtual void decodeContent(Buffer& buffer) = 0;
};
+
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp
index e711d4c0a1..54e1c11863 100644
--- a/cpp/lib/common/framing/AMQRequestBody.cpp
+++ b/cpp/lib/common/framing/AMQRequestBody.cpp
@@ -45,7 +45,7 @@ AMQRequestBody::create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer)
{
- MethodId id;
+ ClassMethodId id;
Data data;
data.decode(buffer);
id.decode(buffer);
diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp
index dffbb62aca..da70e24cd4 100644
--- a/cpp/lib/common/framing/AMQResponseBody.cpp
+++ b/cpp/lib/common/framing/AMQResponseBody.cpp
@@ -45,7 +45,7 @@ AMQResponseBody::shared_ptr AMQResponseBody::create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer)
{
- MethodId id;
+ ClassMethodId id;
Data data;
data.decode(buffer);
id.decode(buffer);
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
index 7685240826..59dc93c287 100644
--- a/cpp/lib/common/framing/ChannelAdapter.cpp
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -22,7 +22,17 @@
namespace qpid {
namespace framing {
+void ChannelAdapter::init(
+ ChannelId i, OutputHandler& o, const ProtocolVersion& v)
+{
+ assertChannelNotOpen();
+ id = i;
+ out = &o;
+ version = v;
+}
+
void ChannelAdapter::send(AMQFrame* frame) {
+ assertChannelOpen();
AMQBody::shared_ptr body = frame->getBody();
switch (body->type()) {
case REQUEST_BODY: {
@@ -38,33 +48,52 @@ void ChannelAdapter::send(AMQFrame* frame) {
break;
}
}
- out.send(frame);
+ out->send(frame);
+}
+
+void ChannelAdapter::send(AMQBody::shared_ptr body) {
+ send(new AMQFrame(getVersion(), getId(), body));
}
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
+ assertMethodOk(*request);
responder.received(request->getData());
MethodContext context(id, this, request->getRequestId());
handleMethodInContext(request, context);
}
void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
+ assertMethodOk(*response);
handleMethod(response);
requester.processed(response->getData());
}
void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
+ assertMethodOk(*method);
MethodContext context(id, this);
handleMethodInContext(method, context);
}
-void ChannelAdapter::assertChannelZero(u_int16_t id) {
- if (id != 0)
- throw ConnectionException(504, "Invalid channel id, not 0");
+void ChannelAdapter::assertMethodOk(AMQMethodBody& /*method*/) const {
+ // No connection methods allowed on a non-zero channel
+ // Subclass ChannelZero overrides for 0 channels.
+ // FIXME aconway 2007-01-25: with ctors
+// assertChannelOpen();
+// if (method.amqpClassId() == ConnectionOpenBody::CLASS_ID)
+// throw ConnectionException(
+// 504, "Connection method on non-0 channel.");
+}
+
+void ChannelAdapter::assertChannelOpen() const {
+ // FIXME aconway 2007-01-25: with ctors
+// if (!isOpen())
+// throw ConnectionException(504, "Channel is not open");
}
-void ChannelAdapter::assertChannelNonZero(u_int16_t id) {
- if (id == 0)
- throw ConnectionException(504, "Invalid channel id 0");
+void ChannelAdapter::assertChannelNotOpen() const {
+ // FIXME aconway 2007-01-25: with ctors
+// if (isOpen())
+// throw ConnectionException(504, "Channel is already open");
}
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
index 0652cc41bb..60615740ad 100644
--- a/cpp/lib/common/framing/ChannelAdapter.h
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -38,22 +38,29 @@ class MethodContext;
* Base class for client and broker channel adapters.
*
* As BodyHandler:
- * - Creates MethodContext and dispatches methods+context to derived class.
- * - Updates request/response ID data.
+ * - receives frame bodies from the network.
+ * - Updates request/response data.
+ * - Dispatches requests with a MethodContext for responses.
*
* As OutputHandler:
* - Updates request/resposne ID data.
- *
+ * - Forwards frame to the peer.
+ *
+ * Thread safety: OBJECT UNSAFE. Instances must not be called
+ * concurrently. AMQP defines channels to be serialized.
*/
class ChannelAdapter : public BodyHandler, public OutputHandler {
public:
/**
*@param output Processed frames are forwarded to this handler.
*/
- ChannelAdapter(OutputHandler& output, ChannelId channelId)
- : id(channelId), out(output) {}
+ ChannelAdapter() : id(0), out(0) {}
- ChannelId getId() { return id; }
+ /** Initialize the channel adapter. */
+ void init(ChannelId, OutputHandler&, const ProtocolVersion&);
+
+ ChannelId getId() const { return id; }
+ const ProtocolVersion& getVersion() const { return version; }
/**
* Do request/response-id processing and then forward to
@@ -61,27 +68,37 @@ class ChannelAdapter : public BodyHandler, public OutputHandler {
* have their request-id set before calling send.
*/
void send(AMQFrame* frame);
+ /**
+ * Wrap body in a frame and send the frame.
+ * Takes ownership of body.
+ */
+ void send(AMQBody::shared_ptr body);
+ void send(AMQBody* body) { send(AMQBody::shared_ptr(body)); }
void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>);
+ virtual bool isOpen() const = 0;
+
protected:
- /** Throw protocol exception if this is not channel 0. */
- static void assertChannelZero(u_int16_t id);
- /** Throw protocol exception if this is channel 0. */
- static void assertChannelNonZero(u_int16_t id);
+ void assertMethodOk(AMQMethodBody& method) const;
+ void assertChannelOpen() const;
+ void assertChannelNotOpen() const;
virtual void handleMethodInContext(
boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const MethodContext& context) = 0;
- ChannelId id;
+ RequestId getRequestInProgress() { return requestInProgress; }
private:
+ ChannelId id;
+ OutputHandler* out;
+ ProtocolVersion version;
Requester requester;
Responder responder;
- OutputHandler& out;
+ RequestId requestInProgress; // TODO aconway 2007-01-24: use it.
};
}}
diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h
index 003c3bba81..6b3dbac88d 100644
--- a/cpp/lib/common/framing/ProtocolInitiation.h
+++ b/cpp/lib/common/framing/ProtocolInitiation.h
@@ -29,7 +29,7 @@
namespace qpid {
namespace framing {
-class ProtocolInitiation : virtual public AMQDataBlock
+class ProtocolInitiation : public AMQDataBlock
{
private:
ProtocolVersion version;
diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h
index 4fac13e93b..777d9e7bc5 100644
--- a/cpp/lib/common/framing/amqp_types.h
+++ b/cpp/lib/common/framing/amqp_types.h
@@ -40,6 +40,9 @@ typedef u_int16_t ChannelId;
typedef u_int64_t RequestId;
typedef u_int64_t ResponseId;
typedef u_int32_t BatchOffset;
+typedef u_int16_t ClassId;
+typedef u_int16_t MethodId;
+typedef u_int16_t ReplyCode;
}} // namespace qpid::framing
#endif
diff --git a/cpp/lib/common/sys/apr/APRSocket.cpp b/cpp/lib/common/sys/apr/APRSocket.cpp
index 4917803370..621a66b919 100644
--- a/cpp/lib/common/sys/apr/APRSocket.cpp
+++ b/cpp/lib/common/sys/apr/APRSocket.cpp
@@ -59,7 +59,7 @@ void APRSocket::close(){
}
}
-bool APRSocket::isOpen(){
+bool APRSocket::isOpen() const {
return !closed;
}
diff --git a/cpp/lib/common/sys/apr/APRSocket.h b/cpp/lib/common/sys/apr/APRSocket.h
index 53f1055c6a..d4274300f2 100644
--- a/cpp/lib/common/sys/apr/APRSocket.h
+++ b/cpp/lib/common/sys/apr/APRSocket.h
@@ -36,7 +36,7 @@ namespace sys {
void read(qpid::framing::Buffer& b);
void write(qpid::framing::Buffer& b);
void close();
- bool isOpen();
+ bool isOpen() const;
u_int8_t read();
~APRSocket();
};