From b5c270f10496f522ef6a03a8fa60f85d55c9187d Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 2 Feb 2007 22:03:10 +0000 Subject: * cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to ChannelAdapter and Method Body. Request ID comes from body, ChannelAdapter is used to send frames, not OutputHandler. * cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member. Context is per-method not per-channel. * cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId with MethodContext (for responses) or ChannelAdapter (for requests.) Use context request-ID to construct responses, send all bodies via ChannelAdapter. * cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter. * cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters. Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion& * Cosmetic changes, many files: - fixed indentation, broke long lines. - removed unnecessary qpid:: prefixes. * broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into broker::channel. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/tests/ChannelTest.cpp | 92 +++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 34 deletions(-) (limited to 'cpp/tests/ChannelTest.cpp') diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index 760a4d3344..a3dabe6408 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -28,6 +28,9 @@ #include #include #include "AMQFrame.h" +#include "DummyChannel.h" +#include "broker/Connection.h" +#include "ProtocolInitiation.h" using namespace boost; using namespace qpid::broker; @@ -36,12 +39,12 @@ using namespace qpid::sys; using std::string; using std::queue; -struct DummyHandler : OutputHandler{ +struct DummyHandler : ConnectionOutputHandler{ std::vector frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); - } + void send(AMQFrame* frame){ frames.push_back(frame); } + + void close() {}; }; @@ -55,6 +58,10 @@ class ChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST_SUITE_END(); + Broker::shared_ptr broker; + Connection connection; + DummyHandler handler; + class MockMessageStore : public NullMessageStore { struct MethodCall @@ -135,9 +142,17 @@ class ChannelTest : public CppUnit::TestCase public: + ChannelTest() : + broker(Broker::create()), + connection(&handler, *broker) + { + connection.initiated(new ProtocolInitiation()); + } + + void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0); + Channel channel(connection, 0, 0, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); @@ -162,12 +177,10 @@ class ChannelTest : public CppUnit::TestCase } void testDeliveryNoAck(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); - + Channel channel(connection, 7, 10000); const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + Message::shared_ptr msg( + createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); ConnectionToken* owner(0); @@ -175,22 +188,25 @@ class ChannelTest : public CppUnit::TestCase channel.consume(tag, queue, false, false, owner); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[0]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[1]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[2]->getBody().get())); + AMQContentBody* contentBody = dynamic_cast( + handler.frames[3]->getBody().get()); CPPUNIT_ASSERT(contentBody); CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); } void testDeliveryAndRecovery(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); + Channel channel(connection, 7, 10000); const string data("abcdefghijklmn"); Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); @@ -202,26 +218,32 @@ class ChannelTest : public CppUnit::TestCase channel.consume(tag, queue, true, false, owner); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[0]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[1]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[2]->getBody().get())); + AMQContentBody* contentBody = dynamic_cast( + handler.frames[3]->getBody().get()); CPPUNIT_ASSERT(contentBody); CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); } void testStaging(){ MockMessageStore store; - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); + Channel channel( + connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); const string data[] = {"abcde", "fghij", "klmno"}; - Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); + Message* msg = new BasicMessage( + 0, "my_exchange", "my_routing_key", false, false, + DummyChannel::basicGetBody()); store.expect(); store.stage(msg); @@ -309,7 +331,9 @@ class ChannelTest : public CppUnit::TestCase Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) { - BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false); + BasicMessage* msg = new BasicMessage( + 0, exchange, routingKey, false, false, + DummyChannel::basicGetBody()); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(contentSize); msg->setHeader(header); -- cgit v1.2.1