summaryrefslogtreecommitdiff
path: root/cpp/lib/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-15 21:56:23 +0000
committerAlan Conway <aconway@apache.org>2007-01-15 21:56:23 +0000
commitef1469a7ea1f54f266aee8f2899b7cd0c7e07d08 (patch)
tree3b69ec6c589ff8edd628f2e218589180cbca005b /cpp/lib/client
parent5aaad510dc978dc09f92c774c81255b7af6b8b68 (diff)
downloadqpid-python-ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08.tar.gz
* Client & broker using Requester/Responder to manage request/response IDs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
-rw-r--r--cpp/lib/client/Connection.cpp18
-rw-r--r--cpp/lib/client/Connection.h5
-rw-r--r--cpp/lib/client/Connector.cpp19
-rw-r--r--cpp/lib/client/Connector.h109
4 files changed, 98 insertions, 53 deletions
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index ad8aa1d0dd..10a0b50aad 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -32,10 +32,14 @@ using namespace qpid::sys;
u_int16_t Connection::channelIdCounter;
-Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true),
+Connection::Connection(
+ bool debug, u_int32_t _max_frame_size,
+ qpid::framing::ProtocolVersion* _version
+) : max_frame_size(_max_frame_size), closed(true),
version(_version->getMajor(),_version->getMinor())
{
- connector = new Connector(version, debug, _max_frame_size);
+ connector = new Connector(
+ version, requester, responder, debug, _max_frame_size);
}
Connection::~Connection(){
@@ -152,6 +156,16 @@ void Connection::removeChannel(Channel* channel){
}
void Connection::received(AMQFrame* frame){
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int8_t type = body->type();
+ if (type == REQUEST_BODY)
+ responder.received(AMQRequestBody::getData(body));
+ handleFrame(frame);
+ if (type == RESPONSE_BODY)
+ requester.processed(AMQResponseBody::getData(body));
+}
+
+void Connection::handleFrame(AMQFrame* frame){
u_int16_t channelId = frame->getChannel();
if(channelId == 0){
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index 37e65e6099..21e2fb90a2 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -37,6 +37,8 @@
#include <ClientQueue.h>
#include <ResponseHandler.h>
#include <AMQP_HighestVersion.h>
+#include "Requester.h"
+#include "Responder.h"
namespace qpid {
@@ -79,6 +81,8 @@ namespace client {
ResponseHandler responses;
volatile bool closed;
qpid::framing::ProtocolVersion version;
+ qpid::framing::Requester requester;
+ qpid::framing::Responder responder;
void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
@@ -89,6 +93,7 @@ namespace client {
virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+ void handleFrame(qpid::framing::AMQFrame* frame);
public:
/**
diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp
index b34e66fd94..d05540ba32 100644
--- a/cpp/lib/client/Connector.cpp
+++ b/cpp/lib/client/Connector.cpp
@@ -22,13 +22,17 @@
#include <QpidError.h>
#include <sys/Time.h>
#include "Connector.h"
+#include "Requester.h"
+#include "Responder.h"
using namespace qpid::sys;
using namespace qpid::client;
using namespace qpid::framing;
using qpid::QpidError;
-Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) :
+Connector::Connector(const qpid::framing::ProtocolVersion& pVersion,
+ Requester& req, Responder& resp,
+ bool _debug, u_int32_t buffer_size) :
debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
@@ -40,7 +44,10 @@ Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug
timeoutHandler(0),
shutdownHandler(0),
inbuf(receive_buffer_size),
- outbuf(send_buffer_size){ }
+ outbuf(send_buffer_size),
+ requester(req),
+ responder(resp)
+{ }
Connector::~Connector(){ }
@@ -75,7 +82,13 @@ OutputHandler* Connector::getOutputHandler(){
}
void Connector::send(AMQFrame* frame){
- writeBlock(frame);
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int8_t type = body->type();
+ if (type == REQUEST_BODY)
+ requester.sending(AMQRequestBody::getData(body));
+ else if (type == RESPONSE_BODY)
+ responder.sending(AMQResponseBody::getData(body));
+ writeBlock(frame);
if(debug) std::cout << "SENT: " << *frame << std::endl;
delete frame;
}
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index f9e50f3216..02926b2bdb 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -34,60 +34,73 @@
#include <sys/Socket.h>
namespace qpid {
+
+namespace framing {
+
+class Requester;
+class Responder;
+
+} // namespace framing
+
namespace client {
- class Connector : public qpid::framing::OutputHandler,
- private qpid::sys::Runnable
- {
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
- qpid::framing::ProtocolVersion version;
-
- bool closed;
-
- int64_t lastIn;
- int64_t lastOut;
- int64_t timeout;
- u_int32_t idleIn;
- u_int32_t idleOut;
-
- qpid::sys::TimeoutHandler* timeoutHandler;
- qpid::sys::ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
+class Connector : public qpid::framing::OutputHandler,
+ private qpid::sys::Runnable
+{
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+ qpid::framing::ProtocolVersion version;
+
+ bool closed;
+
+ int64_t lastIn;
+ int64_t lastOut;
+ int64_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ qpid::sys::TimeoutHandler* timeoutHandler;
+ qpid::sys::ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::sys::Mutex writeLock;
+ qpid::sys::Thread receiver;
- qpid::sys::Mutex writeLock;
- qpid::sys::Thread receiver;
+ qpid::sys::Socket socket;
- qpid::sys::Socket socket;
+ qpid::framing::Requester& requester;
+ qpid::framing::Responder& responder;
- void checkIdle(ssize_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
- void handleClosed();
-
- public:
- Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024);
- virtual ~Connector();
- virtual void connect(const std::string& host, int port);
- virtual void init(qpid::framing::ProtocolInitiation* header);
- virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
- virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void setReadTimeout(u_int16_t timeout);
- virtual void setWriteTimeout(u_int16_t timeout);
- };
+ void checkIdle(ssize_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+ void handleClosed();
+
+ public:
+ Connector(const qpid::framing::ProtocolVersion& pVersion,
+ qpid::framing::Requester& req, qpid::framing::Responder& resp,
+ bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+ virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
+};
}
}