diff options
| author | Gordon Sim <gsim@apache.org> | 2014-01-09 11:20:51 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-01-09 11:20:51 +0000 |
| commit | 665989f6c6192b4ae68069031bee86e5899571e7 (patch) | |
| tree | 54f63140b1da6d62eb815f38614f04a86f916be1 /qpid/cpp/src | |
| parent | 8562fa7dde6762a1fa03a7f4e14aecf1d45cb2df (diff) | |
| download | qpid-python-665989f6c6192b4ae68069031bee86e5899571e7.tar.gz | |
QPID-5457: support for messages composed of multiple transfers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556787 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/amqp/Decoder.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.cpp | 46 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.h | 1 |
3 files changed, 35 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp index 1058f83e38..de2c5ce64d 100644 --- a/qpid/cpp/src/qpid/amqp/Decoder.cpp +++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp @@ -301,7 +301,7 @@ Descriptor Decoder::readDescriptor() void Decoder::advance(size_t n) { - if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds")); + if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds: requested advance of " << n << " at " << position << " but only " << available() << " available")); position += n; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index 347c3eb14d..a1556f5249 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/Message.h" #include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -111,19 +112,38 @@ DecodingIncoming::~DecodingIncoming() {} void DecodingIncoming::readable(pn_delivery_t* delivery) { - boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery))); - /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize()); - received->scan(); - pn_link_advance(link); + size_t pending = pn_delivery_pending(delivery); + size_t offset = partial ? partial->getSize() : 0; + boost::intrusive_ptr<Message> received(new Message(offset + pending)); + if (partial) { + ::memcpy(received->getData(), partial->getData(), offset); + partial = boost::intrusive_ptr<Message>(); + } + assert(received->getSize() == pending + offset); + pn_link_recv(link, received->getData() + offset, pending); + + if (pn_delivery_partial(delivery)) { + QPID_LOG(debug, "Message incomplete: received " << pending << " bytes, now have " << received->getSize()); + partial = received; + } else { + if (offset) { + QPID_LOG(debug, "Message complete: received " << pending << " bytes, " << received->getSize() << " in total"); + } else { + QPID_LOG(debug, "Message received: " << received->getSize() << " bytes"); + } - qpid::broker::Message message(received, received); - message.setPublisher(session->getParent()); - userid.verify(message.getUserId()); - message.computeExpiration(expiryPolicy); - handle(message); - --window; - received->begin(); - Transfer t(delivery, session); - received->end(t); + received->scan(); + pn_link_advance(link); + + qpid::broker::Message message(received, received); + message.setPublisher(session->getParent()); + userid.verify(message.getUserId()); + message.computeExpiration(expiryPolicy); + handle(message); + --window; + received->begin(); + Transfer t(delivery, session); + received->end(t); + } } }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h index 127f8cecf9..807b918a17 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -78,6 +78,7 @@ class DecodingIncoming : public Incoming private: boost::shared_ptr<Session> session; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + boost::intrusive_ptr<Message> partial; }; }}} // namespace qpid::broker::amqp |
