diff options
author | Alan Conway <aconway@apache.org> | 2011-11-30 15:01:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-11-30 15:01:06 +0000 |
commit | 392303a4c03c857122477cfbf14c8ec0900ef544 (patch) | |
tree | a126b619facd8c28042cd5b539d853b6fe6b10b3 | |
parent | 8a3cff4db5fbb95419764ae6f5e03c57053398b6 (diff) | |
download | qpid-python-392303a4c03c857122477cfbf14c8ec0900ef544.tar.gz |
QPID-3603: Cleanup of HA log messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1208461 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/ha.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Logging.cpp | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Logging.h | 55 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 85 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 23 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 4 |
11 files changed, 158 insertions, 71 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index d367ba2101..dc4e7c8d0a 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -28,6 +28,8 @@ ha_la_SOURCES = \ qpid/ha/HaBroker.cpp \ qpid/ha/HaBroker.h \ qpid/ha/HaPlugin.cpp \ + qpid/ha/Logging.h \ + qpid/ha/Logging.cpp \ qpid/ha/Settings.h \ qpid/ha/QueueReplicator.h \ qpid/ha/QueueReplicator.cpp \ diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 39ddc527b0..17da13ed1e 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -46,7 +46,7 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { // FIXME aconway 2011-11-24: identifying the primary. if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary. Url url(s.brokerUrl); - QPID_LOG(info, "HA: Acting as backup to " << url); + QPID_LOG(info, "HA: Acting as backup"); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over. diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index f78941e071..22b7e46595 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -59,8 +59,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) mgmtObject->set_status("solo"); ma->addObject(mgmtObject); } - QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl - << ", broker-url=" << brokerUrl); + QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl + << " broker-url=" << brokerUrl); backup.reset(new Backup(broker, s)); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index 798dbc2bfd..80f21e4320 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -56,7 +56,6 @@ struct HaPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (broker && settings.enabled) { - QPID_LOG(info, "HA: Enabled"); haBroker.reset(new ha::HaBroker(*broker, settings)); } else QPID_LOG(info, "HA: Disabled"); diff --git a/qpid/cpp/src/qpid/ha/Logging.cpp b/qpid/cpp/src/qpid/ha/Logging.cpp new file mode 100644 index 0000000000..7d8ee38367 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Logging.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Logging.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/SequenceNumber.h" + +namespace qpid { +namespace ha { + +QueuePos::QueuePos(const broker::QueuedMessage& qm) + : queue(qm.queue), position(qm.position) {} + +std::ostream& operator<<(std::ostream& o, const QueuePos& qp) { + return o << qp.queue->getName() << "[" << qp.position << "]"; +} + +}} // namesopace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Logging.h b/qpid/cpp/src/qpid/ha/Logging.h new file mode 100644 index 0000000000..3b12baa390 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Logging.h @@ -0,0 +1,55 @@ +#ifndef QPID_HA_HAOSTREAM_H +#define QPID_HA_HAOSTREAM_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iosfwd> + +/**@file ostream helpers used in log messages. */ + +namespace qpid { + +namespace broker { +class Queue; +class QueuedMessage; +} + +namespace framing { +class SequenceNumber; +} + +namespace ha { + +// Other printable helpers + +struct QueuePos { + const broker::Queue* queue; + const framing::SequenceNumber& position; + QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos) + : queue(q), position(pos) {} + QueuePos(const broker::QueuedMessage& qm); +}; + +std::ostream& operator<<(std::ostream& o, const QueuePos& h); + +}} // namespace qpid::ha + +#endif /*!QPID_HA_HAOSTREAM_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index bf9de4cc0c..2de9ec5a59 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -21,6 +21,7 @@ #include "QueueReplicator.h" #include "ReplicatingSubscription.h" +#include "Logging.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -82,7 +83,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest); } @@ -105,14 +106,17 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid if (current < *i) { //haven't got that far yet, record the dequeue dequeued.add(*i); - QPID_LOG(debug, "Recording dequeue of message at " << *i << " from " << queue->getName()); + QPID_LOG(trace, "HA: Recording dequeue of message at " << + QueuePos(queue.get(), *i)); } else { QueuedMessage message; if (queue->acquireMessageAt(*i, message)) { queue->dequeue(0, message); - QPID_LOG(info, "Dequeued message at " << *i << " from " << queue->getName()); + QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message)); } else { - QPID_LOG(error, "Unable to dequeue message at " << *i << " from " << queue->getName()); + // FIXME aconway 2011-11-29: error handling + QPID_LOG(error, "HA: Unable to dequeue message at " + << QueuePos(queue.get(), *i)); } } } @@ -121,10 +125,10 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid //dequeued before our subscription reached them while (dequeued.contains(++current)) { dequeued.remove(current); - QPID_LOG(debug, "Skipping dequeued message at " << current << " from " << queue->getName()); + QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName()); queue->setPosition(current); } - QPID_LOG(info, "Enqueued message on " << queue->getName() << "; currently at " << current); + QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current); msg.deliverTo(queue); } } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 8b9f9773d1..aabbd43631 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,6 +20,7 @@ */ #include "ReplicatingSubscription.h" +#include "Logging.h" #include "qpid/broker/Queue.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" @@ -39,7 +40,7 @@ const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_seque const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number"); const string DOLLAR("$"); -const string INTERNAL("_internal"); +const string INTERNAL("-internal"); class ReplicationStateInitialiser { @@ -55,7 +56,7 @@ class ReplicationStateInitialiser void operator()(const QueuedMessage& message) { if (message.position < start) { //replica does not have a message that should still be on the queue - QPID_LOG(warning, "Replica appears to be missing message at " << message.position); + QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message)); } else if (message.position >= start && message.position <= end) { //i.e. message is within the intial range and has not been dequeued, so remove it from the results results.remove(message.position); @@ -75,63 +76,63 @@ string mask(const string& in) boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( - SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, + SemanticState* parent, + const string& name, + Queue::shared_ptr queue, bool ack, - bool _acquire, - bool _exclusive, - const string& _tag, - const string& _resumeId, - uint64_t _resumeTtl, - const framing::FieldTable& _arguments + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments ) { - return boost::shared_ptr<broker::SemanticState::ConsumerImpl>( - new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments)); + new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); } ReplicatingSubscription::ReplicatingSubscription( - SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, + SemanticState* parent, + const string& name, + Queue::shared_ptr queue, bool ack, - bool _acquire, - bool _exclusive, - const string& _tag, - const string& _resumeId, - uint64_t _resumeTtl, - const framing::FieldTable& _arguments -) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments), - events(new Queue(mask(_name))), + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments +) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments), + events(new Queue(mask(name))), consumer(new DelegatingConsumer(*this)) { + QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName()); // FIXME aconway 2011-11-25: string constants. - QPID_LOG(debug, "HA: replicating subscription " << _name << " to " << _queue->getName()); - if (_arguments.isSet("qpid.high_sequence_number")) { - qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number"); + if (arguments.isSet("qpid.high_sequence_number")) { + qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number"); qpid::framing::SequenceNumber lwm; - if (_arguments.isSet("qpid.low_sequence_number")) { - lwm = _arguments.getAsInt("qpid.low_sequence_number"); + if (arguments.isSet("qpid.low_sequence_number")) { + lwm = arguments.getAsInt("qpid.low_sequence_number"); } else { lwm = hwm; } qpid::framing::SequenceNumber oldest; - if (_queue->getOldest(oldest)) { + if (queue->getOldest(oldest)) { if (oldest >= hwm) { range.add(lwm, --oldest); } else if (oldest >= lwm) { ReplicationStateInitialiser initialiser(range, lwm, hwm); - _queue->eachMessage(initialiser); + queue->eachMessage(initialiser); } else { //i.e. have older message on master than is reported to exist on replica - QPID_LOG(warning, "Replica appears to be missing message on master"); + QPID_LOG(warning, "HA: Replica missing message on master"); } } else { //local queue (i.e. master) is empty - range.add(lwm, _queue->getPosition()); + range.add(lwm, queue->getPosition()); } - QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range - << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")"); + QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range + << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")"); //set position of 'cursor' position = hwm; } @@ -142,11 +143,6 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) return ConsumerImpl::deliver(m); } -void ReplicatingSubscription::init() -{ - getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); -} - void ReplicatingSubscription::cancel() { getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); @@ -158,10 +154,9 @@ ReplicatingSubscription::~ReplicatingSubscription() {} //under the message lock in the queue void ReplicatingSubscription::enqueued(const QueuedMessage& m) { - QPID_LOG(debug, "Enqueued message at " << m.position); + QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m)); //delay completion m.payload->getIngressCompletion().startCompleter(); - QPID_LOG(debug, "Delayed " << m.payload.get()); } void ReplicatingSubscription::generateDequeueEvent() @@ -203,12 +198,13 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m) { sys::Mutex::ScopedLock l(lock); range.add(m.position); - QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position); + // FIXME aconway 2011-11-29: q[pos] + QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position); } notify(); if (m.position > position) { m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue"); + QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue"); } } @@ -236,5 +232,4 @@ bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Me bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } - }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 649d256e55..b83842acbb 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -71,7 +71,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, ~ReplicatingSubscription(); - void init(); void cancel(); bool deliver(broker::QueuedMessage& msg); void enqueued(const broker::QueuedMessage&); diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index e621052fea..125e2c0ba6 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -213,7 +213,7 @@ void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(QUEUE, queueName, sessionHandler); sendQuery(EXCHANGE, queueName, sessionHandler); sendQuery(BINDING, queueName, sessionHandler); - QPID_LOG(debug, "Activated wiring replicator") + QPID_LOG(debug, "HA: Activated wiring replicator") } void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { @@ -227,10 +227,10 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - Variant::Map& map = list.front().asMap(); + Variant::Map& map = i->asMap(); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - QPID_LOG(trace, "HA: Configuration event from primary: " << values); + QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); @@ -246,7 +246,7 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram Variant::Map& values = i->asMap()[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - QPID_LOG(trace, "HA: Configuration response from primary: " << values); + QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values); if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); @@ -284,6 +284,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { // re-create from event. // Events are always up to date, whereas responses may be // out of date. + QPID_LOG(debug, "HA: New queue replica " << name); startQueueReplicator(result.first); } else { QPID_LOG(warning, "HA: Replicated queue " << name << " already exists"); @@ -309,7 +310,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { string name = values[EXNAME].asString(); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - QPID_LOG(debug, "HA: Creating exchange from event " << name); + QPID_LOG(debug, "HA: New exchange replica " << name); if (!broker.createExchange( name, values[EXTYPE].asString(), @@ -320,7 +321,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { values[RHOST].asString()).second) { // FIXME aconway 2011-11-22: should delete pre-exisitng exchange // and re-create from event. See comment in doEventQueueDeclare. - QPID_LOG(warning, "Replicated exchange " << name << " already exists"); + QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists"); } } } @@ -348,7 +349,7 @@ void WiringReplicator::doEventBind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(values[ARGS].asMap(), args); string key = values[KEY].asString(); - QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); exchange->bind(queue, key, &args); @@ -363,7 +364,6 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(argsMap, args); string name(values[NAME].asString()); - QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() << " (in catch-up)"); std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, @@ -375,11 +375,12 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { + QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)"); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already // exists if we're failing over. - QPID_LOG(warning, "Replicated queue " << values[NAME] << " already exists (in catch-up)"); + QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)"); } } @@ -388,7 +389,7 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) { if (!replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); - QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString() << " (in catch-up)"); + QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)"); if (!broker.createExchange( values[NAME].asString(), values[TYPE].asString(), @@ -397,7 +398,7 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) { args, ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(warning, "Replicated exchange " << values[QNAME] << " already exists (in catch-up)"); + QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)"); } } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 021401bb08..9b52c2fca7 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -39,10 +39,6 @@ class ShortTests(BrokerTest): ] + args, **kwargs) - def setup_wiring(self, primary, backup): - cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary) - self.assertEqual(0, os.system(cmd)) - # FIXME aconway 2011-11-15: work around async replication. def wait(self, session, address): def check(): |