diff options
Diffstat (limited to 'cpp/tests')
| -rw-r--r-- | cpp/tests/ChannelTest.cpp | 4 | ||||
| -rw-r--r-- | cpp/tests/ExchangeTest.cpp | 2 | ||||
| -rw-r--r-- | cpp/tests/InProcessBroker.h | 153 | ||||
| -rw-r--r-- | cpp/tests/MessageBuilderTest.cpp | 16 | ||||
| -rw-r--r-- | cpp/tests/MessageTest.cpp | 4 | ||||
| -rw-r--r-- | cpp/tests/QueueTest.cpp | 12 | ||||
| -rw-r--r-- | cpp/tests/TxAckTest.cpp | 2 | ||||
| -rw-r--r-- | cpp/tests/TxPublishTest.cpp | 2 |
8 files changed, 174 insertions, 21 deletions
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index b31ff6a321..760a4d3344 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -221,7 +221,7 @@ class ChannelTest : public CppUnit::TestCase Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); const string data[] = {"abcde", "fghij", "klmno"}; - Message* msg = new Message(0, "my_exchange", "my_routing_key", false, false); + Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); store.expect(); store.stage(msg); @@ -309,7 +309,7 @@ class ChannelTest : public CppUnit::TestCase Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) { - Message* msg = new Message(0, exchange, routingKey, false, false); + BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(contentSize); msg->setHeader(header); diff --git a/cpp/tests/ExchangeTest.cpp b/cpp/tests/ExchangeTest.cpp index 8fef4ccaac..a31c369fe1 100644 --- a/cpp/tests/ExchangeTest.cpp +++ b/cpp/tests/ExchangeTest.cpp @@ -54,7 +54,7 @@ class ExchangeTest : public CppUnit::TestCase queue.reset(); queue2.reset(); - Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true)); + Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true)); DeliverableMessage msg(msgPtr); topic.route(msg, "abc", 0); direct.route(msg, "abc", 0); diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h index 4ef352e677..af0f4e84fe 100644 --- a/cpp/tests/InProcessBroker.h +++ b/cpp/tests/InProcessBroker.h @@ -151,3 +151,156 @@ std::ostream& operator<<( }} // namespace qpid::broker #endif /*!_tests_InProcessBroker_h*/ +#ifndef _tests_InProcessBroker_h +#define _tests_InProcessBroker_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <vector> +#include <iostream> +#include <algorithm> + +#include "framing/AMQFrame.h" +#include "broker/Broker.h" +#include "broker/Connection.h" +#include "client/Connector.h" + +namespace qpid { +namespace broker { + +/** Make a copy of a frame body. Inefficient, only intended for tests. */ +// TODO aconway 2007-01-29: from should be const, need to fix +// AMQPFrame::encode as const. +framing::AMQFrame copy(framing::AMQFrame& from) { + framing::Buffer buffer(from.size()); + from.encode(buffer); + buffer.flip(); + framing::AMQFrame result; + result.decode(buffer); + return result; +} + +/** + * A broker that implements client::Connector allowing direct + * in-process connection of client to broker. Used to write round-trip + * tests without requiring an external broker process. + * + * Also allows you to "snoop" on frames exchanged between client & broker. + * + * Use as follows: + * + \code + broker::InProcessBroker ibroker(version); + client::Connection clientConnection; + clientConnection.setConnector(ibroker); + clientConnection.open(""); + ... use as normal + \endcode + * + */ +class InProcessBroker : public client::Connector { + public: + enum Sender {CLIENT,BROKER}; + struct Frame : public framing::AMQFrame { + Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {} + bool fromBroker() const { return from == BROKER; } + bool fromClient() const { return from == CLIENT; } + + template <class MethodType> + MethodType* asMethod() { + return dynamic_cast<MethodType*>(getBody().get()); + } + + Sender from; + }; + typedef std::vector<Frame> Conversation; + + InProcessBroker(const framing::ProtocolVersion& ver) : + Connector(ver), + protocolInit(ver), + broker(broker::Broker::create()), + brokerOut(BROKER, conversation), + brokerConnection(&brokerOut, *broker), + clientOut(CLIENT, conversation, &brokerConnection) + {} + + void connect(const std::string& /*host*/, int /*port*/) {} + void init() { brokerConnection.initiated(&protocolInit); } + void close() {} + + /** Client's input handler. */ + void setInputHandler(framing::InputHandler* handler) { + brokerOut.in = handler; + } + + /** Called by client to send a frame */ + void send(framing::AMQFrame* frame) { + clientOut.send(frame); + } + + /** Entire client-broker conversation is recorded here */ + Conversation conversation; + + private: + /** OutputHandler that forwards data to an InputHandler */ + struct OutputToInputHandler : public sys::ConnectionOutputHandler { + OutputToInputHandler( + Sender from_, Conversation& conversation_, + framing::InputHandler* ih=0 + ) : from(from_), conversation(conversation_), in(ih) {} + + void send(framing::AMQFrame* frame) { + conversation.push_back(Frame(from, copy(*frame))); + in->received(frame); + } + + void close() {} + + Sender from; + Conversation& conversation; + framing::InputHandler* in; + }; + + framing::ProtocolInitiation protocolInit; + Broker::shared_ptr broker; + OutputToInputHandler brokerOut; + broker::Connection brokerConnection; + OutputToInputHandler clientOut; +}; + +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Frame& frame) +{ + return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") << + static_cast<const framing::AMQFrame&>(frame); +} +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Conversation& conv) +{ + for (InProcessBroker::Conversation::const_iterator i = conv.begin(); + i != conv.end(); ++i) + { + out << *i << std::endl; + } + return out; +} + + +}} // namespace qpid::broker + +#endif /*!_tests_InProcessBroker_h*/ diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp index 21f5935218..e84d1df0e7 100644 --- a/cpp/tests/MessageBuilderTest.cpp +++ b/cpp/tests/MessageBuilderTest.cpp @@ -74,14 +74,14 @@ class MessageBuilderTest : public CppUnit::TestCase // Don't hide overloads. using NullMessageStore::destroy; - void destroy(Message* msg) + void destroy(BasicMessage* msg) { CPPUNIT_ASSERT(msg->getPersistenceId()); } - Message::shared_ptr getRestoredMessage() + BasicMessage::shared_ptr getRestoredMessage() { - Message::shared_ptr msg(new Message()); + BasicMessage::shared_ptr msg(new BasicMessage()); if (header) { header->flip(); msg->decodeHeader(*header); @@ -116,7 +116,7 @@ class MessageBuilderTest : public CppUnit::TestCase DummyHandler handler; MessageBuilder builder(&handler); - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(0); @@ -133,7 +133,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(7); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -154,7 +154,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -183,7 +183,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); @@ -200,7 +200,7 @@ class MessageBuilderTest : public CppUnit::TestCase CPPUNIT_ASSERT(handler.msg); CPPUNIT_ASSERT_EQUAL(message, handler.msg); - Message::shared_ptr restored = store.getRestoredMessage(); + BasicMessage::shared_ptr restored = store.getRestoredMessage(); CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index 8bb570e598..62249e1f5f 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -52,7 +52,7 @@ class MessageTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr msg = Message::shared_ptr(new Message(0, exchange, routingKey, false, false)); + Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0, exchange, routingKey, false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -69,7 +69,7 @@ class MessageTest : public CppUnit::TestCase msg->encode(buffer); buffer.flip(); - msg = Message::shared_ptr(new Message(buffer)); + msg = Message::shared_ptr(new BasicMessage(buffer)); CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp index 9d655781c1..e156efc507 100644 --- a/cpp/tests/QueueTest.cpp +++ b/cpp/tests/QueueTest.cpp @@ -66,9 +66,9 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount()); //Test basic delivery: - Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true)); queue->deliver(msg1); CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); @@ -123,9 +123,9 @@ class QueueTest : public CppUnit::TestCase void testDequeue(){ Queue::shared_ptr queue(new Queue("my_queue", true)); - Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true)); Message::shared_ptr received; queue->deliver(msg1); diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp index 0ffe984ded..a6fe0f1010 100644 --- a/cpp/tests/TxAckTest.cpp +++ b/cpp/tests/TxAckTest.cpp @@ -69,7 +69,7 @@ public: TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid) { for(int i = 0; i < 10; i++){ - Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false)); + Message::shared_ptr msg(new BasicMessage(0, "exchange", "routing_key", false, false)); msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); messages.push_back(msg); diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp index 3542e08f45..87658af62e 100644 --- a/cpp/tests/TxPublishTest.cpp +++ b/cpp/tests/TxPublishTest.cpp @@ -75,7 +75,7 @@ public: TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), - msg(new Message(0, "exchange", "routing_key", false, false)), + msg(new BasicMessage(0, "exchange", "routing_key", false, false)), op(msg, &xid) { msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); |
