diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-30 20:07:41 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-30 20:07:41 +0000 |
| commit | f9f848394de0662248cf62d4ec5e4818949403b2 (patch) | |
| tree | 4f13105e2223b704d7850300116dcc56116acae2 /cpp | |
| parent | 98ccae7574a18f8d0a1f9e28e86ccfde4541c81f (diff) | |
| download | qpid-python-f9f848394de0662248cf62d4ec5e4818949403b2.tar.gz | |
Andrew Stitcher <astitcher@redhat.com>
r723@fuschia: andrew | 2007-01-12 00:35:16 +0000
Branch for my work on Qpid.0-9
r724@fuschia: andrew | 2007-01-12 00:59:28 +0000
Added in empty implementation of handler class for protocol Message class
r768@fuschia: andrew | 2007-01-17 01:25:16 +0000
* Added Test for new MessageHandlerImpl (but no actual tests yet)
* Filled in lots of the blanks in the MessageHandlerImpl with code
stolen from the BasicHandlerImpl
r800@fuschia: andrew | 2007-01-17 17:34:13 +0000
Updated to latest upstream changes
r840@fuschia: andrew | 2007-01-19 00:31:59 +0000
Fixed merge errors
r841@fuschia: andrew | 2007-01-19 00:47:29 +0000
Another merge problem fixed
r878@fuschia: andrew | 2007-01-24 11:27:48 +0000
Started work on the Message class handler implementation
r976@fuschia: andrew | 2007-01-30 17:05:05 +0000
Working again after broker Message refactor
r980@fuschia: andrew | 2007-01-30 18:39:18 +0000
Fix for extra parameter to transfer
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501534 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 16 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 52 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 13 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessageBase.h | 268 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 134 | ||||
| -rw-r--r-- | cpp/lib/broker/Connection.cpp | 2 | ||||
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 38 | ||||
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.h | 3 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 7 | ||||
| -rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.cpp | 6 | ||||
| -rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.h | 2 | ||||
| -rw-r--r-- | cpp/lib/common/framing/MethodContext.h | 9 | ||||
| -rw-r--r-- | cpp/tests/ChannelTest.cpp | 4 | ||||
| -rw-r--r-- | cpp/tests/ExchangeTest.cpp | 2 | ||||
| -rw-r--r-- | cpp/tests/InProcessBroker.h | 153 | ||||
| -rw-r--r-- | cpp/tests/MessageBuilderTest.cpp | 16 | ||||
| -rw-r--r-- | cpp/tests/MessageTest.cpp | 4 | ||||
| -rw-r--r-- | cpp/tests/QueueTest.cpp | 12 | ||||
| -rw-r--r-- | cpp/tests/TxAckTest.cpp | 2 | ||||
| -rw-r--r-- | cpp/tests/TxPublishTest.cpp | 2 |
20 files changed, 661 insertions, 84 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 5cf767a8e1..abf0b3852d 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -322,7 +322,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte void BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( - const MethodContext&, + const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, @@ -337,7 +337,7 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( exchange->unbind(queue, routingKey, &arguments); - connection.client->getQueue().unbindOk(channel.getId()); + connection.client->getQueue().unbindOk(context); } void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ @@ -420,7 +420,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate); + BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); channel.handlePublish(msg, exchange); }else{ throw ChannelException( @@ -475,16 +475,16 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& ) } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context) { - connection.client->getChannel().ok(channel.getId()); - connection.client->getChannel().pong(channel.getId()); + connection.client->getChannel().ok(context); + connection.client->getChannel().pong(context); } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context) { - connection.client->getChannel().ok(channel.getId()); + connection.client->getChannel().ok(context); } void diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 07b14a4eff..a5192beede 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -33,7 +33,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -Message::Message(const ConnectionToken* const _publisher, +BasicMessage::BasicMessage(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, bool _mandatory, bool _immediate) : publisher(_publisher), exchange(_exchange), @@ -44,23 +44,23 @@ Message::Message(const ConnectionToken* const _publisher, size(0), persistenceId(0) {} -Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : +BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ decode(buffer, headersOnly, contentChunkSize); } -Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} +BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} -Message::~Message(){ +BasicMessage::~BasicMessage(){ if (content.get()) content->destroy(); } -void Message::setHeader(AMQHeaderBody::shared_ptr _header){ +void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ this->header = _header; } -void Message::addContent(AMQContentBody::shared_ptr data){ +void BasicMessage::addContent(AMQContentBody::shared_ptr data){ if (!content.get()) { content = std::auto_ptr<Content>(new InMemoryContent()); } @@ -68,15 +68,15 @@ void Message::addContent(AMQContentBody::shared_ptr data){ size += data->size(); } -bool Message::isComplete(){ +bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } -void Message::redeliver(){ +void BasicMessage::redeliver(){ redelivered = true; } -void Message::deliver(OutputHandler* out, int channel, +void BasicMessage::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize, ProtocolVersion* version){ @@ -85,7 +85,7 @@ void Message::deliver(OutputHandler* out, int channel, sendContent(out, channel, framesize, version); } -void Message::sendGetOk(OutputHandler* out, +void BasicMessage::sendGetOk(OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, @@ -96,7 +96,7 @@ void Message::sendGetOk(OutputHandler* out, sendContent(out, channel, framesize, version); } -void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ +void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(*version, channel, headerBody)); @@ -104,28 +104,28 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, if (content.get()) content->send(*version, out, channel, framesize); } -BasicHeaderProperties* Message::getHeaderProperties(){ +BasicHeaderProperties* BasicMessage::getHeaderProperties(){ return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); } -const ConnectionToken* const Message::getPublisher(){ +const ConnectionToken* const BasicMessage::getPublisher(){ return publisher; } -bool Message::isPersistent() +bool BasicMessage::isPersistent() { if(!header) return false; BasicHeaderProperties* props = getHeaderProperties(); return props && props->getDeliveryMode() == PERSISTENT; } -void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) +void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) { decodeHeader(buffer); if (!headersOnly) decodeContent(buffer, contentChunkSize); } -void Message::decodeHeader(Buffer& buffer) +void BasicMessage::decodeHeader(Buffer& buffer) { buffer.getShortString(exchange); buffer.getShortString(routingKey); @@ -136,7 +136,7 @@ void Message::decodeHeader(Buffer& buffer) setHeader(headerBody); } -void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) +void BasicMessage::decodeContent(Buffer& buffer, u_int32_t chunkSize) { u_int64_t expected = expectedContentSize(); if (expected != buffer.available()) { @@ -158,13 +158,13 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) } } -void Message::encode(Buffer& buffer) +void BasicMessage::encode(Buffer& buffer) { encodeHeader(buffer); encodeContent(buffer); } -void Message::encodeHeader(Buffer& buffer) +void BasicMessage::encodeHeader(Buffer& buffer) { buffer.putShortString(exchange); buffer.putShortString(routingKey); @@ -172,36 +172,36 @@ void Message::encodeHeader(Buffer& buffer) header->encode(buffer); } -void Message::encodeContent(Buffer& buffer) +void BasicMessage::encodeContent(Buffer& buffer) { Mutex::ScopedLock locker(contentLock); if (content.get()) content->encode(buffer); } -u_int32_t Message::encodedSize() +u_int32_t BasicMessage::encodedSize() { return encodedHeaderSize() + encodedContentSize(); } -u_int32_t Message::encodedContentSize() +u_int32_t BasicMessage::encodedContentSize() { Mutex::ScopedLock locker(contentLock); return content.get() ? content->size() : 0; } -u_int32_t Message::encodedHeaderSize() +u_int32_t BasicMessage::encodedHeaderSize() { return exchange.size() + 1 + routingKey.size() + 1 + header->size() + 4;//4 extra bytes for size } -u_int64_t Message::expectedContentSize() +u_int64_t BasicMessage::expectedContentSize() { return header.get() ? header->getContentSize() : 0; } -void Message::releaseContent(MessageStore* store) +void BasicMessage::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); if (!isPersistent() && persistenceId == 0) { @@ -217,7 +217,7 @@ void Message::releaseContent(MessageStore* store) } } -void Message::setContent(std::auto_ptr<Content>& _content) +void BasicMessage::setContent(std::auto_ptr<Content>& _content) { Mutex::ScopedLock locker(contentLock); content = _content; diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 388bfba51e..d9ab9b7220 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -22,6 +22,7 @@ * */ +#include <BrokerMessageBase.h> #include <memory> #include <boost/shared_ptr.hpp> #include <AMQContentBody.h> @@ -45,7 +46,7 @@ namespace qpid { * content bodies and some details about the publication * request. */ - class Message{ + class BasicMessage : public Message{ const ConnectionToken* const publisher; string exchange; string routingKey; @@ -62,14 +63,14 @@ namespace qpid { int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version); public: - typedef boost::shared_ptr<Message> shared_ptr; + typedef boost::shared_ptr<BasicMessage> shared_ptr; - Message(const ConnectionToken* const publisher, + BasicMessage(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate); - Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); - Message(); - ~Message(); + BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); + BasicMessage(); + ~BasicMessage(); void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); void addContent(qpid::framing::AMQContentBody::shared_ptr data); bool isComplete(); diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h new file mode 100644 index 0000000000..e0139519ae --- /dev/null +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -0,0 +1,268 @@ +#ifndef _broker_BrokerMessageBase_h +#define _broker_BrokerMessageBase_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AMQContentBody.h" +#include "AMQHeaderBody.h" +#include "Content.h" + +#include <string> +#include <boost/shared_ptr.hpp> + +namespace qpid { + + namespace framing { + class OutputHandler; + class ProtocolVersion; + class BasicHeaderProperties; + } + + namespace broker { + + class MessageStore; + class ConnectionToken; + + /** + * Base class for all types of internal broker messages + * abstracting away the operations + * TODO; AMS: for the moment this is mostly a placeholder + */ + class Message{ + + public: + typedef boost::shared_ptr<Message> shared_ptr; + + virtual ~Message() {}; + + virtual void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void redeliver() = 0; + + virtual bool isComplete() = 0; + + virtual u_int64_t contentSize() const = 0; + virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; + virtual bool isPersistent() = 0; + virtual const std::string& getRoutingKey() const = 0; + virtual const ConnectionToken* const getPublisher() = 0; + virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests? + virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests? + + virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests? + + virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedHeaderSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + virtual u_int32_t encodedContentSize() = 0; + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + virtual u_int64_t expectedContentSize() = 0; + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + virtual void releaseContent(MessageStore* /*store*/) {}; + + // TODO: AMS 29/1/2007 Don't think these are really part of base class + + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; + virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; + virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ +#ifndef _broker_BrokerMessageBase_h +#define _broker_BrokerMessageBase_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AMQContentBody.h" +#include "AMQHeaderBody.h" +#include "Content.h" + +#include <string> +#include <boost/shared_ptr.hpp> + +namespace qpid { + + namespace framing { + class OutputHandler; + class ProtocolVersion; + class BasicHeaderProperties; + } + + namespace broker { + + class MessageStore; + class ConnectionToken; + + /** + * Base class for all types of internal broker messages + * abstracting away the operations + * TODO; AMS: for the moment this is mostly a placeholder + */ + class Message{ + + public: + typedef boost::shared_ptr<Message> shared_ptr; + + virtual ~Message() {}; + + virtual void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void redeliver() = 0; + + virtual bool isComplete() = 0; + + virtual u_int64_t contentSize() const = 0; + virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; + virtual bool isPersistent() = 0; + virtual const std::string& getRoutingKey() const = 0; + virtual const ConnectionToken* const getPublisher() = 0; + virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests? + virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests? + + virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests? + + virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedHeaderSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + virtual u_int32_t encodedContentSize() = 0; + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + virtual u_int64_t expectedContentSize() = 0; + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + virtual void releaseContent(MessageStore* /*store*/) {}; + + // TODO: AMS 29/1/2007 Don't think these are really part of base class + + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; + virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; + virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h new file mode 100644 index 0000000000..f25405db72 --- /dev/null +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -0,0 +1,134 @@ +#ifndef _broker_BrokerMessageMessage_h +#define _broker_BrokerMessageMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerMessageBase.h" + +namespace qpid { + namespace broker { + class MessageMessage: public Message{ + + public: + ~MessageMessage(); + + void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void redeliver(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + void setContent(std::auto_ptr<Content>& content); + + u_int64_t contentSize() const; + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const std::string& getRoutingKey() const; + const ConnectionToken* const getPublisher(); + + u_int32_t encodedContentSize(); + u_int64_t expectedContentSize(); + void releaseContent(MessageStore* store); + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ +#ifndef _broker_BrokerMessageMessage_h +#define _broker_BrokerMessageMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerMessageBase.h" + +namespace qpid { + namespace broker { + class MessageMessage: public Message{ + + public: + ~MessageMessage(); + + void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void redeliver(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + void setContent(std::auto_ptr<Content>& content); + + u_int64_t contentSize() const; + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const std::string& getRoutingKey() const; + const ConnectionToken* const getPublisher(); + + u_int32_t encodedContentSize(); + u_int64_t expectedContentSize(); + void releaseContent(MessageStore* store); + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index d34422c93d..0f58278a5a 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -74,7 +74,7 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) { string locales("en_US"); // TODO aconway 2007-01-16: Client call, move to adapter. client->getConnection().start( - MethodContext(0, &getAdapter(0)), + MethodContext(0, 0, &getAdapter(0)), header->getMajor(), header->getMinor(), properties, mechanisms, locales); getAdapter(0).init(0, *out, client->getProtocolVersion()); diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 33f7a63d45..7361d8827a 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -1,3 +1,4 @@ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -18,8 +19,11 @@ #include "MessageHandlerImpl.h" #include "BrokerChannel.h" +#include "FramingContent.h" #include "Connection.h" #include "Broker.h" +#include "BrokerMessageMessage.h" + namespace qpid { namespace broker { @@ -41,7 +45,7 @@ void MessageHandlerImpl::cancel( const MethodContext& context, const string& destination ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature channel.cancel(destination); @@ -73,10 +77,9 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const qpid::framing::FieldTable& filter ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - Channel& channel = connection.getChannel(channel.getId()); if(!destination.empty() && channel.exists(destination)){ throw ConnectionException(530, "Consumer tags must be unique"); } @@ -108,7 +111,7 @@ MessageHandlerImpl::get( const MethodContext& context, const string& /*destination*/, bool noAck ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = connection.getQueue(queueName, context.channelId); @@ -146,7 +149,7 @@ MessageHandlerImpl::qos(const MethodContext& context, u_int16_t prefetchCount, bool /*global*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature //TODO: handle global channel.setPrefetchSize(prefetchSize); @@ -159,7 +162,7 @@ void MessageHandlerImpl::recover(const MethodContext&, bool requeue ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature channel.recover(requeue); @@ -182,18 +185,18 @@ MessageHandlerImpl::resume(const MethodContext&, } void -MessageHandlerImpl::transfer(const MethodContext&, +MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool immediate, + bool /*immediate*/, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& routingKey, + const string& /*routingKey*/, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -204,15 +207,24 @@ MessageHandlerImpl::transfer(const MethodContext&, const string& /*transactionId*/, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) + qpid::framing::Content body, + bool /*mandatory*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); - channel.handlePublish(msg, exchange); + if (body.isInline()) { +// MessageMessage* msg = +// new MessageMessage(&connection, exchangeName, routingKey, immediate); +// channel.handlePublish(msg, exchange); + + connection.client->getMessageHandler()->ok(context); + } else { + // Don't handle reference content yet + assert(body.isInline()); + } }else{ throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); } diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 0eb9e119f5..985efe3847 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -114,7 +114,8 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH const std::string& transactionId, const std::string& securityToken, const framing::FieldTable& applicationHeaders, - framing::Content body ); + framing::Content body, + bool mandatory ); }; }} // namespace qpid::broker diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index bf6c44570d..78e340eb11 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -46,10 +46,11 @@ Connection::Connection( bool _debug, u_int32_t _max_frame_size, const framing::ProtocolVersion& _version ) : version(_version), max_frame_size(_max_frame_size), - defaultConnector(version, debug, max_frame_size), - connector(&defaultConnector), + defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) -{} +{ + setConnector(defaultConnector); +} Connection::~Connection(){ close(); diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 1fdb8d6691..653e47048e 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -29,7 +29,7 @@ void ChannelAdapter::init( id = i; out = &o; version = v; - context = MethodContext(id, this); + context = MethodContext(0, id, this); } void ChannelAdapter::send(AMQFrame* frame) { @@ -59,7 +59,7 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) { void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { assertMethodOk(*request); responder.received(request->getData()); - context =MethodContext(id, this, request->getRequestId()); + context =MethodContext(request.get(), id, this, request->getRequestId()); handleMethodInContext(request, context); } @@ -73,7 +73,7 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { assertMethodOk(*method); - context = MethodContext(id, this); + context = MethodContext(method.get(), id, this); handleMethodInContext(method, context); } diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index b2a5ef6ff5..f0b3d2469a 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -54,7 +54,7 @@ class ChannelAdapter : public BodyHandler, public OutputHandler { /** *@param output Processed frames are forwarded to this handler. */ - ChannelAdapter() : context(0), id(0), out(0) {} + ChannelAdapter() : context(0, 0), id(0), out(0) {} /** Initialize the channel adapter. */ void init(ChannelId, OutputHandler&, const ProtocolVersion&); diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index 46d2e064b5..54e05f0fb2 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -26,6 +26,7 @@ namespace qpid { namespace framing { class BodyHandler; +class AMQMethodBody; /** * Invocation context for an AMQP method. @@ -46,8 +47,10 @@ struct MethodContext * will automatically construct the MethodContext. */ MethodContext( + const AMQMethodBody* method, ChannelId channel, OutputHandler* output=0, RequestId request=0) - : channelId(channel), out(output), requestId(request){} + : channelId(channel), out(output), requestId(request), + methodBody(method) {} /** \internal Channel on which the method is sent. */ ChannelId channelId; @@ -60,6 +63,10 @@ struct MethodContext */ RequestId requestId; + /** \internal This is the Method Body itself + * It's useful for passing around instead of unpacking all its parameters + */ + const AMQMethodBody* methodBody; }; }} // namespace qpid::framing diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index b31ff6a321..760a4d3344 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -221,7 +221,7 @@ class ChannelTest : public CppUnit::TestCase Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); const string data[] = {"abcde", "fghij", "klmno"}; - Message* msg = new Message(0, "my_exchange", "my_routing_key", false, false); + Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); store.expect(); store.stage(msg); @@ -309,7 +309,7 @@ class ChannelTest : public CppUnit::TestCase Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) { - Message* msg = new Message(0, exchange, routingKey, false, false); + BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(contentSize); msg->setHeader(header); diff --git a/cpp/tests/ExchangeTest.cpp b/cpp/tests/ExchangeTest.cpp index 8fef4ccaac..a31c369fe1 100644 --- a/cpp/tests/ExchangeTest.cpp +++ b/cpp/tests/ExchangeTest.cpp @@ -54,7 +54,7 @@ class ExchangeTest : public CppUnit::TestCase queue.reset(); queue2.reset(); - Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true)); + Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true)); DeliverableMessage msg(msgPtr); topic.route(msg, "abc", 0); direct.route(msg, "abc", 0); diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h index 4ef352e677..af0f4e84fe 100644 --- a/cpp/tests/InProcessBroker.h +++ b/cpp/tests/InProcessBroker.h @@ -151,3 +151,156 @@ std::ostream& operator<<( }} // namespace qpid::broker #endif /*!_tests_InProcessBroker_h*/ +#ifndef _tests_InProcessBroker_h +#define _tests_InProcessBroker_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <vector> +#include <iostream> +#include <algorithm> + +#include "framing/AMQFrame.h" +#include "broker/Broker.h" +#include "broker/Connection.h" +#include "client/Connector.h" + +namespace qpid { +namespace broker { + +/** Make a copy of a frame body. Inefficient, only intended for tests. */ +// TODO aconway 2007-01-29: from should be const, need to fix +// AMQPFrame::encode as const. +framing::AMQFrame copy(framing::AMQFrame& from) { + framing::Buffer buffer(from.size()); + from.encode(buffer); + buffer.flip(); + framing::AMQFrame result; + result.decode(buffer); + return result; +} + +/** + * A broker that implements client::Connector allowing direct + * in-process connection of client to broker. Used to write round-trip + * tests without requiring an external broker process. + * + * Also allows you to "snoop" on frames exchanged between client & broker. + * + * Use as follows: + * + \code + broker::InProcessBroker ibroker(version); + client::Connection clientConnection; + clientConnection.setConnector(ibroker); + clientConnection.open(""); + ... use as normal + \endcode + * + */ +class InProcessBroker : public client::Connector { + public: + enum Sender {CLIENT,BROKER}; + struct Frame : public framing::AMQFrame { + Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {} + bool fromBroker() const { return from == BROKER; } + bool fromClient() const { return from == CLIENT; } + + template <class MethodType> + MethodType* asMethod() { + return dynamic_cast<MethodType*>(getBody().get()); + } + + Sender from; + }; + typedef std::vector<Frame> Conversation; + + InProcessBroker(const framing::ProtocolVersion& ver) : + Connector(ver), + protocolInit(ver), + broker(broker::Broker::create()), + brokerOut(BROKER, conversation), + brokerConnection(&brokerOut, *broker), + clientOut(CLIENT, conversation, &brokerConnection) + {} + + void connect(const std::string& /*host*/, int /*port*/) {} + void init() { brokerConnection.initiated(&protocolInit); } + void close() {} + + /** Client's input handler. */ + void setInputHandler(framing::InputHandler* handler) { + brokerOut.in = handler; + } + + /** Called by client to send a frame */ + void send(framing::AMQFrame* frame) { + clientOut.send(frame); + } + + /** Entire client-broker conversation is recorded here */ + Conversation conversation; + + private: + /** OutputHandler that forwards data to an InputHandler */ + struct OutputToInputHandler : public sys::ConnectionOutputHandler { + OutputToInputHandler( + Sender from_, Conversation& conversation_, + framing::InputHandler* ih=0 + ) : from(from_), conversation(conversation_), in(ih) {} + + void send(framing::AMQFrame* frame) { + conversation.push_back(Frame(from, copy(*frame))); + in->received(frame); + } + + void close() {} + + Sender from; + Conversation& conversation; + framing::InputHandler* in; + }; + + framing::ProtocolInitiation protocolInit; + Broker::shared_ptr broker; + OutputToInputHandler brokerOut; + broker::Connection brokerConnection; + OutputToInputHandler clientOut; +}; + +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Frame& frame) +{ + return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") << + static_cast<const framing::AMQFrame&>(frame); +} +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Conversation& conv) +{ + for (InProcessBroker::Conversation::const_iterator i = conv.begin(); + i != conv.end(); ++i) + { + out << *i << std::endl; + } + return out; +} + + +}} // namespace qpid::broker + +#endif /*!_tests_InProcessBroker_h*/ diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp index 21f5935218..e84d1df0e7 100644 --- a/cpp/tests/MessageBuilderTest.cpp +++ b/cpp/tests/MessageBuilderTest.cpp @@ -74,14 +74,14 @@ class MessageBuilderTest : public CppUnit::TestCase // Don't hide overloads. using NullMessageStore::destroy; - void destroy(Message* msg) + void destroy(BasicMessage* msg) { CPPUNIT_ASSERT(msg->getPersistenceId()); } - Message::shared_ptr getRestoredMessage() + BasicMessage::shared_ptr getRestoredMessage() { - Message::shared_ptr msg(new Message()); + BasicMessage::shared_ptr msg(new BasicMessage()); if (header) { header->flip(); msg->decodeHeader(*header); @@ -116,7 +116,7 @@ class MessageBuilderTest : public CppUnit::TestCase DummyHandler handler; MessageBuilder builder(&handler); - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(0); @@ -133,7 +133,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(7); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -154,7 +154,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -183,7 +183,7 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); @@ -200,7 +200,7 @@ class MessageBuilderTest : public CppUnit::TestCase CPPUNIT_ASSERT(handler.msg); CPPUNIT_ASSERT_EQUAL(message, handler.msg); - Message::shared_ptr restored = store.getRestoredMessage(); + BasicMessage::shared_ptr restored = store.getRestoredMessage(); CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index 8bb570e598..62249e1f5f 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -52,7 +52,7 @@ class MessageTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr msg = Message::shared_ptr(new Message(0, exchange, routingKey, false, false)); + Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0, exchange, routingKey, false, false)); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -69,7 +69,7 @@ class MessageTest : public CppUnit::TestCase msg->encode(buffer); buffer.flip(); - msg = Message::shared_ptr(new Message(buffer)); + msg = Message::shared_ptr(new BasicMessage(buffer)); CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp index 9d655781c1..e156efc507 100644 --- a/cpp/tests/QueueTest.cpp +++ b/cpp/tests/QueueTest.cpp @@ -66,9 +66,9 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount()); //Test basic delivery: - Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true)); queue->deliver(msg1); CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); @@ -123,9 +123,9 @@ class QueueTest : public CppUnit::TestCase void testDequeue(){ Queue::shared_ptr queue(new Queue("my_queue", true)); - Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true)); Message::shared_ptr received; queue->deliver(msg1); diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp index 0ffe984ded..a6fe0f1010 100644 --- a/cpp/tests/TxAckTest.cpp +++ b/cpp/tests/TxAckTest.cpp @@ -69,7 +69,7 @@ public: TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid) { for(int i = 0; i < 10; i++){ - Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false)); + Message::shared_ptr msg(new BasicMessage(0, "exchange", "routing_key", false, false)); msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); messages.push_back(msg); diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp index 3542e08f45..87658af62e 100644 --- a/cpp/tests/TxPublishTest.cpp +++ b/cpp/tests/TxPublishTest.cpp @@ -75,7 +75,7 @@ public: TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), - msg(new Message(0, "exchange", "routing_key", false, false)), + msg(new BasicMessage(0, "exchange", "routing_key", false, false)), op(msg, &xid) { msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); |
