summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-24 20:05:47 +0000
committerAlan Conway <aconway@apache.org>2012-02-24 20:05:47 +0000
commit67d8640e8d9315a22c1f54fce885ca8c80b09b2c (patch)
tree8e56619a2a259b8370965d6414c096162b190bb0
parent84aa47345dd4fd972284086e6f3a1f06bd1adb6f (diff)
downloadqpid-python-67d8640e8d9315a22c1f54fce885ca8c80b09b2c.tar.gz
QPID-3603: Improved command-based qpid-ha tool and ha config option names.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1293397 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am9
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp31
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp20
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h3
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml26
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py16
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in2
-rwxr-xr-xqpid/tools/src/py/qpid-ha234
-rwxr-xr-xqpid/tools/src/py/qpid-ha-tool183
10 files changed, 305 insertions, 221 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index bdaf40b073..e03c88ec8b 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -102,15 +102,16 @@ $(srcdir)/rubygen.cmake: $(rgen_generator) $(specs)
# Management generator.
mgen_dir=$(top_srcdir)/managementgen
-mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk \
- -c $(srcdir)/managementgen.cmake -q -b -o qmf \
- $(top_srcdir)/../specs/management-schema.xml \
+mgen_xml=$(top_srcdir)/../specs/management-schema.xml \
$(srcdir)/qpid/acl/management-schema.xml \
$(srcdir)/qpid/cluster/management-schema.xml \
$(srcdir)/qpid/ha/management-schema.xml
+mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk \
+ -c $(srcdir)/managementgen.cmake -q -b -o qmf \
+ $(mgen_xml)
$(srcdir)/managementgen.mk $(mgen_broker_cpp) $(dist_qpid_management_HEADERS): mgen.timestamp
-mgen.timestamp: $(mgen_generator)
+mgen.timestamp: $(mgen_generator) $(mgen_xml)
$(mgen_cmd); touch $@
$(mgen_generator):
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 18cd578d90..d92749abeb 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -27,8 +27,9 @@
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetClientAddresses.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokerAddresses.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
#include "qpid/log/Statement.h"
namespace qpid {
@@ -88,18 +89,19 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
}
break;
}
- case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
- setClientUrl(
- Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).
- i_clientAddresses), l);
+ case _qmf::HaBroker::METHOD_SETBROKERS: {
+ setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokers&>(args).i_url), l);
break;
}
- case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
- setBrokerUrl(
- Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
- .i_brokerAddresses), l);
+ case _qmf::HaBroker::METHOD_SETPUBLICBROKERS: {
+ setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicBrokers&>(args).i_url), l);
break;
}
+ case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
+ setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
+ break;
+ }
+
default:
return Manageable::STATUS_UNKNOWN_METHOD;
}
@@ -115,7 +117,7 @@ void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
Url url = clientUrl.empty() ? brokerUrl : clientUrl;
assert(!url.empty());
- mgmtObject->set_clientAddresses(url.str());
+ mgmtObject->set_publicBrokers(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
QPID_LOG(debug, "HA: Setting client URL to: " << url);
@@ -125,12 +127,17 @@ void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
if (url.empty()) throw Exception("Invalid empty URL for HA broker failover");
QPID_LOG(debug, "HA: Setting broker URL to: " << url);
brokerUrl = url;
- mgmtObject->set_brokerAddresses(brokerUrl.str());
+ mgmtObject->set_brokers(brokerUrl.str());
if (backup.get()) backup->setBrokerUrl(brokerUrl);
// Updating broker URL also updates defaulted client URL:
if (clientUrl.empty()) updateClientUrl(l);
}
+void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) {
+ expectedBackups = n;
+ mgmtObject->set_expectedBackups(n);
+}
+
std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 835a47c749..4f4ee4c944 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -55,6 +55,7 @@ class HaBroker : public management::Manageable
private:
void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&);
void updateClientUrl(const sys::Mutex::ScopedLock&);
bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
std::vector<Url> getKnownBrokers() const;
@@ -67,6 +68,7 @@ class HaBroker : public management::Manageable
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
Url clientUrl, brokerUrl;
std::vector<Url> knownBrokers;
+ size_t expectedBackups;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index e9b3d4d311..b3080330fb 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -31,12 +31,20 @@ struct Options : public qpid::Options {
Settings& settings;
Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
addOptions()
- ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features")
- ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.")
- ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.")
- ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
- ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
- ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+ ("ha-cluster", optValue(settings.enabled, "yes|no"),
+ "Join a HA active/passive cluster.")
+ ("ha-brokers", optValue(settings.brokerUrl,"URL"),
+ "URL that backup brokers use to connect and fail over.")
+ ("ha-public-brokers", optValue(settings.clientUrl,"URL"),
+ "URL that clients use to connect and fail over, defaults to ha-brokers.")
+ ("ha-expected-backups", optValue(settings.expectedBackups, "N"),
+ "Number of backups expected to be active in the HA cluster.")
+ ("ha-username", optValue(settings.username, "USER"),
+ "Username for connections between HA brokers")
+ ("ha-password", optValue(settings.password, "PASS"),
+ "Password for connections between HA brokers")
+ ("ha-mechanism", optValue(settings.mechanism, "MECH"),
+ "Authentication mechanism for connections between HA brokers")
;
}
};
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 049c873b9f..52a64c8330 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -33,10 +33,11 @@ namespace ha {
class Settings
{
public:
- Settings() : enabled(false) {}
+ Settings() : enabled(false), expectedBackups(0) {}
bool enabled;
std::string clientUrl;
std::string brokerUrl;
+ size_t expectedBackups;
std::string username, password, mechanism;
private:
};
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index fe4a14d111..05ed5f02ce 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -22,16 +22,30 @@
<!-- Monitor and control HA status of a broker. -->
<class name="HaBroker">
<property name="name" type="sstr" access="RC" index="y" desc="Primary Key"/>
+
<property name="status" type="sstr" desc="HA status: primary or backup"/>
- <property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/>
- <property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/>
+
+ <property name="brokers" type="sstr"
+ desc="Multiple-address URL used by HA brokers to connect to each other."/>
+
+ <property name="publicBrokers" type="sstr"
+ desc="Multiple-address URL used by clients to connect to the HA brokers."/>
+
+ <property name="expectedBackups" type="uint16"
+ desc="Number of HA backup brokers expected."/>>
<method name="promote" desc="Promote a backup broker to primary."/>
- <method name="setClientAddresses" desc="Set HA client addresses">
- <arg name="clientAddresses" type="sstr" dir="I"/>
+
+ <method name="setBrokers" desc="Set URL for HA brokers to connect to each other.">
+ <arg name="url" type="sstr" dir="I"/>
+ </method>
+
+ <method name="setPublicBrokers" desc="Set URL for clients to connect to HA brokers">
+ <arg name="url" type="sstr" dir="I"/>
</method>
- <method name="setBrokerAddresses" desc="Set HA broker addresses">
- <arg name="brokerAddresses" type="sstr" dir="I"/>
+
+ <method name="setExpectedBackups" desc="Set number of backups expected">
+ <arg name="expectedBackups" type="uint16" dir="I"/>
</method>
</class>
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index dfcec26d46..264a636f29 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -34,20 +34,20 @@ class HaBroker(Broker):
args=["--load-module", BrokerTest.ha_lib,
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
- "--ha-enable=yes"]
- if broker_url: args += [ "--ha-broker-url", broker_url ]
+ "--ha-cluster=yes"]
+ if broker_url: args += [ "--ha-brokers", broker_url ]
Broker.__init__(self, test, args, **kwargs)
def promote(self):
- assert os.system("$QPID_HA_TOOL_EXEC --promote %s"%(self.host_port())) == 0
+ assert os.system("$QPID_HA_EXEC promote -b %s"%(self.host_port())) == 0
def set_client_url(self, url):
assert os.system(
- "$QPID_HA_TOOL_EXEC --client-addresses=%s %s"%(url,self.host_port())) == 0
+ "$QPID_HA_EXEC set --public-brokers=%s -b %s"%(url,self.host_port())) == 0
def set_broker_url(self, url):
assert os.system(
- "$QPID_HA_TOOL_EXEC --broker-addresses=%s %s"%(url, self.host_port())) == 0
+ "$QPID_HA_EXEC set --brokers=%s -b %s"%(url, self.host_port())) == 0
def set_broker_urls(brokers):
url = ",".join([b.host_port() for b in brokers])
@@ -475,10 +475,10 @@ class LongTests(BrokerTest):
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
- qpid_ha_tool = os.getenv("QPID_HA_TOOL_EXEC")
- if qpid_ha_tool and os.path.exists(qpid_ha_tool):
+ qpid_ha = os.getenv("QPID_HA_EXEC")
+ if qpid_ha and os.path.exists(qpid_ha):
os.execvp("qpid-python-test",
["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
else:
- print "Skipping ha_tests, qpid_ha_tool not available"
+ print "Skipping ha_tests, qpid_ha not available"
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 667a2f0a1b..5c07bcdc2e 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -44,7 +44,7 @@ export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$QMF_LIB:$
export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config
export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route
export QPID_CLUSTER_EXEC=$PYTHON_COMMANDS/qpid-cluster
-export QPID_HA_TOOL_EXEC=$PYTHON_COMMANDS/qpid-ha-tool
+export QPID_HA_EXEC=$PYTHON_COMMANDS/qpid-ha
# Executables
export QPIDD_EXEC=$top_builddir/src/qpidd
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
new file mode 100755
index 0000000000..d81a7a466c
--- /dev/null
+++ b/qpid/tools/src/py/qpid-ha
@@ -0,0 +1,234 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import qmf.console, optparse, sys
+from qpid.management import managementChannel, managementClient
+from qpid.messaging import Connection
+from qpid.messaging import Message as QpidMessage
+try:
+ from uuid import uuid4
+except ImportError:
+ from qpid.datatypes import uuid4
+
+# Utility for doing fast qmf2 operations on a broker.
+class QmfBroker(object):
+ def __init__(self, conn):
+ self.conn = conn
+ self.sess = self.conn.session()
+ self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
+ str(uuid4())
+ self.reply_rx = self.sess.receiver(self.reply_to)
+ self.reply_rx.capacity = 10
+ self.tx = self.sess.sender("qmf.default.direct/broker")
+ self.next_correlator = 1
+
+ def close(self):
+ self.conn.close()
+
+ def __repr__(self):
+ return "Qpid Broker: %s" % self.url
+
+ def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"):
+ props = {'method' : 'request',
+ 'qmf.opcode' : '_method_request',
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+
+ content = {'_object_id' : {'_object_name' : addr},
+ '_method_name' : method,
+ '_arguments' : arguments}
+
+ message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] == '_exception':
+ raise Exception("Exception from Agent: %r" % response.content['_values'])
+ if response.properties['qmf.opcode'] != '_method_response':
+ raise Exception("bad response: %r" % response.properties)
+ return response.content['_arguments']
+
+ def _sendRequest(self, opcode, content):
+ props = {'method' : 'request',
+ 'qmf.opcode' : opcode,
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+ message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ return correlator
+
+ def _doClassQuery(self, class_name):
+ query = {'_what' : 'OBJECT',
+ '_schema_id' : {'_class_name' : class_name}}
+ correlator = self._sendRequest('_query_request', query)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item['_values'])
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ return items
+
+ def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'):
+ query = {'_what' : 'OBJECT',
+ '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}}
+ correlator = self._sendRequest('_query_request', query)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item['_values'])
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ if len(items) == 1:
+ return items[0]
+ return None
+
+ def _getAllBrokerObjects(self, cls):
+ items = self._doClassQuery(cls.__name__.lower())
+ objs = []
+ for item in items:
+ objs.append(cls(self, item))
+ return objs
+
+ def _getBrokerObject(self, cls, name):
+ obj = self._doNameQuery(cls.__name__.lower(), name)
+ if obj:
+ return cls(self, obj)
+ return None
+
+ def get_ha_broker(self):
+ ha_brokers = self._doClassQuery("habroker")
+ if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
+ return ha_brokers[0]
+
+HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"
+
+class Command:
+ commands = {}
+
+ def __init__(self, name, help, args=""):
+ Command.commands[name] = self
+ self.name = name
+ usage="%s [options]%s\n\n%s"%(name, args, help)
+ self.help = help
+ self.op=optparse.OptionParser(usage)
+ self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>")
+
+ def execute(self, command):
+ opts, args = self.op.parse_args(command)
+ broker = opts.broker or "localhost:5672"
+ # FIXME aconway 2012-02-23: enforce not doing primary-only operations on a backup & vice versa
+ connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
+ try: self.do_execute(QmfBroker(connection), opts, args)
+ finally: connection.close()
+
+ def do_execute(self, qmf_broker, opts, args):
+ raise Exception("Command '%s' is not yet implemented"%self.name)
+
+def print_all_help(name):
+ print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%name
+ for c in Command.commands:
+ help = Command.commands[c].help
+ print " %-12s %s."%(c, help.split(".")[0])
+ print "\nFor help with a command: %s <command> --help\n"%name
+
+
+class PromoteCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "promote","Promote broker from backup to primary")
+ def do_execute(self, qmf_broker, opts, args):
+ qmf_broker._method("promote", {}, HA_BROKER)
+PromoteCmd()
+
+class ReadyCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "ready", "Test if a backup broker is ready.\nReturn 0 if broker is a ready backup, non-0 otherwise.")
+ self.op.add_option(
+ "--wait", type="int", metavar="<seconds>",
+ help="Wait up to <seconds> for broker to be ready. 0 means wait forever.")
+ReadyCmd()
+
+class ReplicateCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "replicate", "Replicate <queue> from <broker> to the current broker.", "<queue> <broker>")
+ReplicateCmd()
+
+class SetCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "set", "Set HA configuration settings")
+ def add(optname, metavar, type, help):
+ self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store")
+ add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other")
+ add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers")
+ add("--backups", "<n>", "int", "Expect <n> backups to be running")
+
+ def do_execute(self, qmf_broker, opts, args):
+ if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER)
+ if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER)
+ if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER)
+SetCmd()
+
+class QueryCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "query", "Print HA configuration settings")
+
+ def do_execute(self, qmf_broker, opts, args):
+ hb = qmf_broker.get_ha_broker()
+ for x in [("Status:", "status"),
+ ("Brokers URL:", "brokers"),
+ ("Public URL:", "publicBrokers")]:
+ print "%-16s%s"%(x[0], hb[x[1]])
+
+QueryCmd()
+
+def main(argv):
+ try:
+ command=argv[1:]
+ if command and command[0] == "--help-all":
+ for c in Command.commands.itervalues():
+ c.op.print_help(); print
+ return 1
+ if not command or not command[0] in Command.commands:
+ print_all_help(argv[0]);
+ return 1;
+ Command.commands[command[0]].execute(command)
+ except Exception, e:
+ raise # FIXME aconway 2012-02-23:
+ print e
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv))
diff --git a/qpid/tools/src/py/qpid-ha-tool b/qpid/tools/src/py/qpid-ha-tool
deleted file mode 100755
index 8e8107657c..0000000000
--- a/qpid/tools/src/py/qpid-ha-tool
+++ /dev/null
@@ -1,183 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import qmf.console, optparse, sys
-from qpid.management import managementChannel, managementClient
-from qpid.messaging import Connection
-from qpid.messaging import Message as QpidMessage
-try:
- from uuid import uuid4
-except ImportError:
- from qpid.datatypes import uuid4
-
-# Utility for doing fast qmf2 operations on a broker.
-class QmfBroker(object):
- def __init__(self, conn):
- self.conn = conn
- self.sess = self.conn.session()
- self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
- str(uuid4())
- self.reply_rx = self.sess.receiver(self.reply_to)
- self.reply_rx.capacity = 10
- self.tx = self.sess.sender("qmf.default.direct/broker")
- self.next_correlator = 1
-
- def close(self):
- self.conn.close()
-
- def __repr__(self):
- return "Qpid Broker: %s" % self.url
-
- def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"):
- props = {'method' : 'request',
- 'qmf.opcode' : '_method_request',
- 'x-amqp-0-10.app-id' : 'qmf2'}
- correlator = str(self.next_correlator)
- self.next_correlator += 1
-
- content = {'_object_id' : {'_object_name' : addr},
- '_method_name' : method,
- '_arguments' : arguments}
-
- message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator,
- properties=props, subject="broker")
- self.tx.send(message)
- response = self.reply_rx.fetch(10)
- if response.properties['qmf.opcode'] == '_exception':
- raise Exception("Exception from Agent: %r" % response.content['_values'])
- if response.properties['qmf.opcode'] != '_method_response':
- raise Exception("bad response: %r" % response.properties)
- return response.content['_arguments']
-
- def _sendRequest(self, opcode, content):
- props = {'method' : 'request',
- 'qmf.opcode' : opcode,
- 'x-amqp-0-10.app-id' : 'qmf2'}
- correlator = str(self.next_correlator)
- self.next_correlator += 1
- message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator,
- properties=props, subject="broker")
- self.tx.send(message)
- return correlator
-
- def _doClassQuery(self, class_name):
- query = {'_what' : 'OBJECT',
- '_schema_id' : {'_class_name' : class_name}}
- correlator = self._sendRequest('_query_request', query)
- response = self.reply_rx.fetch(10)
- if response.properties['qmf.opcode'] != '_query_response':
- raise Exception("bad response")
- items = []
- done = False
- while not done:
- for item in response.content:
- items.append(item['_values'])
- if 'partial' in response.properties:
- response = self.reply_rx.fetch(10)
- else:
- done = True
- return items
-
- def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'):
- query = {'_what' : 'OBJECT',
- '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}}
- correlator = self._sendRequest('_query_request', query)
- response = self.reply_rx.fetch(10)
- if response.properties['qmf.opcode'] != '_query_response':
- raise Exception("bad response")
- items = []
- done = False
- while not done:
- for item in response.content:
- items.append(item['_values'])
- if 'partial' in response.properties:
- response = self.reply_rx.fetch(10)
- else:
- done = True
- if len(items) == 1:
- return items[0]
- return None
-
- def _getAllBrokerObjects(self, cls):
- items = self._doClassQuery(cls.__name__.lower())
- objs = []
- for item in items:
- objs.append(cls(self, item))
- return objs
-
- def _getBrokerObject(self, cls, name):
- obj = self._doNameQuery(cls.__name__.lower(), name)
- if obj:
- return cls(self, obj)
- return None
-
-
-op=optparse.OptionParser(usage="Usage: %prog [options] [broker-address]")
-
-op.add_option("-p", "--promote", action="store_true",
- help="Promote a backup broker to become the primary.")
-op.add_option("-c", "--client-addresses", action="store", type="string",
- help="Set list of addresses used by clients to connect to the HA cluster.")
-op.add_option("-b", "--broker-addresses", action="store", type="string",
- help="Set list of addresses used by HA brokers to connect to each other.")
-op.add_option("-q", "--query", action="store_true",
- help="Show the current HA settings on the broker.")
-
-def get_ha_broker(qmf_broker):
- ha_brokers = qmf_broker._doClassQuery("habroker")
- if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
- return ha_brokers[0]
-
-def main(argv):
- try:
- opts, args = op.parse_args(argv)
- if len(args) >1: broker = args[1]
- else: broker = "localhost:5672"
- conn = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
- ha_broker = "org.apache.qpid.ha:habroker:ha-broker"
- try:
- qmf_broker = QmfBroker(conn)
- get_ha_broker(qmf_broker) # Verify that HA is enabled
- action=False
- if opts.promote:
- qmf_broker._method("promote", {}, ha_broker)
- action=True
- if opts.broker_addresses:
- qmf_broker._method('setBrokerAddresses', {'brokerAddresses':opts.broker_addresses}, ha_broker)
- action=True
- if opts.client_addresses:
- qmf_broker._method('setClientAddresses', {'clientAddresses':opts.client_addresses}, ha_broker)
- action=True
- if opts.query or not action:
- hb = get_ha_broker(qmf_broker)
- print "status=%s"%hb["status"]
- print "broker-addresses=%s"%hb["brokerAddresses"]
- print "client-addresses=%s"%hb["clientAddresses"]
- return 0
- finally:
- conn.close() # Avoid errors shutting down threads.
- except Exception, e:
- raise # FIXME aconway 2012-01-31:
- print e
- return 1
-
-if __name__ == "__main__":
- sys.exit(main(sys.argv))