summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp5
4 files changed, 16 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
index f5e9332052..109c9b8757 100644
--- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -37,7 +37,7 @@ namespace
const std::string QPID_MANAGEMENT("qpid.management");
}
-MessageBuilder::MessageBuilder() : state(DORMANT) {}
+MessageBuilder::MessageBuilder() : state(DORMANT), copyExchange(true) {}
void MessageBuilder::handle(AMQFrame& frame)
{
@@ -60,7 +60,10 @@ void MessageBuilder::handle(AMQFrame& frame)
header.setEof(false);
message->getFrames().append(header);
} else if (type == HEADER_BODY) {
- frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange);
+ if (copyExchange) {
+ frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->
+ setExchange(exchange);
+ }
} else {
throw CommandInvalidException(
QPID_MSG("Invalid frame sequence for message, expected header or content got "
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.h b/qpid/cpp/src/qpid/broker/MessageBuilder.h
index 5673ed3b7f..e7a668e18d 100644
--- a/qpid/cpp/src/qpid/broker/MessageBuilder.h
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.h
@@ -43,11 +43,14 @@ namespace qpid {
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessage();
QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
void end();
+ void setCopyExchange(bool value) { copyExchange = value; }
+
private:
enum State {DORMANT, METHOD, HEADER, CONTENT};
State state;
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> message;
std::string exchange;
+ bool copyExchange;
void checkType(uint8_t expected, uint8_t actual);
};
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index c71c520f9c..b1f18747f3 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -147,6 +147,9 @@ class SessionState : public qpid::SessionState,
/** Send result and completion for a given command to the client. */
void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync,
const std::string& result);
+
+ MessageBuilder& getMessageBuilder() { return msgBuilder; }
+
private:
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 7997bc6aa9..3045829ce8 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -35,6 +35,7 @@
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -248,6 +249,10 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
sessionHandler = &sessionHandler_;
+ if (sessionHandler->getSession()) {
+ // Don't overwrite the exchange property set on the primary.
+ sessionHandler->getSession()->getMessageBuilder().setCopyExchange(false);
+ }
AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;