diff options
Diffstat (limited to 'cpp/lib/client')
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 12 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.h | 22 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 2 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.h | 11 | ||||
| -rw-r--r-- | cpp/lib/client/Connector.cpp | 14 | ||||
| -rw-r--r-- | cpp/lib/client/Connector.h | 2 |
6 files changed, 46 insertions, 17 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index dd93c6ae8b..52910f5161 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -24,6 +24,7 @@ #include <QpidError.h> #include <MethodBodyInstances.h> #include "Connection.h" +#include "AMQP_ServerProxy.h" // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -48,12 +49,23 @@ Channel::~Channel(){ close(); } +AMQP_ServerProxy& Channel::brokerProxy() { + assert(proxy.get()); + return *proxy; +} + +AMQMethodBody::shared_ptr Channel::brokerResponse() { + // FIXME aconway 2007-02-08: implement responses. + return AMQMethodBody::shared_ptr(); +} + void Channel::open(ChannelId id, Connection& con) { if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); connection = &con; init(id, con, con.getVersion()); // ChannelAdapter initialization. + proxy.reset(new AMQP_ServerProxy(*this)); string oob; if (id != 0) sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index a34c95d2c4..1c082f3b59 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -24,6 +24,7 @@ #include <map> #include <string> #include <queue> +#include <boost/scoped_ptr.hpp> #include "sys/types.h" #include <framing/amqp_framing.h> @@ -39,8 +40,10 @@ #include "Thread.h" namespace qpid { + namespace framing { class ChannelCloseBody; +class AMQP_ServerProxy; } namespace client { @@ -102,6 +105,7 @@ class Channel : public framing::ChannelAdapter, u_int16_t prefetch; const bool transactional; framing::ProtocolVersion version; + boost::scoped_ptr<framing::AMQP_ServerProxy> proxy; void enqueue(); void retrieve(Message& msg); @@ -151,8 +155,6 @@ class Channel : public framing::ChannelAdapter, public: - bool isOpen() const; - /** * Creates a channel object. * @@ -358,9 +360,21 @@ class Channel : public framing::ChannelAdapter, * @see publish() */ void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + bool isOpen() const; + + /** + * Returns a proxy for the "raw" AMQP broker protocol. Only for use by + * protocol experts. + */ + + framing::AMQP_ServerProxy& brokerProxy(); + /** + * Wait for the next method from the broker. + */ + framing::AMQMethodBody::shared_ptr brokerResponse(); }; -} -} +}} #endif /*!_client_ClientChannel_h*/ diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 0fafd29b90..2f91c44a22 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -43,7 +43,7 @@ const std::string Connection::OK("OK"); Connection::Connection( bool _debug, u_int32_t _max_frame_size, - const framing::ProtocolVersion& _version + framing::ProtocolVersion _version ) : version(_version), max_frame_size(_max_frame_size), defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 2f9b35d5ef..275e02a105 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -96,12 +96,12 @@ class Connection : public ConnectionForChannel Connector* connector; framing::OutputHandler* out; volatile bool isOpen; + Channel channel0; + bool debug; void erase(framing::ChannelId); void channelException( Channel&, framing::AMQMethodBody*, const QpidError&); - Channel channel0; - bool debug; // TODO aconway 2007-01-26: too many friendships, untagle these classes. friend class Channel; @@ -120,9 +120,8 @@ class Connection : public ConnectionForChannel * @param max_frame_size the maximum frame size that the * client will accept. Optional and defaults to 65536. */ - Connection( - bool debug = false, u_int32_t max_frame_size = 65536, - const framing::ProtocolVersion& = framing::highestProtocolVersion); + Connection(bool debug = false, u_int32_t max_frame_size = 65536, + framing::ProtocolVersion=framing::highestProtocolVersion); ~Connection(); /** @@ -185,7 +184,7 @@ class Connection : public ConnectionForChannel inline u_int32_t getMaxFrameSize(){ return max_frame_size; } /** @return protocol version in use on this connection. */ - const framing::ProtocolVersion& getVersion() const { return version; } + framing::ProtocolVersion getVersion() const { return version; } }; }} // namespace qpid::client diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index 425cecaf6f..657ee77f1a 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -23,17 +23,19 @@ #include <sys/Time.h> #include "Connector.h" +namespace qpid { +namespace client { + using namespace qpid::sys; -using namespace qpid::client; using namespace qpid::framing; using qpid::QpidError; -Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, - bool _debug, u_int32_t buffer_size) : - debug(_debug), +Connector::Connector( + ProtocolVersion ver, bool _debug, u_int32_t buffer_size +) : debug(_debug), receive_buffer_size(buffer_size), send_buffer_size(buffer_size), - version(pVersion), + version(ver), closed(true), lastIn(0), lastOut(0), timeout(0), @@ -180,3 +182,5 @@ void Connector::run(){ handleClosed(); } } + +}} // namespace qpid::client diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index 1126e861e0..ccac39f849 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -77,7 +77,7 @@ class Connector : public framing::OutputHandler, friend class Channel; public: - Connector(const framing::ProtocolVersion& pVersion, + Connector(framing::ProtocolVersion pVersion, bool debug = false, u_int32_t buffer_size = 1024); virtual ~Connector(); virtual void connect(const std::string& host, int port); |
