diff options
author | Alan Conway <aconway@apache.org> | 2011-12-08 14:20:43 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-12-08 14:20:43 +0000 |
commit | 586ae44e8cd4f92eca23de5066a4021a28bd8827 (patch) | |
tree | 0c5c4f892bf3483ec9c63d570c8d32fc81b3f0f9 | |
parent | 04dcab92880ad16f7ac57a83ca90fb626448831f (diff) | |
download | qpid-python-586ae44e8cd4f92eca23de5066a4021a28bd8827.tar.gz |
QPID-3603: Cleaned up log messages, update qpid-cluster-benchmark to set replicate=all
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1211902 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 16 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 23 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-benchmark | 11 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 11 |
6 files changed, 47 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 28f7911614..1d14b23ee1 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -86,7 +86,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest); } void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 875c2926c5..108f85637c 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -138,8 +138,9 @@ ReplicatingSubscription::ReplicatingSubscription( //local queue (i.e. master) is empty range.add(lwm, queue->getPosition()); } - QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range - << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")"); + QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": " + << range << " (lwm=" << lwm << ", hwm=" << hwm + << ", current=" << queue->getPosition() << ")"); //set position of 'cursor' position = hwm; } diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 62d456b21f..105a83118e 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -229,18 +229,14 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram Variant::Map& map = i->asMap(); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - QPID_LOG(debug, "HA: Backup received event: schema=" << schema + QPID_LOG(trace, "HA: Backup received event: schema=" << schema << " values=" << values); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); else if (match<EventBind>(schema)) doEventBind(values); - // FIXME aconway 2011-11-21: handle unbind & all other events. - else if (match<EventSubscribe>(schema)) {} // Deliberately ignored. - // FIXME aconway 2011-12-02: error handling - else throw(Exception(QPID_MSG("Backup received unexpected event, schema=" - << schema))); + // FIXME aconway 2011-11-21: handle unbind & all other relevant events. } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { @@ -253,15 +249,13 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); - else throw Exception(QPID_MSG("HA: Unexpected response type: " << type)); + // FIXME aconway 2011-12-06: handle all relevant response types. } } else { - QPID_LOG(error, QPID_MSG("HA: Backup received unexpected message: " - << *headers)); + QPID_LOG(error, "HA: Backup replication got unexpected message: " << *headers); } } catch (const std::exception& e) { - QPID_LOG(error, "HA: Backup replication error: " << e.what()); - QPID_LOG(error, "HA: Backup replication error while processing: " << list); + QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); } } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index fdf3562bc0..5957a68f9b 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -118,6 +118,29 @@ class ShortTests(BrokerTest): self.assert_browse_retry(p, "foo", []) self.assert_browse_retry(b, "foo", []) + def test_sync(self): + def queue(name, replicate): + return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) + primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary + p = primary.connect().session() + s = p.sender(queue("q","all")) + for m in [str(i) for i in range(0,10)]: s.send(m) + s.sync() + backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) + for m in [str(i) for i in range(10,20)]: s.send(m) + s.sync() + backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) + for m in [str(i) for i in range(20,30)]: s.send(m) + s.sync() + msgs = [str(i) for i in range(30)] + + b = backup1.connect().session() + self.assert_browse_retry(b, "q", msgs) + + b = backup2.connect().session() + self.assert_browse_retry(backup2.connect().session(), "q", msgs) + + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark index a5076799f6..fb0982bf83 100755 --- a/qpid/cpp/src/tests/qpid-cluster-benchmark +++ b/qpid/cpp/src/tests/qpid-cluster-benchmark @@ -30,7 +30,7 @@ RECEIVERS="-r 3" BROKERS= # Local broker CLIENT_HOSTS= # No ssh, all clients are local -while getopts "m:f:n:b:q:s:r:c:txy" opt; do +while getopts "m:f:n:b:q:s:r:c:txyv" opt; do case $opt in m) MESSAGES="-m $OPTARG";; f) FLOW="--flow-control $OPTARG";; @@ -43,13 +43,16 @@ while getopts "m:f:n:b:q:s:r:c:txy" opt; do t) TCP_NODELAY="--connection-options {tcp-nodelay:true}";; x) SAVE_RECEIVED="--save-received";; y) NO_DELETE="--no-delete";; + v) OPTS="--verbose";; *) echo "Unknown option"; exit 1;; esac done +REPLICATE="node:{x-declare:{arguments:{'qpid.replicate':all}}}" BROKER=$(echo $BROKERS | sed s/,.*//) run_test() { echo $*; shift; "$@"; echo; echo; echo; } -OPTS="$REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE" -run_test "Queue contention:" qpid-cpp-benchmark $OPTS -run_test "No queue contention: :" qpid-cpp-benchmark $OPTS --group-receivers +OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE" +OPTS="$OPTS --create-option $REPLICATE" +run_test "Benchmark:" qpid-cpp-benchmark $OPTS + diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 5dde7958d6..41cfc12ded 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -55,6 +55,8 @@ op.add_option("--send-option", default=[], action="append", type="str", help="Additional option for sending addresses") op.add_option("--receive-option", default=[], action="append", type="str", help="Additional option for receiving addresses") +op.add_option("--create-option", default=[], action="append", type="str", + help="Additional option for creating addresses") op.add_option("--send-arg", default=[], action="append", type="str", help="Additional argument for qpid-send") op.add_option("--receive-arg", default=[], action="append", type="str", @@ -75,6 +77,7 @@ op.add_option("--verbose", default=False, action="store_true", help="Show commands executed") op.add_option("--no-delete", default=False, action="store_true", help="Don't delete the test queues.") + single_quote_re = re.compile("'") def posix_quote(string): """ Quote a string for use as an argument in a posix shell""" @@ -176,7 +179,7 @@ def queue_exists(queue,broker): return False finally: c.close() -def recreate_queues(queues, brokers, no_delete): +def recreate_queues(queues, brokers, no_delete, opts): c = qpid.messaging.Connection(brokers[0]) c.open() s = c.session() @@ -187,7 +190,9 @@ def recreate_queues(queues, brokers, no_delete): # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while queue_exists(q,b): time.sleep(0.1); - s.sender("%s;{create:always}"%q) + address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"])) + if opts.verbose: print "Creating", address + s.sender(address) # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while not queue_exists(q,b): time.sleep(0.1); @@ -285,7 +290,7 @@ def main(): queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] try: for i in xrange(opts.repeat): - recreate_queues(queues, opts.broker, opts.no_delete) + recreate_queues(queues, opts.broker, opts.no_delete, opts) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) if opts.group_receivers: # Run receivers for same queue against same broker. |