diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-15 21:56:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-15 21:56:23 +0000 |
| commit | ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08 (patch) | |
| tree | 3b69ec6c589ff8edd628f2e218589180cbca005b /cpp/lib/client | |
| parent | 5aaad510dc978dc09f92c774c81255b7af6b8b68 (diff) | |
| download | qpid-python-ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08.tar.gz | |
* Client & broker using Requester/Responder to manage request/response IDs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 18 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.h | 5 | ||||
| -rw-r--r-- | cpp/lib/client/Connector.cpp | 19 | ||||
| -rw-r--r-- | cpp/lib/client/Connector.h | 109 |
4 files changed, 98 insertions, 53 deletions
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index ad8aa1d0dd..10a0b50aad 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -32,10 +32,14 @@ using namespace qpid::sys; u_int16_t Connection::channelIdCounter; -Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true), +Connection::Connection( + bool debug, u_int32_t _max_frame_size, + qpid::framing::ProtocolVersion* _version +) : max_frame_size(_max_frame_size), closed(true), version(_version->getMajor(),_version->getMinor()) { - connector = new Connector(version, debug, _max_frame_size); + connector = new Connector( + version, requester, responder, debug, _max_frame_size); } Connection::~Connection(){ @@ -152,6 +156,16 @@ void Connection::removeChannel(Channel* channel){ } void Connection::received(AMQFrame* frame){ + AMQBody::shared_ptr body = frame->getBody(); + u_int8_t type = body->type(); + if (type == REQUEST_BODY) + responder.received(AMQRequestBody::getData(body)); + handleFrame(frame); + if (type == RESPONSE_BODY) + requester.processed(AMQResponseBody::getData(body)); +} + +void Connection::handleFrame(AMQFrame* frame){ u_int16_t channelId = frame->getChannel(); if(channelId == 0){ diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 37e65e6099..21e2fb90a2 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -37,6 +37,8 @@ #include <ClientQueue.h> #include <ResponseHandler.h> #include <AMQP_HighestVersion.h> +#include "Requester.h" +#include "Responder.h" namespace qpid { @@ -79,6 +81,8 @@ namespace client { ResponseHandler responses; volatile bool closed; qpid::framing::ProtocolVersion version; + qpid::framing::Requester requester; + qpid::framing::Responder responder; void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); void error(int code, const std::string& msg, int classid = 0, int methodid = 0); @@ -89,6 +93,7 @@ namespace client { virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + void handleFrame(qpid::framing::AMQFrame* frame); public: /** diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index b34e66fd94..d05540ba32 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -22,13 +22,17 @@ #include <QpidError.h> #include <sys/Time.h> #include "Connector.h" +#include "Requester.h" +#include "Responder.h" using namespace qpid::sys; using namespace qpid::client; using namespace qpid::framing; using qpid::QpidError; -Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) : +Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, + Requester& req, Responder& resp, + bool _debug, u_int32_t buffer_size) : debug(_debug), receive_buffer_size(buffer_size), send_buffer_size(buffer_size), @@ -40,7 +44,10 @@ Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug timeoutHandler(0), shutdownHandler(0), inbuf(receive_buffer_size), - outbuf(send_buffer_size){ } + outbuf(send_buffer_size), + requester(req), + responder(resp) +{ } Connector::~Connector(){ } @@ -75,7 +82,13 @@ OutputHandler* Connector::getOutputHandler(){ } void Connector::send(AMQFrame* frame){ - writeBlock(frame); + AMQBody::shared_ptr body = frame->getBody(); + u_int8_t type = body->type(); + if (type == REQUEST_BODY) + requester.sending(AMQRequestBody::getData(body)); + else if (type == RESPONSE_BODY) + responder.sending(AMQResponseBody::getData(body)); + writeBlock(frame); if(debug) std::cout << "SENT: " << *frame << std::endl; delete frame; } diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index f9e50f3216..02926b2bdb 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -34,60 +34,73 @@ #include <sys/Socket.h> namespace qpid { + +namespace framing { + +class Requester; +class Responder; + +} // namespace framing + namespace client { - class Connector : public qpid::framing::OutputHandler, - private qpid::sys::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - qpid::framing::ProtocolVersion version; - - bool closed; - - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - qpid::sys::TimeoutHandler* timeoutHandler; - qpid::sys::ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; +class Connector : public qpid::framing::OutputHandler, + private qpid::sys::Runnable +{ + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + qpid::framing::ProtocolVersion version; + + bool closed; + + int64_t lastIn; + int64_t lastOut; + int64_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + qpid::sys::TimeoutHandler* timeoutHandler; + qpid::sys::ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::sys::Mutex writeLock; + qpid::sys::Thread receiver; - qpid::sys::Mutex writeLock; - qpid::sys::Thread receiver; + qpid::sys::Socket socket; - qpid::sys::Socket socket; + qpid::framing::Requester& requester; + qpid::framing::Responder& responder; - void checkIdle(ssize_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - void handleClosed(); - - public: - Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); - virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; + void checkIdle(ssize_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + void handleClosed(); + + public: + Connector(const qpid::framing::ProtocolVersion& pVersion, + qpid::framing::Requester& req, qpid::framing::Responder& resp, + bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); + virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); +}; } } |
