diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/amqp/Decoder.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.cpp | 46 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.h | 1 |
3 files changed, 35 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp index 1058f83e38..de2c5ce64d 100644 --- a/qpid/cpp/src/qpid/amqp/Decoder.cpp +++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp @@ -301,7 +301,7 @@ Descriptor Decoder::readDescriptor() void Decoder::advance(size_t n) { - if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds")); + if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds: requested advance of " << n << " at " << position << " but only " << available() << " available")); position += n; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index 347c3eb14d..a1556f5249 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/Message.h" #include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -111,19 +112,38 @@ DecodingIncoming::~DecodingIncoming() {} void DecodingIncoming::readable(pn_delivery_t* delivery) { - boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery))); - /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize()); - received->scan(); - pn_link_advance(link); + size_t pending = pn_delivery_pending(delivery); + size_t offset = partial ? partial->getSize() : 0; + boost::intrusive_ptr<Message> received(new Message(offset + pending)); + if (partial) { + ::memcpy(received->getData(), partial->getData(), offset); + partial = boost::intrusive_ptr<Message>(); + } + assert(received->getSize() == pending + offset); + pn_link_recv(link, received->getData() + offset, pending); + + if (pn_delivery_partial(delivery)) { + QPID_LOG(debug, "Message incomplete: received " << pending << " bytes, now have " << received->getSize()); + partial = received; + } else { + if (offset) { + QPID_LOG(debug, "Message complete: received " << pending << " bytes, " << received->getSize() << " in total"); + } else { + QPID_LOG(debug, "Message received: " << received->getSize() << " bytes"); + } - qpid::broker::Message message(received, received); - message.setPublisher(session->getParent()); - userid.verify(message.getUserId()); - message.computeExpiration(expiryPolicy); - handle(message); - --window; - received->begin(); - Transfer t(delivery, session); - received->end(t); + received->scan(); + pn_link_advance(link); + + qpid::broker::Message message(received, received); + message.setPublisher(session->getParent()); + userid.verify(message.getUserId()); + message.computeExpiration(expiryPolicy); + handle(message); + --window; + received->begin(); + Transfer t(delivery, session); + received->end(t); + } } }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h index 127f8cecf9..807b918a17 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -78,6 +78,7 @@ class DecodingIncoming : public Incoming private: boost::shared_ptr<Session> session; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + boost::intrusive_ptr<Message> partial; }; }}} // namespace qpid::broker::amqp |
