summaryrefslogtreecommitdiff
path: root/cpp/lib/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-30 20:07:41 +0000
committerAlan Conway <aconway@apache.org>2007-01-30 20:07:41 +0000
commitf9f848394de0662248cf62d4ec5e4818949403b2 (patch)
tree4f13105e2223b704d7850300116dcc56116acae2 /cpp/lib/broker
parent98ccae7574a18f8d0a1f9e28e86ccfde4541c81f (diff)
downloadqpid-python-f9f848394de0662248cf62d4ec5e4818949403b2.tar.gz
Andrew Stitcher <astitcher@redhat.com>
r723@fuschia: andrew | 2007-01-12 00:35:16 +0000 Branch for my work on Qpid.0-9 r724@fuschia: andrew | 2007-01-12 00:59:28 +0000 Added in empty implementation of handler class for protocol Message class r768@fuschia: andrew | 2007-01-17 01:25:16 +0000 * Added Test for new MessageHandlerImpl (but no actual tests yet) * Filled in lots of the blanks in the MessageHandlerImpl with code stolen from the BasicHandlerImpl r800@fuschia: andrew | 2007-01-17 17:34:13 +0000 Updated to latest upstream changes r840@fuschia: andrew | 2007-01-19 00:31:59 +0000 Fixed merge errors r841@fuschia: andrew | 2007-01-19 00:47:29 +0000 Another merge problem fixed r878@fuschia: andrew | 2007-01-24 11:27:48 +0000 Started work on the Message class handler implementation r976@fuschia: andrew | 2007-01-30 17:05:05 +0000 Working again after broker Message refactor r980@fuschia: andrew | 2007-01-30 18:39:18 +0000 Fix for extra parameter to transfer git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501534 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp16
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp52
-rw-r--r--cpp/lib/broker/BrokerMessage.h13
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h268
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h134
-rw-r--r--cpp/lib/broker/Connection.cpp2
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp38
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h3
8 files changed, 471 insertions, 55 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 5cf767a8e1..abf0b3852d 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -322,7 +322,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte
void
BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
- const MethodContext&,
+ const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
const string& exchangeName,
@@ -337,7 +337,7 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
exchange->unbind(queue, routingKey, &arguments);
- connection.client->getQueue().unbindOk(channel.getId());
+ connection.client->getQueue().unbindOk(context);
}
void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
@@ -420,7 +420,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate);
+ BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
channel.handlePublish(msg, exchange);
}else{
throw ChannelException(
@@ -475,16 +475,16 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& )
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context)
{
- connection.client->getChannel().ok(channel.getId());
- connection.client->getChannel().pong(channel.getId());
+ connection.client->getChannel().ok(context);
+ connection.client->getChannel().pong(context);
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context)
{
- connection.client->getChannel().ok(channel.getId());
+ connection.client->getChannel().ok(context);
}
void
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 07b14a4eff..a5192beede 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -33,7 +33,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-Message::Message(const ConnectionToken* const _publisher,
+BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate) : publisher(_publisher),
exchange(_exchange),
@@ -44,23 +44,23 @@ Message::Message(const ConnectionToken* const _publisher,
size(0),
persistenceId(0) {}
-Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
decode(buffer, headersOnly, contentChunkSize);
}
-Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
+BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
-Message::~Message(){
+BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
}
-void Message::setHeader(AMQHeaderBody::shared_ptr _header){
+void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
this->header = _header;
}
-void Message::addContent(AMQContentBody::shared_ptr data){
+void BasicMessage::addContent(AMQContentBody::shared_ptr data){
if (!content.get()) {
content = std::auto_ptr<Content>(new InMemoryContent());
}
@@ -68,15 +68,15 @@ void Message::addContent(AMQContentBody::shared_ptr data){
size += data->size();
}
-bool Message::isComplete(){
+bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
-void Message::redeliver(){
+void BasicMessage::redeliver(){
redelivered = true;
}
-void Message::deliver(OutputHandler* out, int channel,
+void BasicMessage::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize,
ProtocolVersion* version){
@@ -85,7 +85,7 @@ void Message::deliver(OutputHandler* out, int channel,
sendContent(out, channel, framesize, version);
}
-void Message::sendGetOk(OutputHandler* out,
+void BasicMessage::sendGetOk(OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
@@ -96,7 +96,7 @@ void Message::sendGetOk(OutputHandler* out,
sendContent(out, channel, framesize, version);
}
-void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
+void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(*version, channel, headerBody));
@@ -104,28 +104,28 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize,
if (content.get()) content->send(*version, out, channel, framesize);
}
-BasicHeaderProperties* Message::getHeaderProperties(){
+BasicHeaderProperties* BasicMessage::getHeaderProperties(){
return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
}
-const ConnectionToken* const Message::getPublisher(){
+const ConnectionToken* const BasicMessage::getPublisher(){
return publisher;
}
-bool Message::isPersistent()
+bool BasicMessage::isPersistent()
{
if(!header) return false;
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
}
-void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
+void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
{
decodeHeader(buffer);
if (!headersOnly) decodeContent(buffer, contentChunkSize);
}
-void Message::decodeHeader(Buffer& buffer)
+void BasicMessage::decodeHeader(Buffer& buffer)
{
buffer.getShortString(exchange);
buffer.getShortString(routingKey);
@@ -136,7 +136,7 @@ void Message::decodeHeader(Buffer& buffer)
setHeader(headerBody);
}
-void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
+void BasicMessage::decodeContent(Buffer& buffer, u_int32_t chunkSize)
{
u_int64_t expected = expectedContentSize();
if (expected != buffer.available()) {
@@ -158,13 +158,13 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
}
}
-void Message::encode(Buffer& buffer)
+void BasicMessage::encode(Buffer& buffer)
{
encodeHeader(buffer);
encodeContent(buffer);
}
-void Message::encodeHeader(Buffer& buffer)
+void BasicMessage::encodeHeader(Buffer& buffer)
{
buffer.putShortString(exchange);
buffer.putShortString(routingKey);
@@ -172,36 +172,36 @@ void Message::encodeHeader(Buffer& buffer)
header->encode(buffer);
}
-void Message::encodeContent(Buffer& buffer)
+void BasicMessage::encodeContent(Buffer& buffer)
{
Mutex::ScopedLock locker(contentLock);
if (content.get()) content->encode(buffer);
}
-u_int32_t Message::encodedSize()
+u_int32_t BasicMessage::encodedSize()
{
return encodedHeaderSize() + encodedContentSize();
}
-u_int32_t Message::encodedContentSize()
+u_int32_t BasicMessage::encodedContentSize()
{
Mutex::ScopedLock locker(contentLock);
return content.get() ? content->size() : 0;
}
-u_int32_t Message::encodedHeaderSize()
+u_int32_t BasicMessage::encodedHeaderSize()
{
return exchange.size() + 1
+ routingKey.size() + 1
+ header->size() + 4;//4 extra bytes for size
}
-u_int64_t Message::expectedContentSize()
+u_int64_t BasicMessage::expectedContentSize()
{
return header.get() ? header->getContentSize() : 0;
}
-void Message::releaseContent(MessageStore* store)
+void BasicMessage::releaseContent(MessageStore* store)
{
Mutex::ScopedLock locker(contentLock);
if (!isPersistent() && persistenceId == 0) {
@@ -217,7 +217,7 @@ void Message::releaseContent(MessageStore* store)
}
}
-void Message::setContent(std::auto_ptr<Content>& _content)
+void BasicMessage::setContent(std::auto_ptr<Content>& _content)
{
Mutex::ScopedLock locker(contentLock);
content = _content;
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 388bfba51e..d9ab9b7220 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -22,6 +22,7 @@
*
*/
+#include <BrokerMessageBase.h>
#include <memory>
#include <boost/shared_ptr.hpp>
#include <AMQContentBody.h>
@@ -45,7 +46,7 @@ namespace qpid {
* content bodies and some details about the publication
* request.
*/
- class Message{
+ class BasicMessage : public Message{
const ConnectionToken* const publisher;
string exchange;
string routingKey;
@@ -62,14 +63,14 @@ namespace qpid {
int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version);
public:
- typedef boost::shared_ptr<Message> shared_ptr;
+ typedef boost::shared_ptr<BasicMessage> shared_ptr;
- Message(const ConnectionToken* const publisher,
+ BasicMessage(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
bool mandatory, bool immediate);
- Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
- Message();
- ~Message();
+ BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
+ BasicMessage();
+ ~BasicMessage();
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void addContent(qpid::framing::AMQContentBody::shared_ptr data);
bool isComplete();
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
new file mode 100644
index 0000000000..e0139519ae
--- /dev/null
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -0,0 +1,268 @@
+#ifndef _broker_BrokerMessageBase_h
+#define _broker_BrokerMessageBase_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "Content.h"
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+ namespace framing {
+ class OutputHandler;
+ class ProtocolVersion;
+ class BasicHeaderProperties;
+ }
+
+ namespace broker {
+
+ class MessageStore;
+ class ConnectionToken;
+
+ /**
+ * Base class for all types of internal broker messages
+ * abstracting away the operations
+ * TODO; AMS: for the moment this is mostly a placeholder
+ */
+ class Message{
+
+ public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ virtual ~Message() {};
+
+ virtual void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version) = 0;
+ virtual void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version) = 0;
+ virtual void redeliver() = 0;
+
+ virtual bool isComplete() = 0;
+
+ virtual u_int64_t contentSize() const = 0;
+ virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
+ virtual bool isPersistent() = 0;
+ virtual const std::string& getRoutingKey() const = 0;
+ virtual const ConnectionToken* const getPublisher() = 0;
+ virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests?
+ virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests?
+
+ virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests?
+
+ virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+ virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedHeaderSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ virtual u_int32_t encodedContentSize() = 0;
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ virtual u_int64_t expectedContentSize() = 0;
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ virtual void releaseContent(MessageStore* /*store*/) {};
+
+ // TODO: AMS 29/1/2007 Don't think these are really part of base class
+
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
+ virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
+ virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
+ };
+
+ }
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
+#ifndef _broker_BrokerMessageBase_h
+#define _broker_BrokerMessageBase_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "Content.h"
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+ namespace framing {
+ class OutputHandler;
+ class ProtocolVersion;
+ class BasicHeaderProperties;
+ }
+
+ namespace broker {
+
+ class MessageStore;
+ class ConnectionToken;
+
+ /**
+ * Base class for all types of internal broker messages
+ * abstracting away the operations
+ * TODO; AMS: for the moment this is mostly a placeholder
+ */
+ class Message{
+
+ public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ virtual ~Message() {};
+
+ virtual void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version) = 0;
+ virtual void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version) = 0;
+ virtual void redeliver() = 0;
+
+ virtual bool isComplete() = 0;
+
+ virtual u_int64_t contentSize() const = 0;
+ virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
+ virtual bool isPersistent() = 0;
+ virtual const std::string& getRoutingKey() const = 0;
+ virtual const ConnectionToken* const getPublisher() = 0;
+ virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests?
+ virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests?
+
+ virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests?
+
+ virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+ virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedHeaderSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ virtual u_int32_t encodedContentSize() = 0;
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ virtual u_int64_t expectedContentSize() = 0;
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ virtual void releaseContent(MessageStore* /*store*/) {};
+
+ // TODO: AMS 29/1/2007 Don't think these are really part of base class
+
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
+ virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
+ virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
+ };
+
+ }
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
new file mode 100644
index 0000000000..f25405db72
--- /dev/null
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -0,0 +1,134 @@
+#ifndef _broker_BrokerMessageMessage_h
+#define _broker_BrokerMessageMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerMessageBase.h"
+
+namespace qpid {
+ namespace broker {
+ class MessageMessage: public Message{
+
+ public:
+ ~MessageMessage();
+
+ void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void redeliver();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ void setContent(std::auto_ptr<Content>& content);
+
+ u_int64_t contentSize() const;
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ const std::string& getRoutingKey() const;
+ const ConnectionToken* const getPublisher();
+
+ u_int32_t encodedContentSize();
+ u_int64_t expectedContentSize();
+ void releaseContent(MessageStore* store);
+ };
+
+ }
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
+#ifndef _broker_BrokerMessageMessage_h
+#define _broker_BrokerMessageMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerMessageBase.h"
+
+namespace qpid {
+ namespace broker {
+ class MessageMessage: public Message{
+
+ public:
+ ~MessageMessage();
+
+ void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void redeliver();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ void setContent(std::auto_ptr<Content>& content);
+
+ u_int64_t contentSize() const;
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ const std::string& getRoutingKey() const;
+ const ConnectionToken* const getPublisher();
+
+ u_int32_t encodedContentSize();
+ u_int64_t expectedContentSize();
+ void releaseContent(MessageStore* store);
+ };
+
+ }
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index d34422c93d..0f58278a5a 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -74,7 +74,7 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
string locales("en_US");
// TODO aconway 2007-01-16: Client call, move to adapter.
client->getConnection().start(
- MethodContext(0, &getAdapter(0)),
+ MethodContext(0, 0, &getAdapter(0)),
header->getMajor(), header->getMinor(),
properties, mechanisms, locales);
getAdapter(0).init(0, *out, client->getProtocolVersion());
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 33f7a63d45..7361d8827a 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -18,8 +19,11 @@
#include "MessageHandlerImpl.h"
#include "BrokerChannel.h"
+#include "FramingContent.h"
#include "Connection.h"
#include "Broker.h"
+#include "BrokerMessageMessage.h"
+
namespace qpid {
namespace broker {
@@ -41,7 +45,7 @@ void
MessageHandlerImpl::cancel( const MethodContext& context,
const string& destination )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
channel.cancel(destination);
@@ -73,10 +77,9 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool exclusive,
const qpid::framing::FieldTable& filter )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- Channel& channel = connection.getChannel(channel.getId());
if(!destination.empty() && channel.exists(destination)){
throw ConnectionException(530, "Consumer tags must be unique");
}
@@ -108,7 +111,7 @@ MessageHandlerImpl::get( const MethodContext& context,
const string& /*destination*/,
bool noAck )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Queue::shared_ptr queue =
connection.getQueue(queueName, context.channelId);
@@ -146,7 +149,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
u_int16_t prefetchCount,
bool /*global*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
@@ -159,7 +162,7 @@ void
MessageHandlerImpl::recover(const MethodContext&,
bool requeue )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
channel.recover(requeue);
@@ -182,18 +185,18 @@ MessageHandlerImpl::resume(const MethodContext&,
}
void
-MessageHandlerImpl::transfer(const MethodContext&,
+MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool immediate,
+ bool /*immediate*/,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& routingKey,
+ const string& /*routingKey*/,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -204,15 +207,24 @@ MessageHandlerImpl::transfer(const MethodContext&,
const string& /*transactionId*/,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
+ qpid::framing::Content body,
+ bool /*mandatory*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Exchange::shared_ptr exchange = exchangeName.empty() ?
broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate);
- channel.handlePublish(msg, exchange);
+ if (body.isInline()) {
+// MessageMessage* msg =
+// new MessageMessage(&connection, exchangeName, routingKey, immediate);
+// channel.handlePublish(msg, exchange);
+
+ connection.client->getMessageHandler()->ok(context);
+ } else {
+ // Don't handle reference content yet
+ assert(body.isInline());
+ }
}else{
throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
}
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
index 0eb9e119f5..985efe3847 100644
--- a/cpp/lib/broker/MessageHandlerImpl.h
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -114,7 +114,8 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH
const std::string& transactionId,
const std::string& securityToken,
const framing::FieldTable& applicationHeaders,
- framing::Content body );
+ framing::Content body,
+ bool mandatory );
};
}} // namespace qpid::broker