diff options
| author | Alan Conway <aconway@apache.org> | 2012-02-24 20:05:47 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-02-24 20:05:47 +0000 |
| commit | 67d8640e8d9315a22c1f54fce885ca8c80b09b2c (patch) | |
| tree | 8e56619a2a259b8370965d6414c096162b190bb0 | |
| parent | 84aa47345dd4fd972284086e6f3a1f06bd1adb6f (diff) | |
| download | qpid-python-67d8640e8d9315a22c1f54fce885ca8c80b09b2c.tar.gz | |
QPID-3603: Improved command-based qpid-ha tool and ha config option names.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1293397 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 31 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Settings.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/management-schema.xml | 26 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/test_env.sh.in | 2 | ||||
| -rwxr-xr-x | qpid/tools/src/py/qpid-ha | 234 | ||||
| -rwxr-xr-x | qpid/tools/src/py/qpid-ha-tool | 183 |
10 files changed, 305 insertions, 221 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index bdaf40b073..e03c88ec8b 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -102,15 +102,16 @@ $(srcdir)/rubygen.cmake: $(rgen_generator) $(specs) # Management generator. mgen_dir=$(top_srcdir)/managementgen -mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk \ - -c $(srcdir)/managementgen.cmake -q -b -o qmf \ - $(top_srcdir)/../specs/management-schema.xml \ +mgen_xml=$(top_srcdir)/../specs/management-schema.xml \ $(srcdir)/qpid/acl/management-schema.xml \ $(srcdir)/qpid/cluster/management-schema.xml \ $(srcdir)/qpid/ha/management-schema.xml +mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk \ + -c $(srcdir)/managementgen.cmake -q -b -o qmf \ + $(mgen_xml) $(srcdir)/managementgen.mk $(mgen_broker_cpp) $(dist_qpid_management_HEADERS): mgen.timestamp -mgen.timestamp: $(mgen_generator) +mgen.timestamp: $(mgen_generator) $(mgen_xml) $(mgen_cmd); touch $@ $(mgen_generator): diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 18cd578d90..d92749abeb 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -27,8 +27,9 @@ #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetClientAddresses.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokerAddresses.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h" #include "qpid/log/Statement.h" namespace qpid { @@ -88,18 +89,19 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, } break; } - case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: { - setClientUrl( - Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args). - i_clientAddresses), l); + case _qmf::HaBroker::METHOD_SETBROKERS: { + setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokers&>(args).i_url), l); break; } - case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: { - setBrokerUrl( - Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args) - .i_brokerAddresses), l); + case _qmf::HaBroker::METHOD_SETPUBLICBROKERS: { + setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicBrokers&>(args).i_url), l); break; } + case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: { + setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l); + break; + } + default: return Manageable::STATUS_UNKNOWN_METHOD; } @@ -115,7 +117,7 @@ void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) { void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { Url url = clientUrl.empty() ? brokerUrl : clientUrl; assert(!url.empty()); - mgmtObject->set_clientAddresses(url.str()); + mgmtObject->set_publicBrokers(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); QPID_LOG(debug, "HA: Setting client URL to: " << url); @@ -125,12 +127,17 @@ void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { if (url.empty()) throw Exception("Invalid empty URL for HA broker failover"); QPID_LOG(debug, "HA: Setting broker URL to: " << url); brokerUrl = url; - mgmtObject->set_brokerAddresses(brokerUrl.str()); + mgmtObject->set_brokers(brokerUrl.str()); if (backup.get()) backup->setBrokerUrl(brokerUrl); // Updating broker URL also updates defaulted client URL: if (clientUrl.empty()) updateClientUrl(l); } +void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) { + expectedBackups = n; + mgmtObject->set_expectedBackups(n); +} + std::vector<Url> HaBroker::getKnownBrokers() const { return knownBrokers; } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 835a47c749..4f4ee4c944 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -55,6 +55,7 @@ class HaBroker : public management::Manageable private: void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); + void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&); void updateClientUrl(const sys::Mutex::ScopedLock&); bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); } std::vector<Url> getKnownBrokers() const; @@ -67,6 +68,7 @@ class HaBroker : public management::Manageable qmf::org::apache::qpid::ha::HaBroker* mgmtObject; Url clientUrl, brokerUrl; std::vector<Url> knownBrokers; + size_t expectedBackups; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index e9b3d4d311..b3080330fb 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -31,12 +31,20 @@ struct Options : public qpid::Options { Settings& settings; Options(Settings& s) : qpid::Options("HA Options"), settings(s) { addOptions() - ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features") - ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.") - ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.") - ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers") - ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers") - ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers") + ("ha-cluster", optValue(settings.enabled, "yes|no"), + "Join a HA active/passive cluster.") + ("ha-brokers", optValue(settings.brokerUrl,"URL"), + "URL that backup brokers use to connect and fail over.") + ("ha-public-brokers", optValue(settings.clientUrl,"URL"), + "URL that clients use to connect and fail over, defaults to ha-brokers.") + ("ha-expected-backups", optValue(settings.expectedBackups, "N"), + "Number of backups expected to be active in the HA cluster.") + ("ha-username", optValue(settings.username, "USER"), + "Username for connections between HA brokers") + ("ha-password", optValue(settings.password, "PASS"), + "Password for connections between HA brokers") + ("ha-mechanism", optValue(settings.mechanism, "MECH"), + "Authentication mechanism for connections between HA brokers") ; } }; diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 049c873b9f..52a64c8330 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -33,10 +33,11 @@ namespace ha { class Settings { public: - Settings() : enabled(false) {} + Settings() : enabled(false), expectedBackups(0) {} bool enabled; std::string clientUrl; std::string brokerUrl; + size_t expectedBackups; std::string username, password, mechanism; private: }; diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml index fe4a14d111..05ed5f02ce 100644 --- a/qpid/cpp/src/qpid/ha/management-schema.xml +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -22,16 +22,30 @@ <!-- Monitor and control HA status of a broker. --> <class name="HaBroker"> <property name="name" type="sstr" access="RC" index="y" desc="Primary Key"/> + <property name="status" type="sstr" desc="HA status: primary or backup"/> - <property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/> - <property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/> + + <property name="brokers" type="sstr" + desc="Multiple-address URL used by HA brokers to connect to each other."/> + + <property name="publicBrokers" type="sstr" + desc="Multiple-address URL used by clients to connect to the HA brokers."/> + + <property name="expectedBackups" type="uint16" + desc="Number of HA backup brokers expected."/>> <method name="promote" desc="Promote a backup broker to primary."/> - <method name="setClientAddresses" desc="Set HA client addresses"> - <arg name="clientAddresses" type="sstr" dir="I"/> + + <method name="setBrokers" desc="Set URL for HA brokers to connect to each other."> + <arg name="url" type="sstr" dir="I"/> + </method> + + <method name="setPublicBrokers" desc="Set URL for clients to connect to HA brokers"> + <arg name="url" type="sstr" dir="I"/> </method> - <method name="setBrokerAddresses" desc="Set HA broker addresses"> - <arg name="brokerAddresses" type="sstr" dir="I"/> + + <method name="setExpectedBackups" desc="Set number of backups expected"> + <arg name="expectedBackups" type="uint16" dir="I"/> </method> </class> diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index dfcec26d46..264a636f29 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -34,20 +34,20 @@ class HaBroker(Broker): args=["--load-module", BrokerTest.ha_lib, # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", - "--ha-enable=yes"] - if broker_url: args += [ "--ha-broker-url", broker_url ] + "--ha-cluster=yes"] + if broker_url: args += [ "--ha-brokers", broker_url ] Broker.__init__(self, test, args, **kwargs) def promote(self): - assert os.system("$QPID_HA_TOOL_EXEC --promote %s"%(self.host_port())) == 0 + assert os.system("$QPID_HA_EXEC promote -b %s"%(self.host_port())) == 0 def set_client_url(self, url): assert os.system( - "$QPID_HA_TOOL_EXEC --client-addresses=%s %s"%(url,self.host_port())) == 0 + "$QPID_HA_EXEC set --public-brokers=%s -b %s"%(url,self.host_port())) == 0 def set_broker_url(self, url): assert os.system( - "$QPID_HA_TOOL_EXEC --broker-addresses=%s %s"%(url, self.host_port())) == 0 + "$QPID_HA_EXEC set --brokers=%s -b %s"%(url, self.host_port())) == 0 def set_broker_urls(brokers): url = ",".join([b.host_port() for b in brokers]) @@ -475,10 +475,10 @@ class LongTests(BrokerTest): if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) - qpid_ha_tool = os.getenv("QPID_HA_TOOL_EXEC") - if qpid_ha_tool and os.path.exists(qpid_ha_tool): + qpid_ha = os.getenv("QPID_HA_EXEC") + if qpid_ha and os.path.exists(qpid_ha): os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) else: - print "Skipping ha_tests, qpid_ha_tool not available" + print "Skipping ha_tests, qpid_ha not available" diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index 667a2f0a1b..5c07bcdc2e 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -44,7 +44,7 @@ export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$QMF_LIB:$ export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route export QPID_CLUSTER_EXEC=$PYTHON_COMMANDS/qpid-cluster -export QPID_HA_TOOL_EXEC=$PYTHON_COMMANDS/qpid-ha-tool +export QPID_HA_EXEC=$PYTHON_COMMANDS/qpid-ha # Executables export QPIDD_EXEC=$top_builddir/src/qpidd diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha new file mode 100755 index 0000000000..d81a7a466c --- /dev/null +++ b/qpid/tools/src/py/qpid-ha @@ -0,0 +1,234 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import qmf.console, optparse, sys +from qpid.management import managementChannel, managementClient +from qpid.messaging import Connection +from qpid.messaging import Message as QpidMessage +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +# Utility for doing fast qmf2 operations on a broker. +class QmfBroker(object): + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def close(self): + self.conn.close() + + def __repr__(self): + return "Qpid Broker: %s" % self.url + + def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def _sendRequest(self, opcode, content): + props = {'method' : 'request', + 'qmf.opcode' : opcode, + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + return correlator + + def _doClassQuery(self, class_name): + query = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + return items + + def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): + query = {'_what' : 'OBJECT', + '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + if len(items) == 1: + return items[0] + return None + + def _getAllBrokerObjects(self, cls): + items = self._doClassQuery(cls.__name__.lower()) + objs = [] + for item in items: + objs.append(cls(self, item)) + return objs + + def _getBrokerObject(self, cls, name): + obj = self._doNameQuery(cls.__name__.lower(), name) + if obj: + return cls(self, obj) + return None + + def get_ha_broker(self): + ha_brokers = self._doClassQuery("habroker") + if (not ha_brokers): raise Exception("Broker does not have HA enabled.") + return ha_brokers[0] + +HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" + +class Command: + commands = {} + + def __init__(self, name, help, args=""): + Command.commands[name] = self + self.name = name + usage="%s [options]%s\n\n%s"%(name, args, help) + self.help = help + self.op=optparse.OptionParser(usage) + self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>") + + def execute(self, command): + opts, args = self.op.parse_args(command) + broker = opts.broker or "localhost:5672" + # FIXME aconway 2012-02-23: enforce not doing primary-only operations on a backup & vice versa + connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) + try: self.do_execute(QmfBroker(connection), opts, args) + finally: connection.close() + + def do_execute(self, qmf_broker, opts, args): + raise Exception("Command '%s' is not yet implemented"%self.name) + +def print_all_help(name): + print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%name + for c in Command.commands: + help = Command.commands[c].help + print " %-12s %s."%(c, help.split(".")[0]) + print "\nFor help with a command: %s <command> --help\n"%name + + +class PromoteCmd(Command): + def __init__(self): + Command.__init__(self, "promote","Promote broker from backup to primary") + def do_execute(self, qmf_broker, opts, args): + qmf_broker._method("promote", {}, HA_BROKER) +PromoteCmd() + +class ReadyCmd(Command): + def __init__(self): + Command.__init__(self, "ready", "Test if a backup broker is ready.\nReturn 0 if broker is a ready backup, non-0 otherwise.") + self.op.add_option( + "--wait", type="int", metavar="<seconds>", + help="Wait up to <seconds> for broker to be ready. 0 means wait forever.") +ReadyCmd() + +class ReplicateCmd(Command): + def __init__(self): + Command.__init__(self, "replicate", "Replicate <queue> from <broker> to the current broker.", "<queue> <broker>") +ReplicateCmd() + +class SetCmd(Command): + def __init__(self): + Command.__init__(self, "set", "Set HA configuration settings") + def add(optname, metavar, type, help): + self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") + add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other") + add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers") + add("--backups", "<n>", "int", "Expect <n> backups to be running") + + def do_execute(self, qmf_broker, opts, args): + if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER) + if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER) + if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER) +SetCmd() + +class QueryCmd(Command): + def __init__(self): + Command.__init__(self, "query", "Print HA configuration settings") + + def do_execute(self, qmf_broker, opts, args): + hb = qmf_broker.get_ha_broker() + for x in [("Status:", "status"), + ("Brokers URL:", "brokers"), + ("Public URL:", "publicBrokers")]: + print "%-16s%s"%(x[0], hb[x[1]]) + +QueryCmd() + +def main(argv): + try: + command=argv[1:] + if command and command[0] == "--help-all": + for c in Command.commands.itervalues(): + c.op.print_help(); print + return 1 + if not command or not command[0] in Command.commands: + print_all_help(argv[0]); + return 1; + Command.commands[command[0]].execute(command) + except Exception, e: + raise # FIXME aconway 2012-02-23: + print e + return 1 + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/qpid/tools/src/py/qpid-ha-tool b/qpid/tools/src/py/qpid-ha-tool deleted file mode 100755 index 8e8107657c..0000000000 --- a/qpid/tools/src/py/qpid-ha-tool +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import qmf.console, optparse, sys -from qpid.management import managementChannel, managementClient -from qpid.messaging import Connection -from qpid.messaging import Message as QpidMessage -try: - from uuid import uuid4 -except ImportError: - from qpid.datatypes import uuid4 - -# Utility for doing fast qmf2 operations on a broker. -class QmfBroker(object): - def __init__(self, conn): - self.conn = conn - self.sess = self.conn.session() - self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ - str(uuid4()) - self.reply_rx = self.sess.receiver(self.reply_to) - self.reply_rx.capacity = 10 - self.tx = self.sess.sender("qmf.default.direct/broker") - self.next_correlator = 1 - - def close(self): - self.conn.close() - - def __repr__(self): - return "Qpid Broker: %s" % self.url - - def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"): - props = {'method' : 'request', - 'qmf.opcode' : '_method_request', - 'x-amqp-0-10.app-id' : 'qmf2'} - correlator = str(self.next_correlator) - self.next_correlator += 1 - - content = {'_object_id' : {'_object_name' : addr}, - '_method_name' : method, - '_arguments' : arguments} - - message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") - self.tx.send(message) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] == '_exception': - raise Exception("Exception from Agent: %r" % response.content['_values']) - if response.properties['qmf.opcode'] != '_method_response': - raise Exception("bad response: %r" % response.properties) - return response.content['_arguments'] - - def _sendRequest(self, opcode, content): - props = {'method' : 'request', - 'qmf.opcode' : opcode, - 'x-amqp-0-10.app-id' : 'qmf2'} - correlator = str(self.next_correlator) - self.next_correlator += 1 - message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") - self.tx.send(message) - return correlator - - def _doClassQuery(self, class_name): - query = {'_what' : 'OBJECT', - '_schema_id' : {'_class_name' : class_name}} - correlator = self._sendRequest('_query_request', query) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] != '_query_response': - raise Exception("bad response") - items = [] - done = False - while not done: - for item in response.content: - items.append(item['_values']) - if 'partial' in response.properties: - response = self.reply_rx.fetch(10) - else: - done = True - return items - - def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): - query = {'_what' : 'OBJECT', - '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} - correlator = self._sendRequest('_query_request', query) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] != '_query_response': - raise Exception("bad response") - items = [] - done = False - while not done: - for item in response.content: - items.append(item['_values']) - if 'partial' in response.properties: - response = self.reply_rx.fetch(10) - else: - done = True - if len(items) == 1: - return items[0] - return None - - def _getAllBrokerObjects(self, cls): - items = self._doClassQuery(cls.__name__.lower()) - objs = [] - for item in items: - objs.append(cls(self, item)) - return objs - - def _getBrokerObject(self, cls, name): - obj = self._doNameQuery(cls.__name__.lower(), name) - if obj: - return cls(self, obj) - return None - - -op=optparse.OptionParser(usage="Usage: %prog [options] [broker-address]") - -op.add_option("-p", "--promote", action="store_true", - help="Promote a backup broker to become the primary.") -op.add_option("-c", "--client-addresses", action="store", type="string", - help="Set list of addresses used by clients to connect to the HA cluster.") -op.add_option("-b", "--broker-addresses", action="store", type="string", - help="Set list of addresses used by HA brokers to connect to each other.") -op.add_option("-q", "--query", action="store_true", - help="Show the current HA settings on the broker.") - -def get_ha_broker(qmf_broker): - ha_brokers = qmf_broker._doClassQuery("habroker") - if (not ha_brokers): raise Exception("Broker does not have HA enabled.") - return ha_brokers[0] - -def main(argv): - try: - opts, args = op.parse_args(argv) - if len(args) >1: broker = args[1] - else: broker = "localhost:5672" - conn = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) - ha_broker = "org.apache.qpid.ha:habroker:ha-broker" - try: - qmf_broker = QmfBroker(conn) - get_ha_broker(qmf_broker) # Verify that HA is enabled - action=False - if opts.promote: - qmf_broker._method("promote", {}, ha_broker) - action=True - if opts.broker_addresses: - qmf_broker._method('setBrokerAddresses', {'brokerAddresses':opts.broker_addresses}, ha_broker) - action=True - if opts.client_addresses: - qmf_broker._method('setClientAddresses', {'clientAddresses':opts.client_addresses}, ha_broker) - action=True - if opts.query or not action: - hb = get_ha_broker(qmf_broker) - print "status=%s"%hb["status"] - print "broker-addresses=%s"%hb["brokerAddresses"] - print "client-addresses=%s"%hb["clientAddresses"] - return 0 - finally: - conn.close() # Avoid errors shutting down threads. - except Exception, e: - raise # FIXME aconway 2012-01-31: - print e - return 1 - -if __name__ == "__main__": - sys.exit(main(sys.argv)) |
