diff options
| author | Alan Conway <aconway@apache.org> | 2012-02-24 20:05:47 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-02-24 20:05:47 +0000 |
| commit | 67d8640e8d9315a22c1f54fce885ca8c80b09b2c (patch) | |
| tree | 8e56619a2a259b8370965d6414c096162b190bb0 /qpid/tools | |
| parent | 84aa47345dd4fd972284086e6f3a1f06bd1adb6f (diff) | |
| download | qpid-python-67d8640e8d9315a22c1f54fce885ca8c80b09b2c.tar.gz | |
QPID-3603: Improved command-based qpid-ha tool and ha config option names.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1293397 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tools')
| -rwxr-xr-x | qpid/tools/src/py/qpid-ha | 234 | ||||
| -rwxr-xr-x | qpid/tools/src/py/qpid-ha-tool | 183 |
2 files changed, 234 insertions, 183 deletions
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha new file mode 100755 index 0000000000..d81a7a466c --- /dev/null +++ b/qpid/tools/src/py/qpid-ha @@ -0,0 +1,234 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import qmf.console, optparse, sys +from qpid.management import managementChannel, managementClient +from qpid.messaging import Connection +from qpid.messaging import Message as QpidMessage +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +# Utility for doing fast qmf2 operations on a broker. +class QmfBroker(object): + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def close(self): + self.conn.close() + + def __repr__(self): + return "Qpid Broker: %s" % self.url + + def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def _sendRequest(self, opcode, content): + props = {'method' : 'request', + 'qmf.opcode' : opcode, + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + return correlator + + def _doClassQuery(self, class_name): + query = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + return items + + def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): + query = {'_what' : 'OBJECT', + '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + if len(items) == 1: + return items[0] + return None + + def _getAllBrokerObjects(self, cls): + items = self._doClassQuery(cls.__name__.lower()) + objs = [] + for item in items: + objs.append(cls(self, item)) + return objs + + def _getBrokerObject(self, cls, name): + obj = self._doNameQuery(cls.__name__.lower(), name) + if obj: + return cls(self, obj) + return None + + def get_ha_broker(self): + ha_brokers = self._doClassQuery("habroker") + if (not ha_brokers): raise Exception("Broker does not have HA enabled.") + return ha_brokers[0] + +HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" + +class Command: + commands = {} + + def __init__(self, name, help, args=""): + Command.commands[name] = self + self.name = name + usage="%s [options]%s\n\n%s"%(name, args, help) + self.help = help + self.op=optparse.OptionParser(usage) + self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>") + + def execute(self, command): + opts, args = self.op.parse_args(command) + broker = opts.broker or "localhost:5672" + # FIXME aconway 2012-02-23: enforce not doing primary-only operations on a backup & vice versa + connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) + try: self.do_execute(QmfBroker(connection), opts, args) + finally: connection.close() + + def do_execute(self, qmf_broker, opts, args): + raise Exception("Command '%s' is not yet implemented"%self.name) + +def print_all_help(name): + print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%name + for c in Command.commands: + help = Command.commands[c].help + print " %-12s %s."%(c, help.split(".")[0]) + print "\nFor help with a command: %s <command> --help\n"%name + + +class PromoteCmd(Command): + def __init__(self): + Command.__init__(self, "promote","Promote broker from backup to primary") + def do_execute(self, qmf_broker, opts, args): + qmf_broker._method("promote", {}, HA_BROKER) +PromoteCmd() + +class ReadyCmd(Command): + def __init__(self): + Command.__init__(self, "ready", "Test if a backup broker is ready.\nReturn 0 if broker is a ready backup, non-0 otherwise.") + self.op.add_option( + "--wait", type="int", metavar="<seconds>", + help="Wait up to <seconds> for broker to be ready. 0 means wait forever.") +ReadyCmd() + +class ReplicateCmd(Command): + def __init__(self): + Command.__init__(self, "replicate", "Replicate <queue> from <broker> to the current broker.", "<queue> <broker>") +ReplicateCmd() + +class SetCmd(Command): + def __init__(self): + Command.__init__(self, "set", "Set HA configuration settings") + def add(optname, metavar, type, help): + self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") + add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other") + add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers") + add("--backups", "<n>", "int", "Expect <n> backups to be running") + + def do_execute(self, qmf_broker, opts, args): + if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER) + if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER) + if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER) +SetCmd() + +class QueryCmd(Command): + def __init__(self): + Command.__init__(self, "query", "Print HA configuration settings") + + def do_execute(self, qmf_broker, opts, args): + hb = qmf_broker.get_ha_broker() + for x in [("Status:", "status"), + ("Brokers URL:", "brokers"), + ("Public URL:", "publicBrokers")]: + print "%-16s%s"%(x[0], hb[x[1]]) + +QueryCmd() + +def main(argv): + try: + command=argv[1:] + if command and command[0] == "--help-all": + for c in Command.commands.itervalues(): + c.op.print_help(); print + return 1 + if not command or not command[0] in Command.commands: + print_all_help(argv[0]); + return 1; + Command.commands[command[0]].execute(command) + except Exception, e: + raise # FIXME aconway 2012-02-23: + print e + return 1 + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/qpid/tools/src/py/qpid-ha-tool b/qpid/tools/src/py/qpid-ha-tool deleted file mode 100755 index 8e8107657c..0000000000 --- a/qpid/tools/src/py/qpid-ha-tool +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import qmf.console, optparse, sys -from qpid.management import managementChannel, managementClient -from qpid.messaging import Connection -from qpid.messaging import Message as QpidMessage -try: - from uuid import uuid4 -except ImportError: - from qpid.datatypes import uuid4 - -# Utility for doing fast qmf2 operations on a broker. -class QmfBroker(object): - def __init__(self, conn): - self.conn = conn - self.sess = self.conn.session() - self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ - str(uuid4()) - self.reply_rx = self.sess.receiver(self.reply_to) - self.reply_rx.capacity = 10 - self.tx = self.sess.sender("qmf.default.direct/broker") - self.next_correlator = 1 - - def close(self): - self.conn.close() - - def __repr__(self): - return "Qpid Broker: %s" % self.url - - def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"): - props = {'method' : 'request', - 'qmf.opcode' : '_method_request', - 'x-amqp-0-10.app-id' : 'qmf2'} - correlator = str(self.next_correlator) - self.next_correlator += 1 - - content = {'_object_id' : {'_object_name' : addr}, - '_method_name' : method, - '_arguments' : arguments} - - message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") - self.tx.send(message) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] == '_exception': - raise Exception("Exception from Agent: %r" % response.content['_values']) - if response.properties['qmf.opcode'] != '_method_response': - raise Exception("bad response: %r" % response.properties) - return response.content['_arguments'] - - def _sendRequest(self, opcode, content): - props = {'method' : 'request', - 'qmf.opcode' : opcode, - 'x-amqp-0-10.app-id' : 'qmf2'} - correlator = str(self.next_correlator) - self.next_correlator += 1 - message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") - self.tx.send(message) - return correlator - - def _doClassQuery(self, class_name): - query = {'_what' : 'OBJECT', - '_schema_id' : {'_class_name' : class_name}} - correlator = self._sendRequest('_query_request', query) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] != '_query_response': - raise Exception("bad response") - items = [] - done = False - while not done: - for item in response.content: - items.append(item['_values']) - if 'partial' in response.properties: - response = self.reply_rx.fetch(10) - else: - done = True - return items - - def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): - query = {'_what' : 'OBJECT', - '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} - correlator = self._sendRequest('_query_request', query) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] != '_query_response': - raise Exception("bad response") - items = [] - done = False - while not done: - for item in response.content: - items.append(item['_values']) - if 'partial' in response.properties: - response = self.reply_rx.fetch(10) - else: - done = True - if len(items) == 1: - return items[0] - return None - - def _getAllBrokerObjects(self, cls): - items = self._doClassQuery(cls.__name__.lower()) - objs = [] - for item in items: - objs.append(cls(self, item)) - return objs - - def _getBrokerObject(self, cls, name): - obj = self._doNameQuery(cls.__name__.lower(), name) - if obj: - return cls(self, obj) - return None - - -op=optparse.OptionParser(usage="Usage: %prog [options] [broker-address]") - -op.add_option("-p", "--promote", action="store_true", - help="Promote a backup broker to become the primary.") -op.add_option("-c", "--client-addresses", action="store", type="string", - help="Set list of addresses used by clients to connect to the HA cluster.") -op.add_option("-b", "--broker-addresses", action="store", type="string", - help="Set list of addresses used by HA brokers to connect to each other.") -op.add_option("-q", "--query", action="store_true", - help="Show the current HA settings on the broker.") - -def get_ha_broker(qmf_broker): - ha_brokers = qmf_broker._doClassQuery("habroker") - if (not ha_brokers): raise Exception("Broker does not have HA enabled.") - return ha_brokers[0] - -def main(argv): - try: - opts, args = op.parse_args(argv) - if len(args) >1: broker = args[1] - else: broker = "localhost:5672" - conn = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) - ha_broker = "org.apache.qpid.ha:habroker:ha-broker" - try: - qmf_broker = QmfBroker(conn) - get_ha_broker(qmf_broker) # Verify that HA is enabled - action=False - if opts.promote: - qmf_broker._method("promote", {}, ha_broker) - action=True - if opts.broker_addresses: - qmf_broker._method('setBrokerAddresses', {'brokerAddresses':opts.broker_addresses}, ha_broker) - action=True - if opts.client_addresses: - qmf_broker._method('setClientAddresses', {'clientAddresses':opts.client_addresses}, ha_broker) - action=True - if opts.query or not action: - hb = get_ha_broker(qmf_broker) - print "status=%s"%hb["status"] - print "broker-addresses=%s"%hb["brokerAddresses"] - print "client-addresses=%s"%hb["clientAddresses"] - return 0 - finally: - conn.close() # Avoid errors shutting down threads. - except Exception, e: - raise # FIXME aconway 2012-01-31: - print e - return 1 - -if __name__ == "__main__": - sys.exit(main(sys.argv)) |
