summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-12-08 14:20:43 +0000
committerAlan Conway <aconway@apache.org>2011-12-08 14:20:43 +0000
commit586ae44e8cd4f92eca23de5066a4021a28bd8827 (patch)
tree0c5c4f892bf3483ec9c63d570c8d32fc81b3f0f9
parent04dcab92880ad16f7ac57a83ca90fb626448831f (diff)
downloadqpid-python-586ae44e8cd4f92eca23de5066a4021a28bd8827.tar.gz
QPID-3603: Cleaned up log messages, update qpid-cluster-benchmark to set replicate=all
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1211902 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp16
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py23
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark11
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark11
6 files changed, 47 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 28f7911614..1d14b23ee1 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -86,7 +86,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest);
}
void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 875c2926c5..108f85637c 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -138,8 +138,9 @@ ReplicatingSubscription::ReplicatingSubscription(
//local queue (i.e. master) is empty
range.add(lwm, queue->getPosition());
}
- QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range
- << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")");
+ QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": "
+ << range << " (lwm=" << lwm << ", hwm=" << hwm
+ << ", current=" << queue->getPosition() << ")");
//set position of 'cursor'
position = hwm;
}
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 62d456b21f..105a83118e 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -229,18 +229,14 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
Variant::Map& map = i->asMap();
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- QPID_LOG(debug, "HA: Backup received event: schema=" << schema
+ QPID_LOG(trace, "HA: Backup received event: schema=" << schema
<< " values=" << values);
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
else if (match<EventBind>(schema)) doEventBind(values);
- // FIXME aconway 2011-11-21: handle unbind & all other events.
- else if (match<EventSubscribe>(schema)) {} // Deliberately ignored.
- // FIXME aconway 2011-12-02: error handling
- else throw(Exception(QPID_MSG("Backup received unexpected event, schema="
- << schema)));
+ // FIXME aconway 2011-11-21: handle unbind & all other relevant events.
}
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -253,15 +249,13 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
- else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
+ // FIXME aconway 2011-12-06: handle all relevant response types.
}
} else {
- QPID_LOG(error, QPID_MSG("HA: Backup received unexpected message: "
- << *headers));
+ QPID_LOG(error, "HA: Backup replication got unexpected message: " << *headers);
}
} catch (const std::exception& e) {
- QPID_LOG(error, "HA: Backup replication error: " << e.what());
- QPID_LOG(error, "HA: Backup replication error while processing: " << list);
+ QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
}
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index fdf3562bc0..5957a68f9b 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -118,6 +118,29 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(p, "foo", [])
self.assert_browse_retry(b, "foo", [])
+ def test_sync(self):
+ def queue(name, replicate):
+ return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+ primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+ p = primary.connect().session()
+ s = p.sender(queue("q","all"))
+ for m in [str(i) for i in range(0,10)]: s.send(m)
+ s.sync()
+ backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
+ for m in [str(i) for i in range(10,20)]: s.send(m)
+ s.sync()
+ backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
+ for m in [str(i) for i in range(20,30)]: s.send(m)
+ s.sync()
+ msgs = [str(i) for i in range(30)]
+
+ b = backup1.connect().session()
+ self.assert_browse_retry(b, "q", msgs)
+
+ b = backup2.connect().session()
+ self.assert_browse_retry(backup2.connect().session(), "q", msgs)
+
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index a5076799f6..fb0982bf83 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -30,7 +30,7 @@ RECEIVERS="-r 3"
BROKERS= # Local broker
CLIENT_HOSTS= # No ssh, all clients are local
-while getopts "m:f:n:b:q:s:r:c:txy" opt; do
+while getopts "m:f:n:b:q:s:r:c:txyv" opt; do
case $opt in
m) MESSAGES="-m $OPTARG";;
f) FLOW="--flow-control $OPTARG";;
@@ -43,13 +43,16 @@ while getopts "m:f:n:b:q:s:r:c:txy" opt; do
t) TCP_NODELAY="--connection-options {tcp-nodelay:true}";;
x) SAVE_RECEIVED="--save-received";;
y) NO_DELETE="--no-delete";;
+ v) OPTS="--verbose";;
*) echo "Unknown option"; exit 1;;
esac
done
+REPLICATE="node:{x-declare:{arguments:{'qpid.replicate':all}}}"
BROKER=$(echo $BROKERS | sed s/,.*//)
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
-OPTS="$REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE"
-run_test "Queue contention:" qpid-cpp-benchmark $OPTS
-run_test "No queue contention: :" qpid-cpp-benchmark $OPTS --group-receivers
+OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE"
+OPTS="$OPTS --create-option $REPLICATE"
+run_test "Benchmark:" qpid-cpp-benchmark $OPTS
+
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 5dde7958d6..41cfc12ded 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -55,6 +55,8 @@ op.add_option("--send-option", default=[], action="append", type="str",
help="Additional option for sending addresses")
op.add_option("--receive-option", default=[], action="append", type="str",
help="Additional option for receiving addresses")
+op.add_option("--create-option", default=[], action="append", type="str",
+ help="Additional option for creating addresses")
op.add_option("--send-arg", default=[], action="append", type="str",
help="Additional argument for qpid-send")
op.add_option("--receive-arg", default=[], action="append", type="str",
@@ -75,6 +77,7 @@ op.add_option("--verbose", default=False, action="store_true",
help="Show commands executed")
op.add_option("--no-delete", default=False, action="store_true",
help="Don't delete the test queues.")
+
single_quote_re = re.compile("'")
def posix_quote(string):
""" Quote a string for use as an argument in a posix shell"""
@@ -176,7 +179,7 @@ def queue_exists(queue,broker):
return False
finally: c.close()
-def recreate_queues(queues, brokers, no_delete):
+def recreate_queues(queues, brokers, no_delete, opts):
c = qpid.messaging.Connection(brokers[0])
c.open()
s = c.session()
@@ -187,7 +190,9 @@ def recreate_queues(queues, brokers, no_delete):
# FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
for b in brokers:
while queue_exists(q,b): time.sleep(0.1);
- s.sender("%s;{create:always}"%q)
+ address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
+ if opts.verbose: print "Creating", address
+ s.sender(address)
# FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
for b in brokers:
while not queue_exists(q,b): time.sleep(0.1);
@@ -285,7 +290,7 @@ def main():
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
try:
for i in xrange(opts.repeat):
- recreate_queues(queues, opts.broker, opts.no_delete)
+ recreate_queues(queues, opts.broker, opts.no_delete, opts)
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
if opts.group_receivers: # Run receivers for same queue against same broker.