diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-30 20:07:41 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-30 20:07:41 +0000 |
| commit | f9f848394de0662248cf62d4ec5e4818949403b2 (patch) | |
| tree | 4f13105e2223b704d7850300116dcc56116acae2 /cpp/lib/broker | |
| parent | 98ccae7574a18f8d0a1f9e28e86ccfde4541c81f (diff) | |
| download | qpid-python-f9f848394de0662248cf62d4ec5e4818949403b2.tar.gz | |
Andrew Stitcher <astitcher@redhat.com>
r723@fuschia: andrew | 2007-01-12 00:35:16 +0000
Branch for my work on Qpid.0-9
r724@fuschia: andrew | 2007-01-12 00:59:28 +0000
Added in empty implementation of handler class for protocol Message class
r768@fuschia: andrew | 2007-01-17 01:25:16 +0000
* Added Test for new MessageHandlerImpl (but no actual tests yet)
* Filled in lots of the blanks in the MessageHandlerImpl with code
stolen from the BasicHandlerImpl
r800@fuschia: andrew | 2007-01-17 17:34:13 +0000
Updated to latest upstream changes
r840@fuschia: andrew | 2007-01-19 00:31:59 +0000
Fixed merge errors
r841@fuschia: andrew | 2007-01-19 00:47:29 +0000
Another merge problem fixed
r878@fuschia: andrew | 2007-01-24 11:27:48 +0000
Started work on the Message class handler implementation
r976@fuschia: andrew | 2007-01-30 17:05:05 +0000
Working again after broker Message refactor
r980@fuschia: andrew | 2007-01-30 18:39:18 +0000
Fix for extra parameter to transfer
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501534 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker')
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 16 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 52 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 13 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessageBase.h | 268 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 134 | ||||
| -rw-r--r-- | cpp/lib/broker/Connection.cpp | 2 | ||||
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 38 | ||||
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.h | 3 |
8 files changed, 471 insertions, 55 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 5cf767a8e1..abf0b3852d 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -322,7 +322,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte void BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( - const MethodContext&, + const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, @@ -337,7 +337,7 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( exchange->unbind(queue, routingKey, &arguments); - connection.client->getQueue().unbindOk(channel.getId()); + connection.client->getQueue().unbindOk(context); } void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ @@ -420,7 +420,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate); + BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); channel.handlePublish(msg, exchange); }else{ throw ChannelException( @@ -475,16 +475,16 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& ) } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context) { - connection.client->getChannel().ok(channel.getId()); - connection.client->getChannel().pong(channel.getId()); + connection.client->getChannel().ok(context); + connection.client->getChannel().pong(context); } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context) { - connection.client->getChannel().ok(channel.getId()); + connection.client->getChannel().ok(context); } void diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 07b14a4eff..a5192beede 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -33,7 +33,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -Message::Message(const ConnectionToken* const _publisher, +BasicMessage::BasicMessage(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, bool _mandatory, bool _immediate) : publisher(_publisher), exchange(_exchange), @@ -44,23 +44,23 @@ Message::Message(const ConnectionToken* const _publisher, size(0), persistenceId(0) {} -Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : +BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ decode(buffer, headersOnly, contentChunkSize); } -Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} +BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} -Message::~Message(){ +BasicMessage::~BasicMessage(){ if (content.get()) content->destroy(); } -void Message::setHeader(AMQHeaderBody::shared_ptr _header){ +void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ this->header = _header; } -void Message::addContent(AMQContentBody::shared_ptr data){ +void BasicMessage::addContent(AMQContentBody::shared_ptr data){ if (!content.get()) { content = std::auto_ptr<Content>(new InMemoryContent()); } @@ -68,15 +68,15 @@ void Message::addContent(AMQContentBody::shared_ptr data){ size += data->size(); } -bool Message::isComplete(){ +bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } -void Message::redeliver(){ +void BasicMessage::redeliver(){ redelivered = true; } -void Message::deliver(OutputHandler* out, int channel, +void BasicMessage::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize, ProtocolVersion* version){ @@ -85,7 +85,7 @@ void Message::deliver(OutputHandler* out, int channel, sendContent(out, channel, framesize, version); } -void Message::sendGetOk(OutputHandler* out, +void BasicMessage::sendGetOk(OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, @@ -96,7 +96,7 @@ void Message::sendGetOk(OutputHandler* out, sendContent(out, channel, framesize, version); } -void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ +void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(*version, channel, headerBody)); @@ -104,28 +104,28 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, if (content.get()) content->send(*version, out, channel, framesize); } -BasicHeaderProperties* Message::getHeaderProperties(){ +BasicHeaderProperties* BasicMessage::getHeaderProperties(){ return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); } -const ConnectionToken* const Message::getPublisher(){ +const ConnectionToken* const BasicMessage::getPublisher(){ return publisher; } -bool Message::isPersistent() +bool BasicMessage::isPersistent() { if(!header) return false; BasicHeaderProperties* props = getHeaderProperties(); return props && props->getDeliveryMode() == PERSISTENT; } -void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) +void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) { decodeHeader(buffer); if (!headersOnly) decodeContent(buffer, contentChunkSize); } -void Message::decodeHeader(Buffer& buffer) +void BasicMessage::decodeHeader(Buffer& buffer) { buffer.getShortString(exchange); buffer.getShortString(routingKey); @@ -136,7 +136,7 @@ void Message::decodeHeader(Buffer& buffer) setHeader(headerBody); } -void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) +void BasicMessage::decodeContent(Buffer& buffer, u_int32_t chunkSize) { u_int64_t expected = expectedContentSize(); if (expected != buffer.available()) { @@ -158,13 +158,13 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) } } -void Message::encode(Buffer& buffer) +void BasicMessage::encode(Buffer& buffer) { encodeHeader(buffer); encodeContent(buffer); } -void Message::encodeHeader(Buffer& buffer) +void BasicMessage::encodeHeader(Buffer& buffer) { buffer.putShortString(exchange); buffer.putShortString(routingKey); @@ -172,36 +172,36 @@ void Message::encodeHeader(Buffer& buffer) header->encode(buffer); } -void Message::encodeContent(Buffer& buffer) +void BasicMessage::encodeContent(Buffer& buffer) { Mutex::ScopedLock locker(contentLock); if (content.get()) content->encode(buffer); } -u_int32_t Message::encodedSize() +u_int32_t BasicMessage::encodedSize() { return encodedHeaderSize() + encodedContentSize(); } -u_int32_t Message::encodedContentSize() +u_int32_t BasicMessage::encodedContentSize() { Mutex::ScopedLock locker(contentLock); return content.get() ? content->size() : 0; } -u_int32_t Message::encodedHeaderSize() +u_int32_t BasicMessage::encodedHeaderSize() { return exchange.size() + 1 + routingKey.size() + 1 + header->size() + 4;//4 extra bytes for size } -u_int64_t Message::expectedContentSize() +u_int64_t BasicMessage::expectedContentSize() { return header.get() ? header->getContentSize() : 0; } -void Message::releaseContent(MessageStore* store) +void BasicMessage::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); if (!isPersistent() && persistenceId == 0) { @@ -217,7 +217,7 @@ void Message::releaseContent(MessageStore* store) } } -void Message::setContent(std::auto_ptr<Content>& _content) +void BasicMessage::setContent(std::auto_ptr<Content>& _content) { Mutex::ScopedLock locker(contentLock); content = _content; diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 388bfba51e..d9ab9b7220 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -22,6 +22,7 @@ * */ +#include <BrokerMessageBase.h> #include <memory> #include <boost/shared_ptr.hpp> #include <AMQContentBody.h> @@ -45,7 +46,7 @@ namespace qpid { * content bodies and some details about the publication * request. */ - class Message{ + class BasicMessage : public Message{ const ConnectionToken* const publisher; string exchange; string routingKey; @@ -62,14 +63,14 @@ namespace qpid { int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version); public: - typedef boost::shared_ptr<Message> shared_ptr; + typedef boost::shared_ptr<BasicMessage> shared_ptr; - Message(const ConnectionToken* const publisher, + BasicMessage(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate); - Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); - Message(); - ~Message(); + BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); + BasicMessage(); + ~BasicMessage(); void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); void addContent(qpid::framing::AMQContentBody::shared_ptr data); bool isComplete(); diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h new file mode 100644 index 0000000000..e0139519ae --- /dev/null +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -0,0 +1,268 @@ +#ifndef _broker_BrokerMessageBase_h +#define _broker_BrokerMessageBase_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AMQContentBody.h" +#include "AMQHeaderBody.h" +#include "Content.h" + +#include <string> +#include <boost/shared_ptr.hpp> + +namespace qpid { + + namespace framing { + class OutputHandler; + class ProtocolVersion; + class BasicHeaderProperties; + } + + namespace broker { + + class MessageStore; + class ConnectionToken; + + /** + * Base class for all types of internal broker messages + * abstracting away the operations + * TODO; AMS: for the moment this is mostly a placeholder + */ + class Message{ + + public: + typedef boost::shared_ptr<Message> shared_ptr; + + virtual ~Message() {}; + + virtual void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void redeliver() = 0; + + virtual bool isComplete() = 0; + + virtual u_int64_t contentSize() const = 0; + virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; + virtual bool isPersistent() = 0; + virtual const std::string& getRoutingKey() const = 0; + virtual const ConnectionToken* const getPublisher() = 0; + virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests? + virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests? + + virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests? + + virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedHeaderSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + virtual u_int32_t encodedContentSize() = 0; + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + virtual u_int64_t expectedContentSize() = 0; + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + virtual void releaseContent(MessageStore* /*store*/) {}; + + // TODO: AMS 29/1/2007 Don't think these are really part of base class + + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; + virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; + virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ +#ifndef _broker_BrokerMessageBase_h +#define _broker_BrokerMessageBase_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AMQContentBody.h" +#include "AMQHeaderBody.h" +#include "Content.h" + +#include <string> +#include <boost/shared_ptr.hpp> + +namespace qpid { + + namespace framing { + class OutputHandler; + class ProtocolVersion; + class BasicHeaderProperties; + } + + namespace broker { + + class MessageStore; + class ConnectionToken; + + /** + * Base class for all types of internal broker messages + * abstracting away the operations + * TODO; AMS: for the moment this is mostly a placeholder + */ + class Message{ + + public: + typedef boost::shared_ptr<Message> shared_ptr; + + virtual ~Message() {}; + + virtual void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version) = 0; + virtual void redeliver() = 0; + + virtual bool isComplete() = 0; + + virtual u_int64_t contentSize() const = 0; + virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; + virtual bool isPersistent() = 0; + virtual const std::string& getRoutingKey() const = 0; + virtual const ConnectionToken* const getPublisher() = 0; + virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests? + virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests? + + virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests? + + virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedHeaderSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + virtual u_int32_t encodedContentSize() = 0; + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + virtual u_int64_t expectedContentSize() = 0; + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + virtual void releaseContent(MessageStore* /*store*/) {}; + + // TODO: AMS 29/1/2007 Don't think these are really part of base class + + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; + virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; + virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h new file mode 100644 index 0000000000..f25405db72 --- /dev/null +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -0,0 +1,134 @@ +#ifndef _broker_BrokerMessageMessage_h +#define _broker_BrokerMessageMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerMessageBase.h" + +namespace qpid { + namespace broker { + class MessageMessage: public Message{ + + public: + ~MessageMessage(); + + void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void redeliver(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + void setContent(std::auto_ptr<Content>& content); + + u_int64_t contentSize() const; + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const std::string& getRoutingKey() const; + const ConnectionToken* const getPublisher(); + + u_int32_t encodedContentSize(); + u_int64_t expectedContentSize(); + void releaseContent(MessageStore* store); + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ +#ifndef _broker_BrokerMessageMessage_h +#define _broker_BrokerMessageMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerMessageBase.h" + +namespace qpid { + namespace broker { + class MessageMessage: public Message{ + + public: + ~MessageMessage(); + + void deliver(qpid::framing::OutputHandler* out, + int channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void redeliver(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + void setContent(std::auto_ptr<Content>& content); + + u_int64_t contentSize() const; + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const std::string& getRoutingKey() const; + const ConnectionToken* const getPublisher(); + + u_int32_t encodedContentSize(); + u_int64_t expectedContentSize(); + void releaseContent(MessageStore* store); + }; + + } +} + + +#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index d34422c93d..0f58278a5a 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -74,7 +74,7 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) { string locales("en_US"); // TODO aconway 2007-01-16: Client call, move to adapter. client->getConnection().start( - MethodContext(0, &getAdapter(0)), + MethodContext(0, 0, &getAdapter(0)), header->getMajor(), header->getMinor(), properties, mechanisms, locales); getAdapter(0).init(0, *out, client->getProtocolVersion()); diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 33f7a63d45..7361d8827a 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -1,3 +1,4 @@ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -18,8 +19,11 @@ #include "MessageHandlerImpl.h" #include "BrokerChannel.h" +#include "FramingContent.h" #include "Connection.h" #include "Broker.h" +#include "BrokerMessageMessage.h" + namespace qpid { namespace broker { @@ -41,7 +45,7 @@ void MessageHandlerImpl::cancel( const MethodContext& context, const string& destination ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature channel.cancel(destination); @@ -73,10 +77,9 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const qpid::framing::FieldTable& filter ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - Channel& channel = connection.getChannel(channel.getId()); if(!destination.empty() && channel.exists(destination)){ throw ConnectionException(530, "Consumer tags must be unique"); } @@ -108,7 +111,7 @@ MessageHandlerImpl::get( const MethodContext& context, const string& /*destination*/, bool noAck ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = connection.getQueue(queueName, context.channelId); @@ -146,7 +149,7 @@ MessageHandlerImpl::qos(const MethodContext& context, u_int16_t prefetchCount, bool /*global*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature //TODO: handle global channel.setPrefetchSize(prefetchSize); @@ -159,7 +162,7 @@ void MessageHandlerImpl::recover(const MethodContext&, bool requeue ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature channel.recover(requeue); @@ -182,18 +185,18 @@ MessageHandlerImpl::resume(const MethodContext&, } void -MessageHandlerImpl::transfer(const MethodContext&, +MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool immediate, + bool /*immediate*/, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& routingKey, + const string& /*routingKey*/, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -204,15 +207,24 @@ MessageHandlerImpl::transfer(const MethodContext&, const string& /*transactionId*/, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) + qpid::framing::Content body, + bool /*mandatory*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); - channel.handlePublish(msg, exchange); + if (body.isInline()) { +// MessageMessage* msg = +// new MessageMessage(&connection, exchangeName, routingKey, immediate); +// channel.handlePublish(msg, exchange); + + connection.client->getMessageHandler()->ok(context); + } else { + // Don't handle reference content yet + assert(body.isInline()); + } }else{ throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); } diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 0eb9e119f5..985efe3847 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -114,7 +114,8 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH const std::string& transactionId, const std::string& securityToken, const framing::FieldTable& applicationHeaders, - framing::Content body ); + framing::Content body, + bool mandatory ); }; }} // namespace qpid::broker |
