summaryrefslogtreecommitdiff
path: root/cpp/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/tests')
-rw-r--r--cpp/tests/ChannelTest.cpp4
-rw-r--r--cpp/tests/ExchangeTest.cpp2
-rw-r--r--cpp/tests/InProcessBroker.h153
-rw-r--r--cpp/tests/MessageBuilderTest.cpp16
-rw-r--r--cpp/tests/MessageTest.cpp4
-rw-r--r--cpp/tests/QueueTest.cpp12
-rw-r--r--cpp/tests/TxAckTest.cpp2
-rw-r--r--cpp/tests/TxPublishTest.cpp2
8 files changed, 174 insertions, 21 deletions
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp
index b31ff6a321..760a4d3344 100644
--- a/cpp/tests/ChannelTest.cpp
+++ b/cpp/tests/ChannelTest.cpp
@@ -221,7 +221,7 @@ class ChannelTest : public CppUnit::TestCase
Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
const string data[] = {"abcde", "fghij", "klmno"};
- Message* msg = new Message(0, "my_exchange", "my_routing_key", false, false);
+ Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
store.expect();
store.stage(msg);
@@ -309,7 +309,7 @@ class ChannelTest : public CppUnit::TestCase
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize)
{
- Message* msg = new Message(0, exchange, routingKey, false, false);
+ BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false);
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(contentSize);
msg->setHeader(header);
diff --git a/cpp/tests/ExchangeTest.cpp b/cpp/tests/ExchangeTest.cpp
index 8fef4ccaac..a31c369fe1 100644
--- a/cpp/tests/ExchangeTest.cpp
+++ b/cpp/tests/ExchangeTest.cpp
@@ -54,7 +54,7 @@ class ExchangeTest : public CppUnit::TestCase
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true));
+ Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h
index 4ef352e677..af0f4e84fe 100644
--- a/cpp/tests/InProcessBroker.h
+++ b/cpp/tests/InProcessBroker.h
@@ -151,3 +151,156 @@ std::ostream& operator<<(
}} // namespace qpid::broker
#endif /*!_tests_InProcessBroker_h*/
+#ifndef _tests_InProcessBroker_h
+#define _tests_InProcessBroker_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <vector>
+#include <iostream>
+#include <algorithm>
+
+#include "framing/AMQFrame.h"
+#include "broker/Broker.h"
+#include "broker/Connection.h"
+#include "client/Connector.h"
+
+namespace qpid {
+namespace broker {
+
+/** Make a copy of a frame body. Inefficient, only intended for tests. */
+// TODO aconway 2007-01-29: from should be const, need to fix
+// AMQPFrame::encode as const.
+framing::AMQFrame copy(framing::AMQFrame& from) {
+ framing::Buffer buffer(from.size());
+ from.encode(buffer);
+ buffer.flip();
+ framing::AMQFrame result;
+ result.decode(buffer);
+ return result;
+}
+
+/**
+ * A broker that implements client::Connector allowing direct
+ * in-process connection of client to broker. Used to write round-trip
+ * tests without requiring an external broker process.
+ *
+ * Also allows you to "snoop" on frames exchanged between client & broker.
+ *
+ * Use as follows:
+ *
+ \code
+ broker::InProcessBroker ibroker(version);
+ client::Connection clientConnection;
+ clientConnection.setConnector(ibroker);
+ clientConnection.open("");
+ ... use as normal
+ \endcode
+ *
+ */
+class InProcessBroker : public client::Connector {
+ public:
+ enum Sender {CLIENT,BROKER};
+ struct Frame : public framing::AMQFrame {
+ Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
+ bool fromBroker() const { return from == BROKER; }
+ bool fromClient() const { return from == CLIENT; }
+
+ template <class MethodType>
+ MethodType* asMethod() {
+ return dynamic_cast<MethodType*>(getBody().get());
+ }
+
+ Sender from;
+ };
+ typedef std::vector<Frame> Conversation;
+
+ InProcessBroker(const framing::ProtocolVersion& ver) :
+ Connector(ver),
+ protocolInit(ver),
+ broker(broker::Broker::create()),
+ brokerOut(BROKER, conversation),
+ brokerConnection(&brokerOut, *broker),
+ clientOut(CLIENT, conversation, &brokerConnection)
+ {}
+
+ void connect(const std::string& /*host*/, int /*port*/) {}
+ void init() { brokerConnection.initiated(&protocolInit); }
+ void close() {}
+
+ /** Client's input handler. */
+ void setInputHandler(framing::InputHandler* handler) {
+ brokerOut.in = handler;
+ }
+
+ /** Called by client to send a frame */
+ void send(framing::AMQFrame* frame) {
+ clientOut.send(frame);
+ }
+
+ /** Entire client-broker conversation is recorded here */
+ Conversation conversation;
+
+ private:
+ /** OutputHandler that forwards data to an InputHandler */
+ struct OutputToInputHandler : public sys::ConnectionOutputHandler {
+ OutputToInputHandler(
+ Sender from_, Conversation& conversation_,
+ framing::InputHandler* ih=0
+ ) : from(from_), conversation(conversation_), in(ih) {}
+
+ void send(framing::AMQFrame* frame) {
+ conversation.push_back(Frame(from, copy(*frame)));
+ in->received(frame);
+ }
+
+ void close() {}
+
+ Sender from;
+ Conversation& conversation;
+ framing::InputHandler* in;
+ };
+
+ framing::ProtocolInitiation protocolInit;
+ Broker::shared_ptr broker;
+ OutputToInputHandler brokerOut;
+ broker::Connection brokerConnection;
+ OutputToInputHandler clientOut;
+};
+
+std::ostream& operator<<(
+ std::ostream& out, const InProcessBroker::Frame& frame)
+{
+ return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") <<
+ static_cast<const framing::AMQFrame&>(frame);
+}
+std::ostream& operator<<(
+ std::ostream& out, const InProcessBroker::Conversation& conv)
+{
+ for (InProcessBroker::Conversation::const_iterator i = conv.begin();
+ i != conv.end(); ++i)
+ {
+ out << *i << std::endl;
+ }
+ return out;
+}
+
+
+}} // namespace qpid::broker
+
+#endif /*!_tests_InProcessBroker_h*/
diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp
index 21f5935218..e84d1df0e7 100644
--- a/cpp/tests/MessageBuilderTest.cpp
+++ b/cpp/tests/MessageBuilderTest.cpp
@@ -74,14 +74,14 @@ class MessageBuilderTest : public CppUnit::TestCase
// Don't hide overloads.
using NullMessageStore::destroy;
- void destroy(Message* msg)
+ void destroy(BasicMessage* msg)
{
CPPUNIT_ASSERT(msg->getPersistenceId());
}
- Message::shared_ptr getRestoredMessage()
+ BasicMessage::shared_ptr getRestoredMessage()
{
- Message::shared_ptr msg(new Message());
+ BasicMessage::shared_ptr msg(new BasicMessage());
if (header) {
header->flip();
msg->decodeHeader(*header);
@@ -116,7 +116,7 @@ class MessageBuilderTest : public CppUnit::TestCase
DummyHandler handler;
MessageBuilder builder(&handler);
- Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(0);
@@ -133,7 +133,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
- Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(7);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -154,7 +154,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -183,7 +183,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
@@ -200,7 +200,7 @@ class MessageBuilderTest : public CppUnit::TestCase
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
- Message::shared_ptr restored = store.getRestoredMessage();
+ BasicMessage::shared_ptr restored = store.getRestoredMessage();
CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange());
CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId());
diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp
index 8bb570e598..62249e1f5f 100644
--- a/cpp/tests/MessageTest.cpp
+++ b/cpp/tests/MessageTest.cpp
@@ -52,7 +52,7 @@ class MessageTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr msg = Message::shared_ptr(new Message(0, exchange, routingKey, false, false));
+ Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0, exchange, routingKey, false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -69,7 +69,7 @@ class MessageTest : public CppUnit::TestCase
msg->encode(buffer);
buffer.flip();
- msg = Message::shared_ptr(new Message(buffer));
+ msg = Message::shared_ptr(new BasicMessage(buffer));
CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId());
diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp
index 9d655781c1..e156efc507 100644
--- a/cpp/tests/QueueTest.cpp
+++ b/cpp/tests/QueueTest.cpp
@@ -66,9 +66,9 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
//Test basic delivery:
- Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+ Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true));
queue->deliver(msg1);
CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
@@ -123,9 +123,9 @@ class QueueTest : public CppUnit::TestCase
void testDequeue(){
Queue::shared_ptr queue(new Queue("my_queue", true));
- Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+ Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true));
Message::shared_ptr received;
queue->deliver(msg1);
diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp
index 0ffe984ded..a6fe0f1010 100644
--- a/cpp/tests/TxAckTest.cpp
+++ b/cpp/tests/TxAckTest.cpp
@@ -69,7 +69,7 @@ public:
TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid)
{
for(int i = 0; i < 10; i++){
- Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false));
+ Message::shared_ptr msg(new BasicMessage(0, "exchange", "routing_key", false, false));
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp
index 3542e08f45..87658af62e 100644
--- a/cpp/tests/TxPublishTest.cpp
+++ b/cpp/tests/TxPublishTest.cpp
@@ -75,7 +75,7 @@ public:
TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(new Message(0, "exchange", "routing_key", false, false)),
+ msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
op(msg, &xid)
{
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));