From f9f848394de0662248cf62d4ec5e4818949403b2 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 30 Jan 2007 20:07:41 +0000 Subject: Andrew Stitcher r723@fuschia: andrew | 2007-01-12 00:35:16 +0000 Branch for my work on Qpid.0-9 r724@fuschia: andrew | 2007-01-12 00:59:28 +0000 Added in empty implementation of handler class for protocol Message class r768@fuschia: andrew | 2007-01-17 01:25:16 +0000 * Added Test for new MessageHandlerImpl (but no actual tests yet) * Filled in lots of the blanks in the MessageHandlerImpl with code stolen from the BasicHandlerImpl r800@fuschia: andrew | 2007-01-17 17:34:13 +0000 Updated to latest upstream changes r840@fuschia: andrew | 2007-01-19 00:31:59 +0000 Fixed merge errors r841@fuschia: andrew | 2007-01-19 00:47:29 +0000 Another merge problem fixed r878@fuschia: andrew | 2007-01-24 11:27:48 +0000 Started work on the Message class handler implementation r976@fuschia: andrew | 2007-01-30 17:05:05 +0000 Working again after broker Message refactor r980@fuschia: andrew | 2007-01-30 18:39:18 +0000 Fix for extra parameter to transfer git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501534 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/lib/broker/BrokerAdapter.cpp | 16 +- cpp/lib/broker/BrokerMessage.cpp | 52 +++--- cpp/lib/broker/BrokerMessage.h | 13 +- cpp/lib/broker/BrokerMessageBase.h | 268 ++++++++++++++++++++++++++++++ cpp/lib/broker/BrokerMessageMessage.h | 134 +++++++++++++++ cpp/lib/broker/Connection.cpp | 2 +- cpp/lib/broker/MessageHandlerImpl.cpp | 38 +++-- cpp/lib/broker/MessageHandlerImpl.h | 3 +- cpp/lib/client/Connection.cpp | 7 +- cpp/lib/common/framing/ChannelAdapter.cpp | 6 +- cpp/lib/common/framing/ChannelAdapter.h | 2 +- cpp/lib/common/framing/MethodContext.h | 9 +- 12 files changed, 487 insertions(+), 63 deletions(-) create mode 100644 cpp/lib/broker/BrokerMessageBase.h create mode 100644 cpp/lib/broker/BrokerMessageMessage.h (limited to 'cpp/lib') diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 5cf767a8e1..abf0b3852d 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -322,7 +322,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte void BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( - const MethodContext&, + const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, @@ -337,7 +337,7 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( exchange->unbind(queue, routingKey, &arguments); - connection.client->getQueue().unbindOk(channel.getId()); + connection.client->getQueue().unbindOk(context); } void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ @@ -420,7 +420,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate); + BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); channel.handlePublish(msg, exchange); }else{ throw ChannelException( @@ -475,16 +475,16 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& ) } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context) { - connection.client->getChannel().ok(channel.getId()); - connection.client->getChannel().pong(channel.getId()); + connection.client->getChannel().ok(context); + connection.client->getChannel().pong(context); } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context) { - connection.client->getChannel().ok(channel.getId()); + connection.client->getChannel().ok(context); } void diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 07b14a4eff..a5192beede 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -33,7 +33,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -Message::Message(const ConnectionToken* const _publisher, +BasicMessage::BasicMessage(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, bool _mandatory, bool _immediate) : publisher(_publisher), exchange(_exchange), @@ -44,23 +44,23 @@ Message::Message(const ConnectionToken* const _publisher, size(0), persistenceId(0) {} -Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : +BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ decode(buffer, headersOnly, contentChunkSize); } -Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} +BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} -Message::~Message(){ +BasicMessage::~BasicMessage(){ if (content.get()) content->destroy(); } -void Message::setHeader(AMQHeaderBody::shared_ptr _header){ +void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ this->header = _header; } -void Message::addContent(AMQContentBody::shared_ptr data){ +void BasicMessage::addContent(AMQContentBody::shared_ptr data){ if (!content.get()) { content = std::auto_ptr(new InMemoryContent()); } @@ -68,15 +68,15 @@ void Message::addContent(AMQContentBody::shared_ptr data){ size += data->size(); } -bool Message::isComplete(){ +bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } -void Message::redeliver(){ +void BasicMessage::redeliver(){ redelivered = true; } -void Message::deliver(OutputHandler* out, int channel, +void BasicMessage::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize, ProtocolVersion* version){ @@ -85,7 +85,7 @@ void Message::deliver(OutputHandler* out, int channel, sendContent(out, channel, framesize, version); } -void Message::sendGetOk(OutputHandler* out, +void BasicMessage::sendGetOk(OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, @@ -96,7 +96,7 @@ void Message::sendGetOk(OutputHandler* out, sendContent(out, channel, framesize, version); } -void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ +void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ AMQBody::shared_ptr headerBody = static_pointer_cast(header); out->send(new AMQFrame(*version, channel, headerBody)); @@ -104,28 +104,28 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, if (content.get()) content->send(*version, out, channel, framesize); } -BasicHeaderProperties* Message::getHeaderProperties(){ +BasicHeaderProperties* BasicMessage::getHeaderProperties(){ return dynamic_cast(header->getProperties()); } -const ConnectionToken* const Message::getPublisher(){ +const ConnectionToken* const BasicMessage::getPublisher(){ return publisher; } -bool Message::isPersistent() +bool BasicMessage::isPersistent() { if(!header) return false; BasicHeaderProperties* props = getHeaderProperties(); return props && props->getDeliveryMode() == PERSISTENT; } -void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) +void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) { decodeHeader(buffer); if (!headersOnly) decodeContent(buffer, contentChunkSize); } -void Message::decodeHeader(Buffer& buffer) +void BasicMessage::decodeHeader(Buffer& buffer) { buffer.getShortString(exchange); buffer.getShortString(routingKey); @@ -136,7 +136,7 @@ void Message::decodeHeader(Buffer& buffer) setHeader(headerBody); } -void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) +void BasicMessage::decodeContent(Buffer& buffer, u_int32_t chunkSize) { u_int64_t expected = expectedContentSize(); if (expected != buffer.available()) { @@ -158,13 +158,13 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) } } -void Message::encode(Buffer& buffer) +void BasicMessage::encode(Buffer& buffer) { encodeHeader(buffer); encodeContent(buffer); } -void Message::encodeHeader(Buffer& buffer) +void BasicMessage::encodeHeader(Buffer& buffer) { buffer.putShortString(exchange); buffer.putShortString(routingKey); @@ -172,36 +172,36 @@ void Message::encodeHeader(Buffer& buffer) header->encode(buffer); } -void Message::encodeContent(Buffer& buffer) +void BasicMessage::encodeContent(Buffer& buffer) { Mutex::ScopedLock locker(contentLock); if (content.get()) content->encode(buffer); } -u_int32_t Message::encodedSize() +u_int32_t BasicMessage::encodedSize() { return encodedHeaderSize() + encodedContentSize(); } -u_int32_t Message::encodedContentSize() +u_int32_t BasicMessage::encodedContentSize() { Mutex::ScopedLock locker(contentLock); return content.get() ? content->size() : 0; } -u_int32_t Message::encodedHeaderSize() +u_int32_t BasicMessage::encodedHeaderSize() { return exchange.size() + 1 + routingKey.size() + 1 + header->size() + 4;//4 extra bytes for size } -u_int64_t Message::expectedContentSize() +u_int64_t BasicMessage::expectedContentSize() { return header.get() ? header->getContentSize() : 0; } -void Message::releaseContent(MessageStore* store) +void BasicMessage::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); if (!isPersistent() && persistenceId == 0) { @@ -217,7 +217,7 @@ void Message::releaseContent(MessageStore* store) } } -void Message::setContent(std::auto_ptr& _content) +void BasicMessage::setContent(std::auto_ptr& _content) { Mutex::ScopedLock locker(contentLock); content = _content; diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 388bfba51e..d9ab9b7220 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -22,6 +22,7 @@ * */ +#include #include #include #include @@ -45,7 +46,7 @@ namespace qpid { * content bodies and some details about the publication * request. */ - class Message{ + class BasicMessage : public Message{ const ConnectionToken* const publisher; string exchange; string routingKey; @@ -62,14 +63,14 @@ namespace qpid { int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version); public: - typedef boost::shared_ptr shared_ptr; + typedef boost::shared_ptr shared_ptr; - Message(const ConnectionToken* const publisher, + BasicMessage(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate); - Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); - Message(); - ~Message(); + BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); + BasicMessage(); + ~BasicMessage(); void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); void addContent(qpid::framing::AMQContentBody::shared_ptr data); bool isComplete(); diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h new file mode 100644 index 0000000000..e0139519ae --- /dev/null +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -0,0 +1,268 @@ +#ifndef _broker_BrokerMessageBase_h +#define _broker_BrokerMessageBase_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AMQContentBody.h" +#include "AMQHeaderBody.h" +#include "Content.h" + +#include +#include + +namespace qpid { + + namespace framing { + class OutputHandler; + class ProtocolVersion; + class BasicHeaderProperties; + } + + namespace broker { + + class MessageStore; + class ConnectionToken; + + /** + * Base class for all types of internal broker messages + * abstracting away the operations + * TODO; AMS: for the moment this is mostly a placeholder + */ + class Message{ + + public: + typedef boost::shared_ptr shared_ptr; + + virtual ~Message() {}; + + virtual void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void redeliver() = 0; + + virtual bool isComplete() = 0; + + virtual u_int64_t contentSize() const = 0; + virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; + virtual bool isPersistent() = 0; + virtual const std::string& getRoutingKey() const = 0; + virtual const ConnectionToken* const getPublisher() = 0; + virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests? + virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests? + + virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests? + + virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedHeaderSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + virtual u_int32_t encodedContentSize() = 0; + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + virtual u_int64_t expectedContentSize() = 0; + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + virtual void releaseContent(MessageStore* /*store*/) {}; + + // TODO: AMS 29/1/2007 Don't think these are really part of base class + + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + virtual void setContent(std::auto_ptr& /*content*/) {}; + virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; + virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ +#ifndef _broker_BrokerMessageBase_h +#define _broker_BrokerMessageBase_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AMQContentBody.h" +#include "AMQHeaderBody.h" +#include "Content.h" + +#include +#include + +namespace qpid { + + namespace framing { + class OutputHandler; + class ProtocolVersion; + class BasicHeaderProperties; + } + + namespace broker { + + class MessageStore; + class ConnectionToken; + + /** + * Base class for all types of internal broker messages + * abstracting away the operations + * TODO; AMS: for the moment this is mostly a placeholder + */ + class Message{ + + public: + typedef boost::shared_ptr shared_ptr; + + virtual ~Message() {}; + + virtual void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void redeliver() = 0; + + virtual bool isComplete() = 0; + + virtual u_int64_t contentSize() const = 0; + virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; + virtual bool isPersistent() = 0; + virtual const std::string& getRoutingKey() const = 0; + virtual const ConnectionToken* const getPublisher() = 0; + virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests? + virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests? + + virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests? + + virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedHeaderSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + virtual u_int32_t encodedContentSize() = 0; + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + virtual u_int64_t expectedContentSize() = 0; + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + virtual void releaseContent(MessageStore* /*store*/) {}; + + // TODO: AMS 29/1/2007 Don't think these are really part of base class + + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + virtual void setContent(std::auto_ptr& /*content*/) {}; + virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; + virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h new file mode 100644 index 0000000000..f25405db72 --- /dev/null +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -0,0 +1,134 @@ +#ifndef _broker_BrokerMessageMessage_h +#define _broker_BrokerMessageMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerMessageBase.h" + +namespace qpid { + namespace broker { + class MessageMessage: public Message{ + + public: + ~MessageMessage(); + + void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void redeliver(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + void setContent(std::auto_ptr& content); + + u_int64_t contentSize() const; + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const std::string& getRoutingKey() const; + const ConnectionToken* const getPublisher(); + + u_int32_t encodedContentSize(); + u_int64_t expectedContentSize(); + void releaseContent(MessageStore* store); + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ +#ifndef _broker_BrokerMessageMessage_h +#define _broker_BrokerMessageMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerMessageBase.h" + +namespace qpid { + namespace broker { + class MessageMessage: public Message{ + + public: + ~MessageMessage(); + + void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void redeliver(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + void setContent(std::auto_ptr& content); + + u_int64_t contentSize() const; + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const std::string& getRoutingKey() const; + const ConnectionToken* const getPublisher(); + + u_int32_t encodedContentSize(); + u_int64_t expectedContentSize(); + void releaseContent(MessageStore* store); + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index d34422c93d..0f58278a5a 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -74,7 +74,7 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) { string locales("en_US"); // TODO aconway 2007-01-16: Client call, move to adapter. client->getConnection().start( - MethodContext(0, &getAdapter(0)), + MethodContext(0, 0, &getAdapter(0)), header->getMajor(), header->getMinor(), properties, mechanisms, locales); getAdapter(0).init(0, *out, client->getProtocolVersion()); diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 33f7a63d45..7361d8827a 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -1,3 +1,4 @@ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -18,8 +19,11 @@ #include "MessageHandlerImpl.h" #include "BrokerChannel.h" +#include "FramingContent.h" #include "Connection.h" #include "Broker.h" +#include "BrokerMessageMessage.h" + namespace qpid { namespace broker { @@ -41,7 +45,7 @@ void MessageHandlerImpl::cancel( const MethodContext& context, const string& destination ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature channel.cancel(destination); @@ -73,10 +77,9 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const qpid::framing::FieldTable& filter ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - Channel& channel = connection.getChannel(channel.getId()); if(!destination.empty() && channel.exists(destination)){ throw ConnectionException(530, "Consumer tags must be unique"); } @@ -108,7 +111,7 @@ MessageHandlerImpl::get( const MethodContext& context, const string& /*destination*/, bool noAck ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = connection.getQueue(queueName, context.channelId); @@ -146,7 +149,7 @@ MessageHandlerImpl::qos(const MethodContext& context, u_int16_t prefetchCount, bool /*global*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature //TODO: handle global channel.setPrefetchSize(prefetchSize); @@ -159,7 +162,7 @@ void MessageHandlerImpl::recover(const MethodContext&, bool requeue ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature channel.recover(requeue); @@ -182,18 +185,18 @@ MessageHandlerImpl::resume(const MethodContext&, } void -MessageHandlerImpl::transfer(const MethodContext&, +MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool immediate, + bool /*immediate*/, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& routingKey, + const string& /*routingKey*/, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -204,15 +207,24 @@ MessageHandlerImpl::transfer(const MethodContext&, const string& /*transactionId*/, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) + qpid::framing::Content body, + bool /*mandatory*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); - channel.handlePublish(msg, exchange); + if (body.isInline()) { +// MessageMessage* msg = +// new MessageMessage(&connection, exchangeName, routingKey, immediate); +// channel.handlePublish(msg, exchange); + + connection.client->getMessageHandler()->ok(context); + } else { + // Don't handle reference content yet + assert(body.isInline()); + } }else{ throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); } diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 0eb9e119f5..985efe3847 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -114,7 +114,8 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH const std::string& transactionId, const std::string& securityToken, const framing::FieldTable& applicationHeaders, - framing::Content body ); + framing::Content body, + bool mandatory ); }; }} // namespace qpid::broker diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index bf6c44570d..78e340eb11 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -46,10 +46,11 @@ Connection::Connection( bool _debug, u_int32_t _max_frame_size, const framing::ProtocolVersion& _version ) : version(_version), max_frame_size(_max_frame_size), - defaultConnector(version, debug, max_frame_size), - connector(&defaultConnector), + defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) -{} +{ + setConnector(defaultConnector); +} Connection::~Connection(){ close(); diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 1fdb8d6691..653e47048e 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -29,7 +29,7 @@ void ChannelAdapter::init( id = i; out = &o; version = v; - context = MethodContext(id, this); + context = MethodContext(0, id, this); } void ChannelAdapter::send(AMQFrame* frame) { @@ -59,7 +59,7 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) { void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { assertMethodOk(*request); responder.received(request->getData()); - context =MethodContext(id, this, request->getRequestId()); + context =MethodContext(request.get(), id, this, request->getRequestId()); handleMethodInContext(request, context); } @@ -73,7 +73,7 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { assertMethodOk(*method); - context = MethodContext(id, this); + context = MethodContext(method.get(), id, this); handleMethodInContext(method, context); } diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index b2a5ef6ff5..f0b3d2469a 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -54,7 +54,7 @@ class ChannelAdapter : public BodyHandler, public OutputHandler { /** *@param output Processed frames are forwarded to this handler. */ - ChannelAdapter() : context(0), id(0), out(0) {} + ChannelAdapter() : context(0, 0), id(0), out(0) {} /** Initialize the channel adapter. */ void init(ChannelId, OutputHandler&, const ProtocolVersion&); diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index 46d2e064b5..54e05f0fb2 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -26,6 +26,7 @@ namespace qpid { namespace framing { class BodyHandler; +class AMQMethodBody; /** * Invocation context for an AMQP method. @@ -46,8 +47,10 @@ struct MethodContext * will automatically construct the MethodContext. */ MethodContext( + const AMQMethodBody* method, ChannelId channel, OutputHandler* output=0, RequestId request=0) - : channelId(channel), out(output), requestId(request){} + : channelId(channel), out(output), requestId(request), + methodBody(method) {} /** \internal Channel on which the method is sent. */ ChannelId channelId; @@ -60,6 +63,10 @@ struct MethodContext */ RequestId requestId; + /** \internal This is the Method Body itself + * It's useful for passing around instead of unpacking all its parameters + */ + const AMQMethodBody* methodBody; }; }} // namespace qpid::framing -- cgit v1.2.1