summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Message.cpp50
-rw-r--r--cpp/src/qpid/broker/Message.h12
-rw-r--r--cpp/src/qpid/broker/Queue.cpp5
-rw-r--r--cpp/src/qpid/broker/Queue.h8
-rw-r--r--cpp/src/qpid/framing/Buffer.cpp7
-rw-r--r--cpp/src/qpid/framing/Buffer.h2
6 files changed, 78 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e3a98ae8f5..112e8dd77d 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -33,6 +33,24 @@ Message::Message(const ConnectionToken* const _publisher,
size(0),
persistenceId(0) {}
+Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
+ buffer.getShortString(exchange);
+ buffer.getShortString(routingKey);
+
+ AMQFrame headerFrame;
+ headerFrame.decode(buffer);
+ AMQHeaderBody::shared_ptr headerBody = dynamic_pointer_cast<AMQHeaderBody, AMQBody>(headerFrame.getBody());
+ setHeader(headerBody);
+
+ AMQContentBody::shared_ptr contentBody;
+ while (buffer.available()) {
+ AMQFrame contentFrame;
+ contentFrame.decode(buffer);
+ contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody());
+ addContent(contentBody);
+ }
+}
+
Message::~Message(){}
void Message::setHeader(AMQHeaderBody::shared_ptr _header){
@@ -97,3 +115,35 @@ bool Message::isPersistent()
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
}
+
+void Message::encode(Buffer& buffer)
+{
+ buffer.putShortString(exchange);
+ buffer.putShortString(routingKey);
+
+ AMQBody::shared_ptr body;
+
+ body = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+
+ AMQFrame headerFrame(0, body);
+ headerFrame.encode(buffer);
+
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ AMQFrame contentFrame(0, body);
+ contentFrame.encode(buffer);
+ }
+}
+
+u_int32_t Message::encodedSize()
+{
+ int encodedContentSize(0);
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame (TODO, could replace frame by simple size)
+ }
+
+ return exchange.size() + 1
+ + routingKey.size() + 1
+ + header->size() + 8 //8 extra bytes for frame (TODO, could actually remove the frame)
+ + encodedContentSize;
+}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index f9acdfd0a5..06780ba6ed 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -40,8 +40,8 @@ namespace qpid {
typedef content_list::iterator content_iterator;
const ConnectionToken* const publisher;
- const string exchange;
- const string routingKey;
+ string exchange;
+ string routingKey;
const bool mandatory;
const bool immediate;
bool redelivered;
@@ -59,6 +59,7 @@ namespace qpid {
Message(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
bool mandatory, bool immediate);
+ Message(qpid::framing::Buffer& buffer);
~Message();
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void addContent(qpid::framing::AMQContentBody::shared_ptr data);
@@ -84,7 +85,14 @@ namespace qpid {
u_int64_t contentSize() const { return size; }
u_int64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
+ void encode(qpid::framing::Buffer& buffer);
+ /**
+ * @returns the size of the buffer needed to encode this message
+ */
+ u_int32_t encodedSize();
+
};
+
}
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 46b14a23f5..2fe5de2dc9 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -57,6 +57,11 @@ void Queue::deliver(Message::shared_ptr& msg){
process(msg);
}
+void Queue::recover(Message::shared_ptr& msg){
+ queueing = true;
+ messages.push(msg);
+}
+
void Queue::process(Message::shared_ptr& msg){
Mutex::ScopedLock locker(lock);
if(queueing || !dispatch(msg)){
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index c146de1353..2f7ee3ec78 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -57,7 +57,7 @@ namespace qpid {
mutable qpid::sys::Mutex lock;
int64_t lastUsed;
Consumer* exclusive;
- u_int64_t persistenceId;
+ mutable u_int64_t persistenceId;
bool startDispatching();
bool dispatch(Message::shared_ptr& msg);
@@ -91,6 +91,10 @@ namespace qpid {
*/
void process(Message::shared_ptr& msg);
/**
+ * Used during recovery to add stored messages back to the queue
+ */
+ void recover(Message::shared_ptr& msg);
+ /**
* Dispatch any queued messages providing there are
* consumers for them. Only one thread can be dispatching
* at any time, but this method (rather than the caller)
@@ -107,7 +111,7 @@ namespace qpid {
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
inline u_int64_t getPersistenceId() const { return persistenceId; }
- inline void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
+ inline void setPersistenceId(u_int64_t _persistenceId) const { persistenceId = _persistenceId; }
bool canAutoDelete() const;
diff --git a/cpp/src/qpid/framing/Buffer.cpp b/cpp/src/qpid/framing/Buffer.cpp
index b7c17fa86f..05e9be83a9 100644
--- a/cpp/src/qpid/framing/Buffer.cpp
+++ b/cpp/src/qpid/framing/Buffer.cpp
@@ -18,12 +18,15 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FieldTable.h"
-qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), position(0), limit(_size){
+qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), owner(true), position(0), limit(_size){
data = new char[size];
}
+qpid::framing::Buffer::Buffer(char* _data, u_int32_t _size) : size(_size), owner(false), data(_data), position(0), limit(_size){
+}
+
qpid::framing::Buffer::~Buffer(){
- delete[] data;
+ if(owner) delete[] data;
}
void qpid::framing::Buffer::flip(){
diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h
index 1698144908..8eb92a1d27 100644
--- a/cpp/src/qpid/framing/Buffer.h
+++ b/cpp/src/qpid/framing/Buffer.h
@@ -28,6 +28,7 @@ class FieldTable;
class Buffer
{
const u_int32_t size;
+ const bool owner;//indicates whether the data is owned by this instance
char* data;
u_int32_t position;
u_int32_t limit;
@@ -37,6 +38,7 @@ class Buffer
public:
Buffer(u_int32_t size);
+ Buffer(char* data, u_int32_t size);
~Buffer();
void flip();