From f972bfa6c7c9a1b3d5f5fe4753dce7358b6b0215 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Sat, 17 Feb 2007 03:46:44 +0000 Subject: * cpp/lib/client/ClientMessage.h/.cpp: Added Message constructor taking data. * cpp/lib/client/IncomingMessage.cpp: Simplified message aggregation. * cpp/lib/common/framing/AMQ*Body.h: remove unnecessary virtual inheritance. * cpp/lib/common/framing/AMQMethodBody.h: add bool isRequest, isResponse * cpp/lib/common/sys/Mutex.h (class ScopedUnlock): Added ScopedUnlock * cpp/lib/common/sys/ThreadSafeQueue.h: Thread-safe queue template. * cpp/tests/BrokerChannelTest.cpp: Renamed from ChannelTest. * cpp/tests/ClientChannelTest.cpp: Test client API. * cpp/tests/RequestResponseTest.cpp: Removed empty test. * cpp/lib/client/Connection.h,.cpp: - non-static channelIdCounter - No close() in dtor. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@508705 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/docs/api/developer.doxygen | 6 +- cpp/lib/client/ClientChannel.cpp | 4 +- cpp/lib/client/ClientMessage.cpp | 11 +- cpp/lib/client/ClientMessage.h | 14 +- cpp/lib/client/Connection.cpp | 8 +- cpp/lib/client/Connection.h | 2 +- cpp/lib/client/IncomingMessage.cpp | 22 +- cpp/lib/client/IncomingMessage.h | 6 +- cpp/lib/common/ExceptionHolder.h | 1 + cpp/lib/common/framing/AMQContentBody.h | 2 +- cpp/lib/common/framing/AMQHeaderBody.h | 2 +- cpp/lib/common/framing/AMQHeartbeatBody.h | 2 +- cpp/lib/common/framing/AMQMethodBody.h | 3 + cpp/lib/common/framing/AMQRequestBody.h | 1 + cpp/lib/common/framing/AMQResponseBody.h | 1 + cpp/lib/common/sys/Mutex.h | 11 + cpp/lib/common/sys/ThreadSafeQueue.h | 99 +++++++++ cpp/tests/BrokerChannelTest.cpp | 356 ++++++++++++++++++++++++++++++ cpp/tests/ChannelTest.cpp | 356 ------------------------------ cpp/tests/ClientChannelTest.cpp | 82 +++++++ cpp/tests/InProcessBroker.h | 7 +- cpp/tests/Makefile.am | 32 +-- cpp/tests/RequestResponseTest.cpp | 57 ----- 23 files changed, 609 insertions(+), 476 deletions(-) create mode 100644 cpp/lib/common/sys/ThreadSafeQueue.h create mode 100644 cpp/tests/BrokerChannelTest.cpp delete mode 100644 cpp/tests/ChannelTest.cpp create mode 100644 cpp/tests/ClientChannelTest.cpp delete mode 100644 cpp/tests/RequestResponseTest.cpp (limited to 'cpp') diff --git a/cpp/docs/api/developer.doxygen b/cpp/docs/api/developer.doxygen index 2b81671593..4679876d09 100644 --- a/cpp/docs/api/developer.doxygen +++ b/cpp/docs/api/developer.doxygen @@ -35,7 +35,7 @@ PROJECT_NUMBER = 0 # If a relative path is entered, it will be relative to the location # where doxygen was started. If left blank the current directory will be used. -OUTPUT_DIRECTORY = dev +OUTPUT_DIRECTORY = . # If the CREATE_SUBDIRS tag is set to YES, then doxygen will create # 4096 sub-directories (in 2 levels) under the output directory of each output @@ -438,7 +438,7 @@ WARN_IF_DOC_ERROR = YES # wrong or incomplete parameter documentation, but not about the absence of # documentation. -WARN_NO_PARAMDOC = NO +WARN_NO_PARAMDOC = YES # The WARN_FORMAT tag determines the format of the warning messages that # doxygen can produce. The string should contain the $file, $line, and $text @@ -453,7 +453,7 @@ WARN_FORMAT = "$file:$line: $text" # and error messages should be written. If left blank the output is written # to stderr. -WARN_LOGFILE = +WARN_LOGFILE = doxygen.log #--------------------------------------------------------------------------- # configuration options related to the input files diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 9d1c8ec011..4dff8a62f7 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -245,7 +245,7 @@ void Channel::retrieve(Message& msg){ msg.header = retrieved->getHeader(); msg.deliveryTag = retrieved->getDeliveryTag(); - retrieved->getData(msg.data); + msg.data = retrieved->getData(); delete retrieved; retrieved = 0; } @@ -446,7 +446,7 @@ void Channel::dispatch(){ if(incomingMsg){ //Note: msg is currently only valid for duration of this call Message msg(incomingMsg->getHeader()); - incomingMsg->getData(msg.data); + msg.data = incomingMsg->getData(); if(incomingMsg->isReturn()){ if(returnsHandler == 0){ //print warning to log/console diff --git a/cpp/lib/client/ClientMessage.cpp b/cpp/lib/client/ClientMessage.cpp index e8a2a6019e..8b08f7e535 100644 --- a/cpp/lib/client/ClientMessage.cpp +++ b/cpp/lib/client/ClientMessage.cpp @@ -23,8 +23,15 @@ using namespace qpid::client; using namespace qpid::framing; -Message::Message(){ - header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)); +Message::Message(const std::string& d) + : header(new AMQHeaderBody(BASIC)) +{ + setData(d); +} + +void Message::setData(const std::string& d) { + data = d; + header->setContentSize(d.size()); } Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h index a5d204d35a..148f9240c8 100644 --- a/cpp/lib/client/ClientMessage.h +++ b/cpp/lib/client/ClientMessage.h @@ -41,8 +41,9 @@ namespace client { qpid::framing::BasicHeaderProperties* getHeaderProperties(); Message(qpid::framing::AMQHeaderBody::shared_ptr& header); + public: - Message(); + Message(const std::string& data=std::string()); ~Message(); /** @@ -51,14 +52,15 @@ namespace client { * * @return a string representing the data of the message */ - inline std::string getData(){ return data; } + std::string getData() const { return data; } + /** * Allows the application to set the content of messages to be * sent. * * @param data a string representing the data of the message */ - inline void setData(const std::string& _data){ data = _data; } + void setData(const std::string& _data); /** * @return true if this message was delivered previously (to @@ -103,10 +105,10 @@ namespace client { void setClusterId(const std::string& clusterId); - friend class Channel; + // TODO aconway 2007-02-15: remove friendships. + friend class Channel; }; -} -} +}} #endif /*!_client_ClientMessage_h*/ diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 2f91c44a22..5b97ca8e5d 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -37,23 +37,19 @@ using namespace qpid::sys; namespace qpid { namespace client { -ChannelId Connection::channelIdCounter; - const std::string Connection::OK("OK"); Connection::Connection( bool _debug, u_int32_t _max_frame_size, framing::ProtocolVersion _version -) : version(_version), max_frame_size(_max_frame_size), +) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) { setConnector(defaultConnector); } -Connection::~Connection(){ - close(); -} +Connection::~Connection(){} void Connection::setConnector(Connector& con) { diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 275e02a105..6a8d88d581 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -86,7 +86,7 @@ class Connection : public ConnectionForChannel { typedef std::map ChannelMap; - static framing::ChannelId channelIdCounter; + framing::ChannelId channelIdCounter; static const std::string OK; framing::ProtocolVersion version; diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp index 2ff143ba94..c1f6ca880f 100644 --- a/cpp/lib/client/IncomingMessage.cpp +++ b/cpp/lib/client/IncomingMessage.cpp @@ -36,12 +36,12 @@ void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){ this->header = _header; } -void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){ - this->content.push_back(_content); +void IncomingMessage::addContent(AMQContentBody::shared_ptr content){ + data.append(content->getData()); } bool IncomingMessage::isComplete(){ - return header != 0 && header->getContentSize() == contentSize(); + return header != 0 && header->getContentSize() == data.size(); } bool IncomingMessage::isReturn(){ @@ -70,19 +70,7 @@ AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ return header; } -void IncomingMessage::getData(string& s){ - int count(content.size()); - for(int i = 0; i < count; i++){ - if(i == 0) s = content[i]->getData(); - else s += content[i]->getData(); - } +std::string IncomingMessage::getData() const { + return data; } -u_int64_t IncomingMessage::contentSize(){ - u_int64_t size(0); - u_int64_t count(content.size()); - for(u_int64_t i = 0; i < count; i++){ - size += content[i]->size(); - } - return size; -} diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h index 464e05d877..a2aa4d8441 100644 --- a/cpp/lib/client/IncomingMessage.h +++ b/cpp/lib/client/IncomingMessage.h @@ -36,9 +36,7 @@ namespace client { qpid::framing::BasicReturnBody::shared_ptr returned; qpid::framing::BasicGetOkBody::shared_ptr response; qpid::framing::AMQHeaderBody::shared_ptr header; - std::vector content; - - u_int64_t contentSize(); + std::string data; public: IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro); IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro); @@ -53,7 +51,7 @@ namespace client { const std::string& getConsumerTag();//only relevant if isDelivery() qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); u_int64_t getDeliveryTag(); - void getData(std::string& data); + std::string getData() const; }; } diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h index 83d0884be9..1e5a30e27c 100644 --- a/cpp/lib/common/ExceptionHolder.h +++ b/cpp/lib/common/ExceptionHolder.h @@ -53,6 +53,7 @@ class ExceptionHolder : public Exception, public boost::shared_ptr std::string toString() const throw() { return (*this)->toString(); } virtual Exception* clone() const throw() { return (*this)->clone(); } virtual void throwSelf() const { (*this)->throwSelf(); } + virtual void throwIf() const { if (*this) (*this)->throwSelf(); } }; } // namespace qpid diff --git a/cpp/lib/common/framing/AMQContentBody.h b/cpp/lib/common/framing/AMQContentBody.h index 172228671a..a324f1ab21 100644 --- a/cpp/lib/common/framing/AMQContentBody.h +++ b/cpp/lib/common/framing/AMQContentBody.h @@ -28,7 +28,7 @@ namespace qpid { namespace framing { -class AMQContentBody : virtual public AMQBody +class AMQContentBody : public AMQBody { string data; diff --git a/cpp/lib/common/framing/AMQHeaderBody.h b/cpp/lib/common/framing/AMQHeaderBody.h index 31cf7d575e..77f56d9a93 100644 --- a/cpp/lib/common/framing/AMQHeaderBody.h +++ b/cpp/lib/common/framing/AMQHeaderBody.h @@ -29,7 +29,7 @@ namespace qpid { namespace framing { -class AMQHeaderBody : virtual public AMQBody +class AMQHeaderBody : public AMQBody { HeaderProperties* properties; u_int16_t weight; diff --git a/cpp/lib/common/framing/AMQHeartbeatBody.h b/cpp/lib/common/framing/AMQHeartbeatBody.h index a2315119e4..20a954c468 100644 --- a/cpp/lib/common/framing/AMQHeartbeatBody.h +++ b/cpp/lib/common/framing/AMQHeartbeatBody.h @@ -28,7 +28,7 @@ namespace qpid { namespace framing { -class AMQHeartbeatBody : virtual public AMQBody +class AMQHeartbeatBody : public AMQBody { public: typedef boost::shared_ptr shared_ptr; diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index 492571c83c..9b39a3d721 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -60,6 +60,9 @@ class AMQMethodBody : public AMQBody /** Return request ID or response correlationID */ virtual RequestId getRequestId() const { return 0; } + virtual bool isRequest() const { return false; } + virtual bool isResponse() const { return false; } + protected: static u_int32_t baseSize() { return 4; } diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h index 9b8fc1d663..d9eb78c26f 100644 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -62,6 +62,7 @@ class AMQRequestBody : public AMQMethodBody void setRequestId(RequestId id) { data.requestId=id; } void setResponseMark(ResponseId mark) { data.responseMark=mark; } + bool isRequest()const { return true; } protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } void printPrefix(std::ostream& out) const; diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index 3954836ec8..5ed76ff67d 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -69,6 +69,7 @@ class AMQResponseBody : public AMQMethodBody void setRequestId(RequestId id) { data.requestId = id; } void setBatchOffset(BatchOffset id) { data.batchOffset = id; } + bool isResponse() const { return true; } protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } void printPrefix(std::ostream& out) const; diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h index 4022da2f6e..87d537fb9e 100644 --- a/cpp/lib/common/sys/Mutex.h +++ b/cpp/lib/common/sys/Mutex.h @@ -46,12 +46,23 @@ class ScopedLock L& mutex; }; +template +class ScopedUnlock +{ + public: + ScopedUnlock(L& l) : mutex(l) { l.unlock(); } + ~ScopedUnlock() { mutex.lock(); } + private: + L& mutex; +}; + /** * Mutex lock. */ class Mutex : private boost::noncopyable { public: typedef ScopedLock ScopedLock; + typedef ScopedUnlock ScopedUnlock; inline Mutex(); inline ~Mutex(); diff --git a/cpp/lib/common/sys/ThreadSafeQueue.h b/cpp/lib/common/sys/ThreadSafeQueue.h new file mode 100644 index 0000000000..89454d693f --- /dev/null +++ b/cpp/lib/common/sys/ThreadSafeQueue.h @@ -0,0 +1,99 @@ +#ifndef _sys_ThreadSafeQueue_h +#define _sys_ThreadSafeQueue_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "ProducerConsumer.h" +#include "Exception.h" + +namespace qpid { +namespace sys { + +/** + * A thread safe queue template. + */ +template > +class ThreadSafeQueue +{ + public: + struct QueueStoppedException : public Exception {}; + + ThreadSafeQueue() {} + + /** Push a value onto the back of the queue */ + void push(const T& value) { + ProducerConsumer::ProducerLock producer(pc); + if (producer.isOk()) { + producer.confirm(); + container.push_back(value); + } + } + + /** Pop a value from the front of the queue. Waits till value is available. + *@throw QueueStoppedException if queue is stopped while waiting. + */ + T pop() { + ProducerConsumer::ConsumerLock consumer(pc); + if (consumer.isOk()) { + consumer.confirm(); + T value(container.front()); + container.pop_front(); + return value; + } + throw QueueStoppedException(); + } + + /** + * If a value becomes available within the timeout, set outValue + * and return true. Otherwise return false; + */ + bool pop(T& outValue, const Time& timeout) { + ProducerConsumer::ConsumerLock consumer(pc, timeout); + if (consumer.isOk()) { + consumer.confirm(); + outValue = container.front(); + container.pop_front(); + return true; + } + return false; + } + + /** Interrupt threads waiting in pop() */ + void stop() { pc.stop(); } + + /** True if queue is stopped */ + bool isStopped() { return pc.isStopped(); } + + /** Size of the queue */ + size_t size() { ProducerConsumer::Lock l(pc); return container.size(); } + + /** True if queue is empty */ + bool empty() { ProducerConsumer::Lock l(pc); return container.empty(); } + + private: + ProducerConsumer pc; + ContainerType container; +}; + +}} // namespace qpid::sys + + + +#endif /*!_sys_ThreadSafeQueue_h*/ diff --git a/cpp/tests/BrokerChannelTest.cpp b/cpp/tests/BrokerChannelTest.cpp new file mode 100644 index 0000000000..bd96d554b4 --- /dev/null +++ b/cpp/tests/BrokerChannelTest.cpp @@ -0,0 +1,356 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "AMQFrame.h" +#include "MockChannel.h" +#include "broker/Connection.h" +#include "ProtocolInitiation.h" + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; +using std::string; +using std::queue; + +struct MockHandler : ConnectionOutputHandler{ + std::vector frames; + + void send(AMQFrame* frame){ frames.push_back(frame); } + + void close() {}; +}; + + +class BrokerChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(BrokerChannelTest); + CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testDeliveryNoAck); + CPPUNIT_TEST(testDeliveryAndRecovery); + CPPUNIT_TEST(testStaging); + CPPUNIT_TEST(testQueuePolicy); + CPPUNIT_TEST_SUITE_END(); + + Broker::shared_ptr broker; + Connection connection; + MockHandler handler; + + class MockMessageStore : public NullMessageStore + { + struct MethodCall + { + const string name; + Message* const msg; + const string data;//only needed for appendContent + + void check(const MethodCall& other) const + { + CPPUNIT_ASSERT_EQUAL(name, other.name); + CPPUNIT_ASSERT_EQUAL(msg, other.msg); + CPPUNIT_ASSERT_EQUAL(data, other.data); + } + }; + + queue expected; + bool expectMode;//true when setting up expected calls + + void handle(const MethodCall& call) + { + if (expectMode) { + expected.push(call); + } else { + call.check(expected.front()); + expected.pop(); + } + } + + void handle(const string& name, Message* msg, const string& data) + { + MethodCall call = {name, msg, data}; + handle(call); + } + + public: + + MockMessageStore() : expectMode(false) {} + + void stage(Message* const msg) + { + if(!expectMode) msg->setPersistenceId(1); + MethodCall call = {"stage", msg, ""}; + handle(call); + } + + void appendContent(Message* msg, const string& data) + { + MethodCall call = {"appendContent", msg, data}; + handle(call); + } + + // Don't hide overloads. + using NullMessageStore::destroy; + + void destroy(Message* msg) + { + MethodCall call = {"destroy", msg, ""}; + handle(call); + } + + void expect() + { + expectMode = true; + } + + void test() + { + expectMode = false; + } + + void check() + { + CPPUNIT_ASSERT(expected.empty()); + } + }; + + + public: + + BrokerChannelTest() : + broker(Broker::create()), + connection(&handler, *broker) + { + connection.initiated(new ProtocolInitiation()); + } + + + void testConsumerMgmt(){ + Queue::shared_ptr queue(new Queue("my_queue")); + Channel channel(connection, 0, 0, 0); + channel.open(); + CPPUNIT_ASSERT(!channel.exists("my_consumer")); + + ConnectionToken* owner = 0; + string tag("my_consumer"); + channel.consume(tag, queue, false, false, owner); + string tagA; + string tagB; + channel.consume(tagA, queue, false, false, owner); + channel.consume(tagB, queue, false, false, owner); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.cancel(tagA); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(!channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.close(); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount()); + } + + void testDeliveryNoAck(){ + Channel channel(connection, 7, 10000); + channel.open(); + const string data("abcdefghijklmn"); + Message::shared_ptr msg( + createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, data); + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner(0); + string tag("no_ack"); + channel.consume(tag, queue, false, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[0]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[1]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[2]->getBody().get())); + AMQContentBody* contentBody = dynamic_cast( + handler.frames[3]->getBody().get()); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + } + + void testDeliveryAndRecovery(){ + Channel channel(connection, 7, 10000); + channel.open(); + const string data("abcdefghijklmn"); + + Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, data); + + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner(0); + string tag("ack"); + channel.consume(tag, queue, true, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[0]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[1]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast( + handler.frames[2]->getBody().get())); + AMQContentBody* contentBody = dynamic_cast( + handler.frames[3]->getBody().get()); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + } + + void testStaging(){ + MockMessageStore store; + Channel channel( + connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); + const string data[] = {"abcde", "fghij", "klmno"}; + + Message* msg = new BasicMessage( + 0, "my_exchange", "my_routing_key", false, false, + MockChannel::basicGetBody()); + + store.expect(); + store.stage(msg); + for (int i = 0; i < 3; i++) { + store.appendContent(msg, data[i]); + } + store.destroy(msg); + store.test(); + + Exchange::shared_ptr exchange = + broker->getExchanges().declare("my_exchange", "fanout").first; + Queue::shared_ptr queue(new Queue("my_queue")); + exchange->bind(queue, "", 0); + + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + u_int64_t contentSize(0); + for (int i = 0; i < 3; i++) { + contentSize += data[i].size(); + } + header->setContentSize(contentSize); + channel.handlePublish(msg); + channel.handleHeader(header); + + for (int i = 0; i < 3; i++) { + AMQContentBody::shared_ptr body(new AMQContentBody(data[i])); + channel.handleContent(body); + } + Message::shared_ptr msg2 = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); + msg2.reset();//should trigger destroy call + + store.check(); + } + + + //NOTE: strictly speaking this should/could be part of QueueTest, + //but as it can usefully use the same utility classes as this + //class it is defined here for simpllicity + void testQueuePolicy() + { + MockMessageStore store; + {//must ensure that store is last thing deleted as it is needed by destructor of lazy loaded content + const string data1("abcd"); + const string data2("efghijk"); + const string data3("lmnopqrstuvwxyz"); + Message::shared_ptr msg1(createMessage("e", "A", "MsgA", data1.size())); + Message::shared_ptr msg2(createMessage("e", "B", "MsgB", data2.size())); + Message::shared_ptr msg3(createMessage("e", "C", "MsgC", data3.size())); + addContent(msg1, data1); + addContent(msg2, data2); + addContent(msg3, data3); + + QueuePolicy policy(2, 0);//third message should be stored on disk and lazy loaded + FieldTable settings; + policy.update(settings); + + store.expect(); + store.stage(msg3.get()); + store.destroy(msg3.get()); + store.test(); + + Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); + queue->configure(settings);//set policy + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + + Message::shared_ptr next = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg1, next); + CPPUNIT_ASSERT_EQUAL((u_int32_t) data1.size(), next->encodedContentSize()); + next = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg2, next); + CPPUNIT_ASSERT_EQUAL((u_int32_t) data2.size(), next->encodedContentSize()); + next = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg3, next); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, next->encodedContentSize()); + + next.reset(); + msg1.reset(); + msg2.reset(); + msg3.reset();//must clear all references to messages to allow them to be destroyed + + } + store.check(); + } + + Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) + { + BasicMessage* msg = new BasicMessage( + 0, exchange, routingKey, false, false, + MockChannel::basicGetBody()); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(contentSize); + msg->setHeader(header); + msg->getHeaderProperties()->setMessageId(messageId); + return msg; + } + + void addContent(Message::shared_ptr msg, const string& data) + { + AMQContentBody::shared_ptr body(new AMQContentBody(data)); + msg->addContent(body); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(BrokerChannelTest); diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp deleted file mode 100644 index c20a490dc8..0000000000 --- a/cpp/tests/ChannelTest.cpp +++ /dev/null @@ -1,356 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "AMQFrame.h" -#include "MockChannel.h" -#include "broker/Connection.h" -#include "ProtocolInitiation.h" - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; -using std::string; -using std::queue; - -struct MockHandler : ConnectionOutputHandler{ - std::vector frames; - - void send(AMQFrame* frame){ frames.push_back(frame); } - - void close() {}; -}; - - -class ChannelTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ChannelTest); - CPPUNIT_TEST(testConsumerMgmt); - CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testDeliveryAndRecovery); - CPPUNIT_TEST(testStaging); - CPPUNIT_TEST(testQueuePolicy); - CPPUNIT_TEST_SUITE_END(); - - Broker::shared_ptr broker; - Connection connection; - MockHandler handler; - - class MockMessageStore : public NullMessageStore - { - struct MethodCall - { - const string name; - Message* const msg; - const string data;//only needed for appendContent - - void check(const MethodCall& other) const - { - CPPUNIT_ASSERT_EQUAL(name, other.name); - CPPUNIT_ASSERT_EQUAL(msg, other.msg); - CPPUNIT_ASSERT_EQUAL(data, other.data); - } - }; - - queue expected; - bool expectMode;//true when setting up expected calls - - void handle(const MethodCall& call) - { - if (expectMode) { - expected.push(call); - } else { - call.check(expected.front()); - expected.pop(); - } - } - - void handle(const string& name, Message* msg, const string& data) - { - MethodCall call = {name, msg, data}; - handle(call); - } - - public: - - MockMessageStore() : expectMode(false) {} - - void stage(Message* const msg) - { - if(!expectMode) msg->setPersistenceId(1); - MethodCall call = {"stage", msg, ""}; - handle(call); - } - - void appendContent(Message* msg, const string& data) - { - MethodCall call = {"appendContent", msg, data}; - handle(call); - } - - // Don't hide overloads. - using NullMessageStore::destroy; - - void destroy(Message* msg) - { - MethodCall call = {"destroy", msg, ""}; - handle(call); - } - - void expect() - { - expectMode = true; - } - - void test() - { - expectMode = false; - } - - void check() - { - CPPUNIT_ASSERT(expected.empty()); - } - }; - - - public: - - ChannelTest() : - broker(Broker::create()), - connection(&handler, *broker) - { - connection.initiated(new ProtocolInitiation()); - } - - - void testConsumerMgmt(){ - Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, 0, 0, 0); - channel.open(); - CPPUNIT_ASSERT(!channel.exists("my_consumer")); - - ConnectionToken* owner = 0; - string tag("my_consumer"); - channel.consume(tag, queue, false, false, owner); - string tagA; - string tagB; - channel.consume(tagA, queue, false, false, owner); - channel.consume(tagB, queue, false, false, owner); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.cancel(tagA); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(!channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.close(); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount()); - } - - void testDeliveryNoAck(){ - Channel channel(connection, 7, 10000); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg( - createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[0]->getBody().get())); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[1]->getBody().get())); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[2]->getBody().get())); - AMQContentBody* contentBody = dynamic_cast( - handler.frames[3]->getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testDeliveryAndRecovery(){ - Channel channel(connection, 7, 10000); - channel.open(); - const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("ack"); - channel.consume(tag, queue, true, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[0]->getBody().get())); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[1]->getBody().get())); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[2]->getBody().get())); - AMQContentBody* contentBody = dynamic_cast( - handler.frames[3]->getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testStaging(){ - MockMessageStore store; - Channel channel( - connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); - const string data[] = {"abcde", "fghij", "klmno"}; - - Message* msg = new BasicMessage( - 0, "my_exchange", "my_routing_key", false, false, - MockChannel::basicGetBody()); - - store.expect(); - store.stage(msg); - for (int i = 0; i < 3; i++) { - store.appendContent(msg, data[i]); - } - store.destroy(msg); - store.test(); - - Exchange::shared_ptr exchange = - broker->getExchanges().declare("my_exchange", "fanout").first; - Queue::shared_ptr queue(new Queue("my_queue")); - exchange->bind(queue, "", 0); - - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - u_int64_t contentSize(0); - for (int i = 0; i < 3; i++) { - contentSize += data[i].size(); - } - header->setContentSize(contentSize); - channel.handlePublish(msg); - channel.handleHeader(header); - - for (int i = 0; i < 3; i++) { - AMQContentBody::shared_ptr body(new AMQContentBody(data[i])); - channel.handleContent(body); - } - Message::shared_ptr msg2 = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); - msg2.reset();//should trigger destroy call - - store.check(); - } - - - //NOTE: strictly speaking this should/could be part of QueueTest, - //but as it can usefully use the same utility classes as this - //class it is defined here for simpllicity - void testQueuePolicy() - { - MockMessageStore store; - {//must ensure that store is last thing deleted as it is needed by destructor of lazy loaded content - const string data1("abcd"); - const string data2("efghijk"); - const string data3("lmnopqrstuvwxyz"); - Message::shared_ptr msg1(createMessage("e", "A", "MsgA", data1.size())); - Message::shared_ptr msg2(createMessage("e", "B", "MsgB", data2.size())); - Message::shared_ptr msg3(createMessage("e", "C", "MsgC", data3.size())); - addContent(msg1, data1); - addContent(msg2, data2); - addContent(msg3, data3); - - QueuePolicy policy(2, 0);//third message should be stored on disk and lazy loaded - FieldTable settings; - policy.update(settings); - - store.expect(); - store.stage(msg3.get()); - store.destroy(msg3.get()); - store.test(); - - Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); - queue->configure(settings);//set policy - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - - Message::shared_ptr next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg1, next); - CPPUNIT_ASSERT_EQUAL((u_int32_t) data1.size(), next->encodedContentSize()); - next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg2, next); - CPPUNIT_ASSERT_EQUAL((u_int32_t) data2.size(), next->encodedContentSize()); - next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg3, next); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, next->encodedContentSize()); - - next.reset(); - msg1.reset(); - msg2.reset(); - msg3.reset();//must clear all references to messages to allow them to be destroyed - - } - store.check(); - } - - Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) - { - BasicMessage* msg = new BasicMessage( - 0, exchange, routingKey, false, false, - MockChannel::basicGetBody()); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(contentSize); - msg->setHeader(header); - msg->getHeaderProperties()->setMessageId(messageId); - return msg; - } - - void addContent(Message::shared_ptr msg, const string& data) - { - AMQContentBody::shared_ptr body(new AMQContentBody(data)); - msg->addContent(body); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest); diff --git a/cpp/tests/ClientChannelTest.cpp b/cpp/tests/ClientChannelTest.cpp new file mode 100644 index 0000000000..e1eec4402d --- /dev/null +++ b/cpp/tests/ClientChannelTest.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid_test_plugin.h" +#include "InProcessBroker.h" +#include "ClientChannel.h" +#include "ClientMessage.h" +#include "ClientQueue.h" +#include "ClientExchange.h" + +using namespace std; +using namespace boost; +using namespace qpid::client; +using namespace qpid::framing; + +/** + * Test client API using an in-process broker. + */ +class ClientChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ClientChannelTest); + CPPUNIT_TEST(testGet); + CPPUNIT_TEST(testConsume); + CPPUNIT_TEST_SUITE_END(); + + InProcessBrokerClient connection; // client::connection + local broker + Channel channel; + const std::string key; + const std::string data; + Queue queue; + Exchange exchange; + + public: + + ClientChannelTest() + : key("testq"), data("hello"), + queue(key, true), exchange("", Exchange::DIRECT_EXCHANGE) + { + connection.openChannel(channel); + CPPUNIT_ASSERT(channel.getId() != 0); + channel.declareQueue(queue); + } + + void testGet() { + // FIXME aconway 2007-02-16: Must fix thread safety bug + // in ClientChannel::get for this to pass. + return; + + Message pubMsg(data); + channel.publish(pubMsg, exchange, key); + Message getMsg; + channel.get(getMsg, queue); + CPPUNIT_ASSERT_EQUAL(data, getMsg.getData()); + } + + void testConsume() { + } + + + // FIXME aconway 2007-02-15: Cover full channel API +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ClientChannelTest); diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h index 9cd7cc69cb..709ca9b953 100644 --- a/cpp/tests/InProcessBroker.h +++ b/cpp/tests/InProcessBroker.h @@ -152,19 +152,18 @@ std::ostream& operator<<( class InProcessBrokerClient : public qpid::client::Connection { public: qpid::broker::InProcessBroker broker; + qpid::broker::InProcessBroker::Conversation& conversation; /** Constructor creates broker and opens client connection. */ InProcessBrokerClient(qpid::framing::ProtocolVersion version= qpid::framing::highestProtocolVersion - ) : broker(version) + ) : broker(version), conversation(broker.conversation) { setConnector(broker); open(""); } - ~InProcessBrokerClient() { - close(); // close before broker is deleted. - } + ~InProcessBrokerClient() {} }; #endif // _tests_InProcessBroker_h diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index 768558f219..27cea8a952 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -9,15 +9,11 @@ INCLUDES = \ -I$(top_srcdir)/lib/common/framing \ $(APR_CXXFLAGS) -client_exe_tests = \ - client_test \ - echo_service \ - topic_listener \ - topic_publisher +# Unit tests broker_tests = \ AccumulatedAckTest \ - ChannelTest \ + BrokerChannelTest \ ConfigurationTest \ ExchangeTest \ HeadersExchangeTest \ @@ -34,12 +30,10 @@ broker_tests = \ TxBufferTest \ TxPublishTest \ ValueTest \ - MessageHandlerTest + MessageHandlerTest -# Tests that use the in-process BrokerSingleton to test client-broker -# interaction. Not strictly unit tests. -round_trip_tests = \ - RequestResponseTest +client_tests = \ + ClientChannelTest framing_tests = \ FieldTableTest \ @@ -48,17 +42,25 @@ framing_tests = \ misc_tests = \ ExceptionTest \ - ProducerConsumerTest + ProducerConsumerTest posix_tests = \ EventChannelTest \ EventChannelThreadsTest unit_tests = \ -b $(broker_tests) \ + $(broker_tests) \ + $(client_tests) \ $(framing_tests) \ - $(misc_tests) \ - $(round_trip_tests) + $(misc_tests) + +# Executable client tests + +client_exe_tests = \ + client_test \ + echo_service \ + topic_listener \ + topic_publisher noinst_PROGRAMS = $(client_exe_tests) diff --git a/cpp/tests/RequestResponseTest.cpp b/cpp/tests/RequestResponseTest.cpp deleted file mode 100644 index 1cb1a49275..0000000000 --- a/cpp/tests/RequestResponseTest.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include "BrokerSingleton.h" -#include "broker/Broker.h" -#include "client/Connection.h" -#include "client/ClientChannel.h" - -/** - * Round trip test using in-process broker. - * Verify request/response IDs - */ -class RequestResponseTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(RequestResponseTest); - CPPUNIT_TEST(testAsRequester); - CPPUNIT_TEST(testAsResponder); - CPPUNIT_TEST_SUITE_END(); - - qpid::broker::Broker::shared_ptr broker; - - public: - - void testAsRequester() { -// FIXME aconway 2007-01-22: CPPUNIT_FAIL("unfinished"); - } - void testAsResponder() { -// FIXME aconway 2007-01-22: CPPUNIT_FAIL("unfinished"); - } -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(RequestResponseTest); - - - -- cgit v1.2.1