diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/MessageBuilder.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/MessageBuilder.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 5 |
4 files changed, 16 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index f5e9332052..109c9b8757 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -37,7 +37,7 @@ namespace const std::string QPID_MANAGEMENT("qpid.management"); } -MessageBuilder::MessageBuilder() : state(DORMANT) {} +MessageBuilder::MessageBuilder() : state(DORMANT), copyExchange(true) {} void MessageBuilder::handle(AMQFrame& frame) { @@ -60,7 +60,10 @@ void MessageBuilder::handle(AMQFrame& frame) header.setEof(false); message->getFrames().append(header); } else if (type == HEADER_BODY) { - frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange); + if (copyExchange) { + frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)-> + setExchange(exchange); + } } else { throw CommandInvalidException( QPID_MSG("Invalid frame sequence for message, expected header or content got " diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.h b/qpid/cpp/src/qpid/broker/MessageBuilder.h index 5673ed3b7f..e7a668e18d 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.h +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.h @@ -43,11 +43,14 @@ namespace qpid { boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessage(); QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id); void end(); + void setCopyExchange(bool value) { copyExchange = value; } + private: enum State {DORMANT, METHOD, HEADER, CONTENT}; State state; boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> message; std::string exchange; + bool copyExchange; void checkType(uint8_t expected, uint8_t actual); }; diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index c71c520f9c..b1f18747f3 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -147,6 +147,9 @@ class SessionState : public qpid::SessionState, /** Send result and completion for a given command to the client. */ void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync, const std::string& result); + + MessageBuilder& getMessageBuilder() { return msgBuilder; } + private: void handleCommand(framing::AMQMethodBody* method); void handleContent(framing::AMQFrame& frame); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 7997bc6aa9..3045829ce8 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -35,6 +35,7 @@ #include "qpid/broker/QueueObserver.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionHandler.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" @@ -248,6 +249,10 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed sessionHandler = &sessionHandler_; + if (sessionHandler->getSession()) { + // Don't overwrite the exchange property set on the primary. + sessionHandler->getSession()->getMessageBuilder().setCopyExchange(false); + } AMQP_ServerProxy peer(sessionHandler->out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable arguments; |
