summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-01-09 11:20:51 +0000
committerGordon Sim <gsim@apache.org>2014-01-09 11:20:51 +0000
commit665989f6c6192b4ae68069031bee86e5899571e7 (patch)
tree54f63140b1da6d62eb815f38614f04a86f916be1 /qpid/cpp/src
parent8562fa7dde6762a1fa03a7f4e14aecf1d45cb2df (diff)
downloadqpid-python-665989f6c6192b4ae68069031bee86e5899571e7.tar.gz
QPID-5457: support for messages composed of multiple transfers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556787 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/amqp/Decoder.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp46
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.h1
3 files changed, 35 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp
index 1058f83e38..de2c5ce64d 100644
--- a/qpid/cpp/src/qpid/amqp/Decoder.cpp
+++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp
@@ -301,7 +301,7 @@ Descriptor Decoder::readDescriptor()
void Decoder::advance(size_t n)
{
- if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds"));
+ if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds: requested advance of " << n << " at " << position << " but only " << available() << " available"));
position += n;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
index 347c3eb14d..a1556f5249 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -27,6 +27,7 @@
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -111,19 +112,38 @@ DecodingIncoming::~DecodingIncoming() {}
void DecodingIncoming::readable(pn_delivery_t* delivery)
{
- boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery)));
- /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize());
- received->scan();
- pn_link_advance(link);
+ size_t pending = pn_delivery_pending(delivery);
+ size_t offset = partial ? partial->getSize() : 0;
+ boost::intrusive_ptr<Message> received(new Message(offset + pending));
+ if (partial) {
+ ::memcpy(received->getData(), partial->getData(), offset);
+ partial = boost::intrusive_ptr<Message>();
+ }
+ assert(received->getSize() == pending + offset);
+ pn_link_recv(link, received->getData() + offset, pending);
+
+ if (pn_delivery_partial(delivery)) {
+ QPID_LOG(debug, "Message incomplete: received " << pending << " bytes, now have " << received->getSize());
+ partial = received;
+ } else {
+ if (offset) {
+ QPID_LOG(debug, "Message complete: received " << pending << " bytes, " << received->getSize() << " in total");
+ } else {
+ QPID_LOG(debug, "Message received: " << received->getSize() << " bytes");
+ }
- qpid::broker::Message message(received, received);
- message.setPublisher(session->getParent());
- userid.verify(message.getUserId());
- message.computeExpiration(expiryPolicy);
- handle(message);
- --window;
- received->begin();
- Transfer t(delivery, session);
- received->end(t);
+ received->scan();
+ pn_link_advance(link);
+
+ qpid::broker::Message message(received, received);
+ message.setPublisher(session->getParent());
+ userid.verify(message.getUserId());
+ message.computeExpiration(expiryPolicy);
+ handle(message);
+ --window;
+ received->begin();
+ Transfer t(delivery, session);
+ received->end(t);
+ }
}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
index 127f8cecf9..807b918a17 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
@@ -78,6 +78,7 @@ class DecodingIncoming : public Incoming
private:
boost::shared_ptr<Session> session;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ boost::intrusive_ptr<Message> partial;
};
}}} // namespace qpid::broker::amqp