summaryrefslogtreecommitdiff
path: root/cpp/lib/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/client')
-rw-r--r--cpp/lib/client/ClientChannel.cpp12
-rw-r--r--cpp/lib/client/ClientChannel.h22
-rw-r--r--cpp/lib/client/Connection.cpp2
-rw-r--r--cpp/lib/client/Connection.h11
-rw-r--r--cpp/lib/client/Connector.cpp14
-rw-r--r--cpp/lib/client/Connector.h2
6 files changed, 46 insertions, 17 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index dd93c6ae8b..52910f5161 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -24,6 +24,7 @@
#include <QpidError.h>
#include <MethodBodyInstances.h>
#include "Connection.h"
+#include "AMQP_ServerProxy.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -48,12 +49,23 @@ Channel::~Channel(){
close();
}
+AMQP_ServerProxy& Channel::brokerProxy() {
+ assert(proxy.get());
+ return *proxy;
+}
+
+AMQMethodBody::shared_ptr Channel::brokerResponse() {
+ // FIXME aconway 2007-02-08: implement responses.
+ return AMQMethodBody::shared_ptr();
+}
+
void Channel::open(ChannelId id, Connection& con)
{
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
connection = &con;
init(id, con, con.getVersion()); // ChannelAdapter initialization.
+ proxy.reset(new AMQP_ServerProxy(*this));
string oob;
if (id != 0)
sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index a34c95d2c4..1c082f3b59 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -24,6 +24,7 @@
#include <map>
#include <string>
#include <queue>
+#include <boost/scoped_ptr.hpp>
#include "sys/types.h"
#include <framing/amqp_framing.h>
@@ -39,8 +40,10 @@
#include "Thread.h"
namespace qpid {
+
namespace framing {
class ChannelCloseBody;
+class AMQP_ServerProxy;
}
namespace client {
@@ -102,6 +105,7 @@ class Channel : public framing::ChannelAdapter,
u_int16_t prefetch;
const bool transactional;
framing::ProtocolVersion version;
+ boost::scoped_ptr<framing::AMQP_ServerProxy> proxy;
void enqueue();
void retrieve(Message& msg);
@@ -151,8 +155,6 @@ class Channel : public framing::ChannelAdapter,
public:
- bool isOpen() const;
-
/**
* Creates a channel object.
*
@@ -358,9 +360,21 @@ class Channel : public framing::ChannelAdapter,
* @see publish()
*/
void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ bool isOpen() const;
+
+ /**
+ * Returns a proxy for the "raw" AMQP broker protocol. Only for use by
+ * protocol experts.
+ */
+
+ framing::AMQP_ServerProxy& brokerProxy();
+ /**
+ * Wait for the next method from the broker.
+ */
+ framing::AMQMethodBody::shared_ptr brokerResponse();
};
-}
-}
+}}
#endif /*!_client_ClientChannel_h*/
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 0fafd29b90..2f91c44a22 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -43,7 +43,7 @@ const std::string Connection::OK("OK");
Connection::Connection(
bool _debug, u_int32_t _max_frame_size,
- const framing::ProtocolVersion& _version
+ framing::ProtocolVersion _version
) : version(_version), max_frame_size(_max_frame_size),
defaultConnector(version, _debug, _max_frame_size),
isOpen(false), debug(_debug)
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index 2f9b35d5ef..275e02a105 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -96,12 +96,12 @@ class Connection : public ConnectionForChannel
Connector* connector;
framing::OutputHandler* out;
volatile bool isOpen;
+ Channel channel0;
+ bool debug;
void erase(framing::ChannelId);
void channelException(
Channel&, framing::AMQMethodBody*, const QpidError&);
- Channel channel0;
- bool debug;
// TODO aconway 2007-01-26: too many friendships, untagle these classes.
friend class Channel;
@@ -120,9 +120,8 @@ class Connection : public ConnectionForChannel
* @param max_frame_size the maximum frame size that the
* client will accept. Optional and defaults to 65536.
*/
- Connection(
- bool debug = false, u_int32_t max_frame_size = 65536,
- const framing::ProtocolVersion& = framing::highestProtocolVersion);
+ Connection(bool debug = false, u_int32_t max_frame_size = 65536,
+ framing::ProtocolVersion=framing::highestProtocolVersion);
~Connection();
/**
@@ -185,7 +184,7 @@ class Connection : public ConnectionForChannel
inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
/** @return protocol version in use on this connection. */
- const framing::ProtocolVersion& getVersion() const { return version; }
+ framing::ProtocolVersion getVersion() const { return version; }
};
}} // namespace qpid::client
diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp
index 425cecaf6f..657ee77f1a 100644
--- a/cpp/lib/client/Connector.cpp
+++ b/cpp/lib/client/Connector.cpp
@@ -23,17 +23,19 @@
#include <sys/Time.h>
#include "Connector.h"
+namespace qpid {
+namespace client {
+
using namespace qpid::sys;
-using namespace qpid::client;
using namespace qpid::framing;
using qpid::QpidError;
-Connector::Connector(const qpid::framing::ProtocolVersion& pVersion,
- bool _debug, u_int32_t buffer_size) :
- debug(_debug),
+Connector::Connector(
+ ProtocolVersion ver, bool _debug, u_int32_t buffer_size
+) : debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
- version(pVersion),
+ version(ver),
closed(true),
lastIn(0), lastOut(0),
timeout(0),
@@ -180,3 +182,5 @@ void Connector::run(){
handleClosed();
}
}
+
+}} // namespace qpid::client
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index 1126e861e0..ccac39f849 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -77,7 +77,7 @@ class Connector : public framing::OutputHandler,
friend class Channel;
public:
- Connector(const framing::ProtocolVersion& pVersion,
+ Connector(framing::ProtocolVersion pVersion,
bool debug = false, u_int32_t buffer_size = 1024);
virtual ~Connector();
virtual void connect(const std::string& host, int port);