diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-17 03:46:44 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-17 03:46:44 +0000 |
| commit | f972bfa6c7c9a1b3d5f5fe4753dce7358b6b0215 (patch) | |
| tree | a3d37ef1999eb4ebdb8bb9d6eaea8b2d3de5dfe2 /cpp/lib | |
| parent | 72de13352dc9c42acfe95a1d76f049c507eb5cfd (diff) | |
| download | qpid-python-f972bfa6c7c9a1b3d5f5fe4753dce7358b6b0215.tar.gz | |
* cpp/lib/client/ClientMessage.h/.cpp: Added Message constructor taking data.
* cpp/lib/client/IncomingMessage.cpp: Simplified message aggregation.
* cpp/lib/common/framing/AMQ*Body.h: remove unnecessary virtual inheritance.
* cpp/lib/common/framing/AMQMethodBody.h: add bool isRequest, isResponse
* cpp/lib/common/sys/Mutex.h (class ScopedUnlock): Added ScopedUnlock
* cpp/lib/common/sys/ThreadSafeQueue.h: Thread-safe queue template.
* cpp/tests/BrokerChannelTest.cpp: Renamed from ChannelTest.
* cpp/tests/ClientChannelTest.cpp: Test client API.
* cpp/tests/RequestResponseTest.cpp: Removed empty test.
* cpp/lib/client/Connection.h,.cpp:
- non-static channelIdCounter
- No close() in dtor.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@508705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 4 | ||||
| -rw-r--r-- | cpp/lib/client/ClientMessage.cpp | 11 | ||||
| -rw-r--r-- | cpp/lib/client/ClientMessage.h | 14 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 8 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.h | 2 | ||||
| -rw-r--r-- | cpp/lib/client/IncomingMessage.cpp | 22 | ||||
| -rw-r--r-- | cpp/lib/client/IncomingMessage.h | 6 | ||||
| -rw-r--r-- | cpp/lib/common/ExceptionHolder.h | 1 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQContentBody.h | 2 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQHeaderBody.h | 2 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQHeartbeatBody.h | 2 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.h | 3 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.h | 1 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.h | 1 | ||||
| -rw-r--r-- | cpp/lib/common/sys/Mutex.h | 11 | ||||
| -rw-r--r-- | cpp/lib/common/sys/ThreadSafeQueue.h | 99 |
16 files changed, 148 insertions, 41 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 9d1c8ec011..4dff8a62f7 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -245,7 +245,7 @@ void Channel::retrieve(Message& msg){ msg.header = retrieved->getHeader(); msg.deliveryTag = retrieved->getDeliveryTag(); - retrieved->getData(msg.data); + msg.data = retrieved->getData(); delete retrieved; retrieved = 0; } @@ -446,7 +446,7 @@ void Channel::dispatch(){ if(incomingMsg){ //Note: msg is currently only valid for duration of this call Message msg(incomingMsg->getHeader()); - incomingMsg->getData(msg.data); + msg.data = incomingMsg->getData(); if(incomingMsg->isReturn()){ if(returnsHandler == 0){ //print warning to log/console diff --git a/cpp/lib/client/ClientMessage.cpp b/cpp/lib/client/ClientMessage.cpp index e8a2a6019e..8b08f7e535 100644 --- a/cpp/lib/client/ClientMessage.cpp +++ b/cpp/lib/client/ClientMessage.cpp @@ -23,8 +23,15 @@ using namespace qpid::client; using namespace qpid::framing; -Message::Message(){ - header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)); +Message::Message(const std::string& d) + : header(new AMQHeaderBody(BASIC)) +{ + setData(d); +} + +void Message::setData(const std::string& d) { + data = d; + header->setContentSize(d.size()); } Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h index a5d204d35a..148f9240c8 100644 --- a/cpp/lib/client/ClientMessage.h +++ b/cpp/lib/client/ClientMessage.h @@ -41,8 +41,9 @@ namespace client { qpid::framing::BasicHeaderProperties* getHeaderProperties(); Message(qpid::framing::AMQHeaderBody::shared_ptr& header); + public: - Message(); + Message(const std::string& data=std::string()); ~Message(); /** @@ -51,14 +52,15 @@ namespace client { * * @return a string representing the data of the message */ - inline std::string getData(){ return data; } + std::string getData() const { return data; } + /** * Allows the application to set the content of messages to be * sent. * * @param data a string representing the data of the message */ - inline void setData(const std::string& _data){ data = _data; } + void setData(const std::string& _data); /** * @return true if this message was delivered previously (to @@ -103,10 +105,10 @@ namespace client { void setClusterId(const std::string& clusterId); - friend class Channel; + // TODO aconway 2007-02-15: remove friendships. + friend class Channel; }; -} -} +}} #endif /*!_client_ClientMessage_h*/ diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 2f91c44a22..5b97ca8e5d 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -37,23 +37,19 @@ using namespace qpid::sys; namespace qpid { namespace client { -ChannelId Connection::channelIdCounter; - const std::string Connection::OK("OK"); Connection::Connection( bool _debug, u_int32_t _max_frame_size, framing::ProtocolVersion _version -) : version(_version), max_frame_size(_max_frame_size), +) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) { setConnector(defaultConnector); } -Connection::~Connection(){ - close(); -} +Connection::~Connection(){} void Connection::setConnector(Connector& con) { diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 275e02a105..6a8d88d581 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -86,7 +86,7 @@ class Connection : public ConnectionForChannel { typedef std::map<framing::ChannelId, Channel*> ChannelMap; - static framing::ChannelId channelIdCounter; + framing::ChannelId channelIdCounter; static const std::string OK; framing::ProtocolVersion version; diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp index 2ff143ba94..c1f6ca880f 100644 --- a/cpp/lib/client/IncomingMessage.cpp +++ b/cpp/lib/client/IncomingMessage.cpp @@ -36,12 +36,12 @@ void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){ this->header = _header; } -void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){ - this->content.push_back(_content); +void IncomingMessage::addContent(AMQContentBody::shared_ptr content){ + data.append(content->getData()); } bool IncomingMessage::isComplete(){ - return header != 0 && header->getContentSize() == contentSize(); + return header != 0 && header->getContentSize() == data.size(); } bool IncomingMessage::isReturn(){ @@ -70,19 +70,7 @@ AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ return header; } -void IncomingMessage::getData(string& s){ - int count(content.size()); - for(int i = 0; i < count; i++){ - if(i == 0) s = content[i]->getData(); - else s += content[i]->getData(); - } +std::string IncomingMessage::getData() const { + return data; } -u_int64_t IncomingMessage::contentSize(){ - u_int64_t size(0); - u_int64_t count(content.size()); - for(u_int64_t i = 0; i < count; i++){ - size += content[i]->size(); - } - return size; -} diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h index 464e05d877..a2aa4d8441 100644 --- a/cpp/lib/client/IncomingMessage.h +++ b/cpp/lib/client/IncomingMessage.h @@ -36,9 +36,7 @@ namespace client { qpid::framing::BasicReturnBody::shared_ptr returned; qpid::framing::BasicGetOkBody::shared_ptr response; qpid::framing::AMQHeaderBody::shared_ptr header; - std::vector<qpid::framing::AMQContentBody::shared_ptr> content; - - u_int64_t contentSize(); + std::string data; public: IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro); IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro); @@ -53,7 +51,7 @@ namespace client { const std::string& getConsumerTag();//only relevant if isDelivery() qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); u_int64_t getDeliveryTag(); - void getData(std::string& data); + std::string getData() const; }; } diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h index 83d0884be9..1e5a30e27c 100644 --- a/cpp/lib/common/ExceptionHolder.h +++ b/cpp/lib/common/ExceptionHolder.h @@ -53,6 +53,7 @@ class ExceptionHolder : public Exception, public boost::shared_ptr<Exception> std::string toString() const throw() { return (*this)->toString(); } virtual Exception* clone() const throw() { return (*this)->clone(); } virtual void throwSelf() const { (*this)->throwSelf(); } + virtual void throwIf() const { if (*this) (*this)->throwSelf(); } }; } // namespace qpid diff --git a/cpp/lib/common/framing/AMQContentBody.h b/cpp/lib/common/framing/AMQContentBody.h index 172228671a..a324f1ab21 100644 --- a/cpp/lib/common/framing/AMQContentBody.h +++ b/cpp/lib/common/framing/AMQContentBody.h @@ -28,7 +28,7 @@ namespace qpid { namespace framing { -class AMQContentBody : virtual public AMQBody +class AMQContentBody : public AMQBody { string data; diff --git a/cpp/lib/common/framing/AMQHeaderBody.h b/cpp/lib/common/framing/AMQHeaderBody.h index 31cf7d575e..77f56d9a93 100644 --- a/cpp/lib/common/framing/AMQHeaderBody.h +++ b/cpp/lib/common/framing/AMQHeaderBody.h @@ -29,7 +29,7 @@ namespace qpid { namespace framing { -class AMQHeaderBody : virtual public AMQBody +class AMQHeaderBody : public AMQBody { HeaderProperties* properties; u_int16_t weight; diff --git a/cpp/lib/common/framing/AMQHeartbeatBody.h b/cpp/lib/common/framing/AMQHeartbeatBody.h index a2315119e4..20a954c468 100644 --- a/cpp/lib/common/framing/AMQHeartbeatBody.h +++ b/cpp/lib/common/framing/AMQHeartbeatBody.h @@ -28,7 +28,7 @@ namespace qpid { namespace framing { -class AMQHeartbeatBody : virtual public AMQBody +class AMQHeartbeatBody : public AMQBody { public: typedef boost::shared_ptr<AMQHeartbeatBody> shared_ptr; diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index 492571c83c..9b39a3d721 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -60,6 +60,9 @@ class AMQMethodBody : public AMQBody /** Return request ID or response correlationID */ virtual RequestId getRequestId() const { return 0; } + virtual bool isRequest() const { return false; } + virtual bool isResponse() const { return false; } + protected: static u_int32_t baseSize() { return 4; } diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h index 9b8fc1d663..d9eb78c26f 100644 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -62,6 +62,7 @@ class AMQRequestBody : public AMQMethodBody void setRequestId(RequestId id) { data.requestId=id; } void setResponseMark(ResponseId mark) { data.responseMark=mark; } + bool isRequest()const { return true; } protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } void printPrefix(std::ostream& out) const; diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index 3954836ec8..5ed76ff67d 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -69,6 +69,7 @@ class AMQResponseBody : public AMQMethodBody void setRequestId(RequestId id) { data.requestId = id; } void setBatchOffset(BatchOffset id) { data.batchOffset = id; } + bool isResponse() const { return true; } protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } void printPrefix(std::ostream& out) const; diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h index 4022da2f6e..87d537fb9e 100644 --- a/cpp/lib/common/sys/Mutex.h +++ b/cpp/lib/common/sys/Mutex.h @@ -46,12 +46,23 @@ class ScopedLock L& mutex; }; +template <class L> +class ScopedUnlock +{ + public: + ScopedUnlock(L& l) : mutex(l) { l.unlock(); } + ~ScopedUnlock() { mutex.lock(); } + private: + L& mutex; +}; + /** * Mutex lock. */ class Mutex : private boost::noncopyable { public: typedef ScopedLock<Mutex> ScopedLock; + typedef ScopedUnlock<Mutex> ScopedUnlock; inline Mutex(); inline ~Mutex(); diff --git a/cpp/lib/common/sys/ThreadSafeQueue.h b/cpp/lib/common/sys/ThreadSafeQueue.h new file mode 100644 index 0000000000..89454d693f --- /dev/null +++ b/cpp/lib/common/sys/ThreadSafeQueue.h @@ -0,0 +1,99 @@ +#ifndef _sys_ThreadSafeQueue_h +#define _sys_ThreadSafeQueue_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <deque> +#include "ProducerConsumer.h" +#include "Exception.h" + +namespace qpid { +namespace sys { + +/** + * A thread safe queue template. + */ +template <class T, class ContainerType=std::deque<T> > +class ThreadSafeQueue +{ + public: + struct QueueStoppedException : public Exception {}; + + ThreadSafeQueue() {} + + /** Push a value onto the back of the queue */ + void push(const T& value) { + ProducerConsumer::ProducerLock producer(pc); + if (producer.isOk()) { + producer.confirm(); + container.push_back(value); + } + } + + /** Pop a value from the front of the queue. Waits till value is available. + *@throw QueueStoppedException if queue is stopped while waiting. + */ + T pop() { + ProducerConsumer::ConsumerLock consumer(pc); + if (consumer.isOk()) { + consumer.confirm(); + T value(container.front()); + container.pop_front(); + return value; + } + throw QueueStoppedException(); + } + + /** + * If a value becomes available within the timeout, set outValue + * and return true. Otherwise return false; + */ + bool pop(T& outValue, const Time& timeout) { + ProducerConsumer::ConsumerLock consumer(pc, timeout); + if (consumer.isOk()) { + consumer.confirm(); + outValue = container.front(); + container.pop_front(); + return true; + } + return false; + } + + /** Interrupt threads waiting in pop() */ + void stop() { pc.stop(); } + + /** True if queue is stopped */ + bool isStopped() { return pc.isStopped(); } + + /** Size of the queue */ + size_t size() { ProducerConsumer::Lock l(pc); return container.size(); } + + /** True if queue is empty */ + bool empty() { ProducerConsumer::Lock l(pc); return container.empty(); } + + private: + ProducerConsumer pc; + ContainerType container; +}; + +}} // namespace qpid::sys + + + +#endif /*!_sys_ThreadSafeQueue_h*/ |
