diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 50 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Buffer.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Buffer.h | 2 |
6 files changed, 78 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e3a98ae8f5..112e8dd77d 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -33,6 +33,24 @@ Message::Message(const ConnectionToken* const _publisher, size(0), persistenceId(0) {} +Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ + buffer.getShortString(exchange); + buffer.getShortString(routingKey); + + AMQFrame headerFrame; + headerFrame.decode(buffer); + AMQHeaderBody::shared_ptr headerBody = dynamic_pointer_cast<AMQHeaderBody, AMQBody>(headerFrame.getBody()); + setHeader(headerBody); + + AMQContentBody::shared_ptr contentBody; + while (buffer.available()) { + AMQFrame contentFrame; + contentFrame.decode(buffer); + contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody()); + addContent(contentBody); + } +} + Message::~Message(){} void Message::setHeader(AMQHeaderBody::shared_ptr _header){ @@ -97,3 +115,35 @@ bool Message::isPersistent() BasicHeaderProperties* props = getHeaderProperties(); return props && props->getDeliveryMode() == PERSISTENT; } + +void Message::encode(Buffer& buffer) +{ + buffer.putShortString(exchange); + buffer.putShortString(routingKey); + + AMQBody::shared_ptr body; + + body = static_pointer_cast<AMQBody, AMQHeaderBody>(header); + + AMQFrame headerFrame(0, body); + headerFrame.encode(buffer); + + for (content_iterator i = content.begin(); i != content.end(); i++) { + body = static_pointer_cast<AMQBody, AMQContentBody>(*i); + AMQFrame contentFrame(0, body); + contentFrame.encode(buffer); + } +} + +u_int32_t Message::encodedSize() +{ + int encodedContentSize(0); + for (content_iterator i = content.begin(); i != content.end(); i++) { + encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame (TODO, could replace frame by simple size) + } + + return exchange.size() + 1 + + routingKey.size() + 1 + + header->size() + 8 //8 extra bytes for frame (TODO, could actually remove the frame) + + encodedContentSize; +} diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index f9acdfd0a5..06780ba6ed 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -40,8 +40,8 @@ namespace qpid { typedef content_list::iterator content_iterator; const ConnectionToken* const publisher; - const string exchange; - const string routingKey; + string exchange; + string routingKey; const bool mandatory; const bool immediate; bool redelivered; @@ -59,6 +59,7 @@ namespace qpid { Message(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate); + Message(qpid::framing::Buffer& buffer); ~Message(); void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); void addContent(qpid::framing::AMQContentBody::shared_ptr data); @@ -84,7 +85,14 @@ namespace qpid { u_int64_t contentSize() const { return size; } u_int64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } + void encode(qpid::framing::Buffer& buffer); + /** + * @returns the size of the buffer needed to encode this message + */ + u_int32_t encodedSize(); + }; + } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 46b14a23f5..2fe5de2dc9 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -57,6 +57,11 @@ void Queue::deliver(Message::shared_ptr& msg){ process(msg); } +void Queue::recover(Message::shared_ptr& msg){ + queueing = true; + messages.push(msg); +} + void Queue::process(Message::shared_ptr& msg){ Mutex::ScopedLock locker(lock); if(queueing || !dispatch(msg)){ diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index c146de1353..2f7ee3ec78 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -57,7 +57,7 @@ namespace qpid { mutable qpid::sys::Mutex lock; int64_t lastUsed; Consumer* exclusive; - u_int64_t persistenceId; + mutable u_int64_t persistenceId; bool startDispatching(); bool dispatch(Message::shared_ptr& msg); @@ -91,6 +91,10 @@ namespace qpid { */ void process(Message::shared_ptr& msg); /** + * Used during recovery to add stored messages back to the queue + */ + void recover(Message::shared_ptr& msg); + /** * Dispatch any queued messages providing there are * consumers for them. Only one thread can be dispatching * at any time, but this method (rather than the caller) @@ -107,7 +111,7 @@ namespace qpid { inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } inline bool hasExclusiveConsumer() const { return exclusive; } inline u_int64_t getPersistenceId() const { return persistenceId; } - inline void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } + inline void setPersistenceId(u_int64_t _persistenceId) const { persistenceId = _persistenceId; } bool canAutoDelete() const; diff --git a/cpp/src/qpid/framing/Buffer.cpp b/cpp/src/qpid/framing/Buffer.cpp index b7c17fa86f..05e9be83a9 100644 --- a/cpp/src/qpid/framing/Buffer.cpp +++ b/cpp/src/qpid/framing/Buffer.cpp @@ -18,12 +18,15 @@ #include "qpid/framing/Buffer.h" #include "qpid/framing/FieldTable.h" -qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), position(0), limit(_size){ +qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), owner(true), position(0), limit(_size){ data = new char[size]; } +qpid::framing::Buffer::Buffer(char* _data, u_int32_t _size) : size(_size), owner(false), data(_data), position(0), limit(_size){ +} + qpid::framing::Buffer::~Buffer(){ - delete[] data; + if(owner) delete[] data; } void qpid::framing::Buffer::flip(){ diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h index 1698144908..8eb92a1d27 100644 --- a/cpp/src/qpid/framing/Buffer.h +++ b/cpp/src/qpid/framing/Buffer.h @@ -28,6 +28,7 @@ class FieldTable; class Buffer { const u_int32_t size; + const bool owner;//indicates whether the data is owned by this instance char* data; u_int32_t position; u_int32_t limit; @@ -37,6 +38,7 @@ class Buffer public: Buffer(u_int32_t size); + Buffer(char* data, u_int32_t size); ~Buffer(); void flip(); |