diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-19 21:33:27 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-19 21:33:27 +0000 |
| commit | e861284318186f8d9cd64a7ddcc28b8d20b98721 (patch) | |
| tree | 6dac612d65297dc5f104350884fc01385c69ecda /cpp/lib/client | |
| parent | 226be67c91b25a5ba8efdd9ba88566033ec97718 (diff) | |
| download | qpid-python-e861284318186f8d9cd64a7ddcc28b8d20b98721.tar.gz | |
Last big refactoring for 0-9 framing. Still need additional tests &
debugging but the overall structure is all in place.
* configure.ac: Added -Wno_virtual_overload warning
* ChannelTest.cpp, MessageBuilderTest.cpp: Fixed virtual overload warnings.
* ChannelAdapter.cpp: Common base for client/broker adapters.
Creates invocation context, handles request/resposne IDs.
* CppGenerator.java:
- Proxies send methods using MethodContext.
* Various .h files: removed unnecessary #includes, added to requred .cpp files.
* ConnectionContext: renamed from SessionContext.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497963 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 10 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.h | 30 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 10 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.h | 34 |
4 files changed, 57 insertions, 27 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index af26990d8a..d9edb2f390 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -256,6 +256,16 @@ void Channel::rollback(){ sendAndReceive(frame, method_bodies.tx_rollback_ok); } +void Channel::handleRequest(AMQRequestBody::shared_ptr body) { + // FIXME aconway 2007-01-19: request/response handling. + handleMethod(body); +} + +void Channel::handleResponse(AMQResponseBody::shared_ptr body) { + // FIXME aconway 2007-01-19: request/response handling. + handleMethod(body); +} + void Channel::handleMethod(AMQMethodBody::shared_ptr body){ //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 5beda0296e..e7bab8b4ee 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -67,7 +67,9 @@ namespace client { * * \ingroup clientapi */ - class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{ + class Channel : private virtual framing::BodyHandler, + public virtual sys::Runnable + { struct Consumer{ MessageListener* listener; int ackMode; @@ -78,36 +80,38 @@ namespace client { u_int16_t id; Connection* con; - qpid::sys::Thread dispatcher; - qpid::framing::OutputHandler* out; + sys::Thread dispatcher; + framing::OutputHandler* out; IncomingMessage* incoming; ResponseHandler responses; std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume IncomingMessage* retrieved;//holds response to basic.get - qpid::sys::Monitor dispatchMonitor; - qpid::sys::Monitor retrievalMonitor; + sys::Monitor dispatchMonitor; + sys::Monitor retrievalMonitor; std::map<std::string, Consumer*> consumers; ReturnedMessageHandler* returnsHandler; bool closed; u_int16_t prefetch; const bool transactional; - qpid::framing::ProtocolVersion version; + framing::ProtocolVersion version; void enqueue(); void retrieve(Message& msg); IncomingMessage* dequeue(); void dispatch(); void stop(); - void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); + void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body); void deliver(Consumer* consumer, Message& msg); void setQos(); void cancelAll(); - virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + virtual void handleMethod(framing::AMQMethodBody::shared_ptr body); + virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body); + virtual void handleContent(framing::AMQContentBody::shared_ptr body); + virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); + void handleRequest(framing::AMQRequestBody::shared_ptr); + void handleResponse(framing::AMQResponseBody::shared_ptr); public: /** @@ -185,7 +189,7 @@ namespace client { * is received from the broker */ void bind(const Exchange& exchange, const Queue& queue, const std::string& key, - const qpid::framing::FieldTable& args, bool synch = true); + const framing::FieldTable& args, bool synch = true); /** * Creates a 'consumer' for a queue. Messages in (or arriving * at) that queue will be delivered to consumers @@ -216,7 +220,7 @@ namespace client { void consume( Queue& queue, std::string& tag, MessageListener* listener, int ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const qpid::framing::FieldTable* fields = 0); + const framing::FieldTable* fields = 0); /** * Cancels a subscription previously set up through a call to consume(). diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index dd1e372095..1ae317db62 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -184,6 +184,16 @@ void Connection::handleFrame(AMQFrame* frame){ } } +void Connection::handleRequest(AMQRequestBody::shared_ptr body) { + // FIXME aconway 2007-01-19: request/response handling. + handleMethod(body); +} + +void Connection::handleResponse(AMQResponseBody::shared_ptr body) { + // FIXME aconway 2007-01-19: request/response handling. + handleMethod(body); +} + void Connection::handleMethod(AMQMethodBody::shared_ptr body){ //connection.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 21e2fb90a2..9c9b067f88 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -66,7 +66,8 @@ namespace client { class Connection : public virtual qpid::framing::InputHandler, public virtual qpid::sys::TimeoutHandler, public virtual qpid::sys::ShutdownHandler, - private virtual qpid::framing::BodyHandler{ + private virtual qpid::framing::BodyHandler + { typedef std::map<int, Channel*>::iterator iterator; @@ -80,20 +81,25 @@ namespace client { qpid::framing::OutputHandler* out; ResponseHandler responses; volatile bool closed; - qpid::framing::ProtocolVersion version; - qpid::framing::Requester requester; - qpid::framing::Responder responder; + framing::ProtocolVersion version; + framing::Requester requester; + framing::Responder responder; - void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); + void channelException(Channel* channel, framing::AMQMethodBody* body, QpidError& e); void error(int code, const std::string& msg, int classid = 0, int methodid = 0); void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0); - void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); - - virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - void handleFrame(qpid::framing::AMQFrame* frame); + void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body); + + // FIXME aconway 2007-01-19: Use channel(0) not connection + // to handle channel 0 requests. Remove handler methods. + // + void handleRequest(framing::AMQRequestBody::shared_ptr); + void handleResponse(framing::AMQResponseBody::shared_ptr); + void handleMethod(framing::AMQMethodBody::shared_ptr); + void handleHeader(framing::AMQHeaderBody::shared_ptr); + void handleContent(framing::AMQContentBody::shared_ptr); + void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr); + void handleFrame(framing::AMQFrame* frame); public: /** @@ -110,7 +116,7 @@ namespace client { * client will accept. Optional and defaults to 65536. */ Connection( bool debug = false, u_int32_t max_frame_size = 65536, - qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion)); + framing::ProtocolVersion* _version = &(framing::highestProtocolVersion)); ~Connection(); /** @@ -163,7 +169,7 @@ namespace client { */ void removeChannel(Channel* channel); - virtual void received(qpid::framing::AMQFrame* frame); + virtual void received(framing::AMQFrame* frame); virtual void idleOut(); virtual void idleIn(); |
