summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-11-30 15:01:06 +0000
committerAlan Conway <aconway@apache.org>2011-11-30 15:01:06 +0000
commit392303a4c03c857122477cfbf14c8ec0900ef544 (patch)
treea126b619facd8c28042cd5b539d853b6fe6b10b3
parent8a3cff4db5fbb95419764ae6f5e03c57053398b6 (diff)
downloadqpid-python-392303a4c03c857122477cfbf14c8ec0900ef544.tar.gz
QPID-3603: Cleanup of HA log messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1208461 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/ha.mk2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/Logging.cpp36
-rw-r--r--qpid/cpp/src/qpid/ha/Logging.h55
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp16
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp85
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h1
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp23
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py4
11 files changed, 158 insertions, 71 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index d367ba2101..dc4e7c8d0a 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -28,6 +28,8 @@ ha_la_SOURCES = \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
+ qpid/ha/Logging.h \
+ qpid/ha/Logging.cpp \
qpid/ha/Settings.h \
qpid/ha/QueueReplicator.h \
qpid/ha/QueueReplicator.cpp \
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 39ddc527b0..17da13ed1e 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -46,7 +46,7 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
// FIXME aconway 2011-11-24: identifying the primary.
if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
Url url(s.brokerUrl);
- QPID_LOG(info, "HA: Acting as backup to " << url);
+ QPID_LOG(info, "HA: Acting as backup");
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index f78941e071..22b7e46595 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -59,8 +59,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
mgmtObject->set_status("solo");
ma->addObject(mgmtObject);
}
- QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
- << ", broker-url=" << brokerUrl);
+ QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
+ << " broker-url=" << brokerUrl);
backup.reset(new Backup(broker, s));
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index 798dbc2bfd..80f21e4320 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -56,7 +56,6 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && settings.enabled) {
- QPID_LOG(info, "HA: Enabled");
haBroker.reset(new ha::HaBroker(*broker, settings));
} else
QPID_LOG(info, "HA: Disabled");
diff --git a/qpid/cpp/src/qpid/ha/Logging.cpp b/qpid/cpp/src/qpid/ha/Logging.cpp
new file mode 100644
index 0000000000..7d8ee38367
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Logging.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Logging.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace ha {
+
+QueuePos::QueuePos(const broker::QueuedMessage& qm)
+ : queue(qm.queue), position(qm.position) {}
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& qp) {
+ return o << qp.queue->getName() << "[" << qp.position << "]";
+}
+
+}} // namesopace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Logging.h b/qpid/cpp/src/qpid/ha/Logging.h
new file mode 100644
index 0000000000..3b12baa390
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Logging.h
@@ -0,0 +1,55 @@
+#ifndef QPID_HA_HAOSTREAM_H
+#define QPID_HA_HAOSTREAM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iosfwd>
+
+/**@file ostream helpers used in log messages. */
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace framing {
+class SequenceNumber;
+}
+
+namespace ha {
+
+// Other printable helpers
+
+struct QueuePos {
+ const broker::Queue* queue;
+ const framing::SequenceNumber& position;
+ QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos)
+ : queue(q), position(pos) {}
+ QueuePos(const broker::QueuedMessage& qm);
+};
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& h);
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_HAOSTREAM_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index bf9de4cc0c..2de9ec5a59 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,6 +21,7 @@
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
+#include "Logging.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -82,7 +83,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest);
}
@@ -105,14 +106,17 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid
if (current < *i) {
//haven't got that far yet, record the dequeue
dequeued.add(*i);
- QPID_LOG(debug, "Recording dequeue of message at " << *i << " from " << queue->getName());
+ QPID_LOG(trace, "HA: Recording dequeue of message at " <<
+ QueuePos(queue.get(), *i));
} else {
QueuedMessage message;
if (queue->acquireMessageAt(*i, message)) {
queue->dequeue(0, message);
- QPID_LOG(info, "Dequeued message at " << *i << " from " << queue->getName());
+ QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message));
} else {
- QPID_LOG(error, "Unable to dequeue message at " << *i << " from " << queue->getName());
+ // FIXME aconway 2011-11-29: error handling
+ QPID_LOG(error, "HA: Unable to dequeue message at "
+ << QueuePos(queue.get(), *i));
}
}
}
@@ -121,10 +125,10 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid
//dequeued before our subscription reached them
while (dequeued.contains(++current)) {
dequeued.remove(current);
- QPID_LOG(debug, "Skipping dequeued message at " << current << " from " << queue->getName());
+ QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName());
queue->setPosition(current);
}
- QPID_LOG(info, "Enqueued message on " << queue->getName() << "; currently at " << current);
+ QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current);
msg.deliverTo(queue);
}
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 8b9f9773d1..aabbd43631 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,6 +20,7 @@
*/
#include "ReplicatingSubscription.h"
+#include "Logging.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -39,7 +40,7 @@ const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_seque
const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
const string DOLLAR("$");
-const string INTERNAL("_internal");
+const string INTERNAL("-internal");
class ReplicationStateInitialiser
{
@@ -55,7 +56,7 @@ class ReplicationStateInitialiser
void operator()(const QueuedMessage& message) {
if (message.position < start) {
//replica does not have a message that should still be on the queue
- QPID_LOG(warning, "Replica appears to be missing message at " << message.position);
+ QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
} else if (message.position >= start && message.position <= end) {
//i.e. message is within the intial range and has not been dequeued, so remove it from the results
results.remove(message.position);
@@ -75,63 +76,63 @@ string mask(const string& in)
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
- SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
bool ack,
- bool _acquire,
- bool _exclusive,
- const string& _tag,
- const string& _resumeId,
- uint64_t _resumeTtl,
- const framing::FieldTable& _arguments
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
) {
-
return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
- new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments));
+ new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
}
ReplicatingSubscription::ReplicatingSubscription(
- SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
bool ack,
- bool _acquire,
- bool _exclusive,
- const string& _tag,
- const string& _resumeId,
- uint64_t _resumeTtl,
- const framing::FieldTable& _arguments
-) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments),
- events(new Queue(mask(_name))),
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments),
+ events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
+ QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
// FIXME aconway 2011-11-25: string constants.
- QPID_LOG(debug, "HA: replicating subscription " << _name << " to " << _queue->getName());
- if (_arguments.isSet("qpid.high_sequence_number")) {
- qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
+ if (arguments.isSet("qpid.high_sequence_number")) {
+ qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number");
qpid::framing::SequenceNumber lwm;
- if (_arguments.isSet("qpid.low_sequence_number")) {
- lwm = _arguments.getAsInt("qpid.low_sequence_number");
+ if (arguments.isSet("qpid.low_sequence_number")) {
+ lwm = arguments.getAsInt("qpid.low_sequence_number");
} else {
lwm = hwm;
}
qpid::framing::SequenceNumber oldest;
- if (_queue->getOldest(oldest)) {
+ if (queue->getOldest(oldest)) {
if (oldest >= hwm) {
range.add(lwm, --oldest);
} else if (oldest >= lwm) {
ReplicationStateInitialiser initialiser(range, lwm, hwm);
- _queue->eachMessage(initialiser);
+ queue->eachMessage(initialiser);
} else { //i.e. have older message on master than is reported to exist on replica
- QPID_LOG(warning, "Replica appears to be missing message on master");
+ QPID_LOG(warning, "HA: Replica missing message on master");
}
} else {
//local queue (i.e. master) is empty
- range.add(lwm, _queue->getPosition());
+ range.add(lwm, queue->getPosition());
}
- QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range
- << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")");
+ QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range
+ << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")");
//set position of 'cursor'
position = hwm;
}
@@ -142,11 +143,6 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m)
return ConsumerImpl::deliver(m);
}
-void ReplicatingSubscription::init()
-{
- getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
void ReplicatingSubscription::cancel()
{
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
@@ -158,10 +154,9 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
//under the message lock in the queue
void ReplicatingSubscription::enqueued(const QueuedMessage& m)
{
- QPID_LOG(debug, "Enqueued message at " << m.position);
+ QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
//delay completion
m.payload->getIngressCompletion().startCompleter();
- QPID_LOG(debug, "Delayed " << m.payload.get());
}
void ReplicatingSubscription::generateDequeueEvent()
@@ -203,12 +198,13 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
sys::Mutex::ScopedLock l(lock);
range.add(m.position);
- QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position);
+ // FIXME aconway 2011-11-29: q[pos]
+ QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
}
notify();
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue");
+ QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue");
}
}
@@ -236,5 +232,4 @@ bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Me
bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 649d256e55..b83842acbb 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -71,7 +71,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
~ReplicatingSubscription();
- void init();
void cancel();
bool deliver(broker::QueuedMessage& msg);
void enqueued(const broker::QueuedMessage&);
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index e621052fea..125e2c0ba6 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -213,7 +213,7 @@ void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(QUEUE, queueName, sessionHandler);
sendQuery(EXCHANGE, queueName, sessionHandler);
sendQuery(BINDING, queueName, sessionHandler);
- QPID_LOG(debug, "Activated wiring replicator")
+ QPID_LOG(debug, "HA: Activated wiring replicator")
}
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
@@ -227,10 +227,10 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
- Variant::Map& map = list.front().asMap();
+ Variant::Map& map = i->asMap();
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- QPID_LOG(trace, "HA: Configuration event from primary: " << values);
+ QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values);
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
@@ -246,7 +246,7 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
Variant::Map& values = i->asMap()[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
- QPID_LOG(trace, "HA: Configuration response from primary: " << values);
+ QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
@@ -284,6 +284,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
// re-create from event.
// Events are always up to date, whereas responses may be
// out of date.
+ QPID_LOG(debug, "HA: New queue replica " << name);
startQueueReplicator(result.first);
} else {
QPID_LOG(warning, "HA: Replicated queue " << name << " already exists");
@@ -309,7 +310,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- QPID_LOG(debug, "HA: Creating exchange from event " << name);
+ QPID_LOG(debug, "HA: New exchange replica " << name);
if (!broker.createExchange(
name,
values[EXTYPE].asString(),
@@ -320,7 +321,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
values[RHOST].asString()).second) {
// FIXME aconway 2011-11-22: should delete pre-exisitng exchange
// and re-create from event. See comment in doEventQueueDeclare.
- QPID_LOG(warning, "Replicated exchange " << name << " already exists");
+ QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists");
}
}
}
@@ -348,7 +349,7 @@ void WiringReplicator::doEventBind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(values[ARGS].asMap(), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName()
+ QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
exchange->bind(queue, key, &args);
@@ -363,7 +364,6 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
string name(values[NAME].asString());
- QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() << " (in catch-up)");
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
@@ -375,11 +375,12 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
if (result.second) {
+ QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)");
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-11-22: Normal to find queue already
// exists if we're failing over.
- QPID_LOG(warning, "Replicated queue " << values[NAME] << " already exists (in catch-up)");
+ QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)");
}
}
@@ -388,7 +389,7 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) {
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString() << " (in catch-up)");
+ QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)");
if (!broker.createExchange(
values[NAME].asString(),
values[TYPE].asString(),
@@ -397,7 +398,7 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) {
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "Replicated exchange " << values[QNAME] << " already exists (in catch-up)");
+ QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)");
}
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 021401bb08..9b52c2fca7 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -39,10 +39,6 @@ class ShortTests(BrokerTest):
] + args,
**kwargs)
- def setup_wiring(self, primary, backup):
- cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary)
- self.assertEqual(0, os.system(cmd))
-
# FIXME aconway 2011-11-15: work around async replication.
def wait(self, session, address):
def check():