summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Robie <jonathan@apache.org>2010-12-17 15:29:41 +0000
committerJonathan Robie <jonathan@apache.org>2010-12-17 15:29:41 +0000
commit483bbe20da6960ef43773f5889e04c82e16dd00c (patch)
treebea09b1f55b67253d3c27bb79a7029110dd08cbe
parentb01bb777d319ef8e5920fbe5c5805a27c64e32ef (diff)
downloadqpid-python-483bbe20da6960ef43773f5889e04c82e16dd00c.tar.gz
Made qpid-xxx management scripts callable as python functions.
Examples (from cli_tests.py): def qpid_config_api(self, arg = ""): script = import_script(checkenv("QPID_CONFIG_EXEC")) broker = ["-a", "localhost:"+str(self.broker.port)] return script.main(broker + arg.split()) def qpid_route_api(self, arg = ""): script = import_script(checkenv("QPID_ROUTE_EXEC")) return script.main(arg.split()) Useful primarily for qpid-config, qpid-route, and qpid-cluster. Probably not useful for qpid-stat, qpid-printevents, qpid-queue-stats, which just create screen output. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1050425 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xcpp/src/tests/cli_tests.py142
-rwxr-xr-xtools/src/py/qpid-cluster-store8
-rwxr-xr-xtools/src/py/qpid-config593
-rwxr-xr-xtools/src/py/qpid-printevents53
-rwxr-xr-xtools/src/py/qpid-queue-stats6
-rwxr-xr-xtools/src/py/qpid-route437
-rwxr-xr-xtools/src/py/qpid-stat185
7 files changed, 800 insertions, 624 deletions
diff --git a/cpp/src/tests/cli_tests.py b/cpp/src/tests/cli_tests.py
index 4df9169cc4..b197b48193 100755
--- a/cpp/src/tests/cli_tests.py
+++ b/cpp/src/tests/cli_tests.py
@@ -20,11 +20,29 @@
import sys
import os
+import imp
from qpid.testlib import TestBase010
+# from qpid.brokertest import import_script, checkenv
from qpid.datatypes import Message
from qpid.queue import Empty
from time import sleep
+def import_script(path):
+ """
+ Import executable script at path as a module.
+ Requires some trickery as scripts are not in standard module format
+ """
+ f = open(path)
+ try:
+ name=os.path.split(path)[1].replace("-","_")
+ return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
+ finally: f.close()
+
+def checkenv(name):
+ value = os.getenv(name)
+ if not value: raise Exception("Environment variable %s is not set" % name)
+ return value
+
class CliTests(TestBase010):
def remote_host(self):
@@ -36,8 +54,12 @@ class CliTests(TestBase010):
def cli_dir(self):
return self.defines["cli-dir"]
- def makeQueue(self, qname, arguments):
- ret = os.system(self.command(" add queue " + qname + " " + arguments))
+ def makeQueue(self, qname, arguments, api=False):
+ if api:
+ ret = self.qpid_config_api(" add queue " + qname + " " + arguments)
+ else:
+ ret = os.system(self.qpid_config_command(" add queue " + qname + " " + arguments))
+
self.assertEqual(ret, 0)
queues = self.qmf.getObjects(_class="queue")
for queue in queues:
@@ -75,12 +97,68 @@ class CliTests(TestBase010):
assert LVQNB not in queue7.arguments
assert LVQNB in queue8.arguments
+
+ def test_queue_params_api(self):
+ self.startQmf()
+ queue1 = self.makeQueue("test_queue_params1", "--limit-policy none", True)
+ queue2 = self.makeQueue("test_queue_params2", "--limit-policy reject", True)
+ queue3 = self.makeQueue("test_queue_params3", "--limit-policy flow-to-disk", True)
+ queue4 = self.makeQueue("test_queue_params4", "--limit-policy ring", True)
+ queue5 = self.makeQueue("test_queue_params5", "--limit-policy ring-strict", True)
+
+ LIMIT = "qpid.policy_type"
+ assert LIMIT not in queue1.arguments
+ self.assertEqual(queue2.arguments[LIMIT], "reject")
+ self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk")
+ self.assertEqual(queue4.arguments[LIMIT], "ring")
+ self.assertEqual(queue5.arguments[LIMIT], "ring_strict")
+
+ queue6 = self.makeQueue("test_queue_params6", "--order fifo", True)
+ queue7 = self.makeQueue("test_queue_params7", "--order lvq", True)
+ queue8 = self.makeQueue("test_queue_params8", "--order lvq-no-browse", True)
+
+ LVQ = "qpid.last_value_queue"
+ LVQNB = "qpid.last_value_queue_no_browse"
+
+ assert LVQ not in queue6.arguments
+ assert LVQ in queue7.arguments
+ assert LVQ not in queue8.arguments
+
+ assert LVQNB not in queue6.arguments
+ assert LVQNB not in queue7.arguments
+ assert LVQNB in queue8.arguments
+
+
def test_qpid_config(self):
self.startQmf();
qmf = self.qmf
qname = "test_qpid_config"
- ret = os.system(self.command(" add queue " + qname))
+ ret = os.system(self.qpid_config_command(" add queue " + qname))
+ self.assertEqual(ret, 0)
+ queues = qmf.getObjects(_class="queue")
+ found = False
+ for queue in queues:
+ if queue.name == qname:
+ self.assertEqual(queue.durable, False)
+ found = True
+ self.assertEqual(found, True)
+
+ ret = os.system(self.qpid_config_command(" del queue " + qname))
+ self.assertEqual(ret, 0)
+ queues = qmf.getObjects(_class="queue")
+ found = False
+ for queue in queues:
+ if queue.name == qname:
+ found = True
+ self.assertEqual(found, False)
+
+ def test_qpid_config_api(self):
+ self.startQmf();
+ qmf = self.qmf
+ qname = "test_qpid_config_api"
+
+ ret = self.qpid_config_api(" add queue " + qname)
self.assertEqual(ret, 0)
queues = qmf.getObjects(_class="queue")
found = False
@@ -90,7 +168,7 @@ class CliTests(TestBase010):
found = True
self.assertEqual(found, True)
- ret = os.system(self.command(" del queue " + qname))
+ ret = self.qpid_config_api(" del queue " + qname)
self.assertEqual(ret, 0)
queues = qmf.getObjects(_class="queue")
found = False
@@ -111,14 +189,14 @@ class CliTests(TestBase010):
self.assertEqual(found, expected)
def helper_create_exchange(self, xchgname, typ="direct", opts=""):
- foo = self.command(opts + " add exchange " + typ + " " + xchgname)
+ foo = self.qpid_config_command(opts + " add exchange " + typ + " " + xchgname)
# print foo
ret = os.system(foo)
self.assertEqual(ret, 0)
self.helper_find_exchange(xchgname, typ, True)
def helper_destroy_exchange(self, xchgname):
- foo = self.command(" del exchange " + xchgname)
+ foo = self.qpid_config_command(" del exchange " + xchgname)
# print foo
ret = os.system(foo)
self.assertEqual(ret, 0)
@@ -134,14 +212,14 @@ class CliTests(TestBase010):
self.assertEqual(found, expected)
def helper_create_queue(self, qname):
- foo = self.command(" add queue " + qname)
+ foo = self.qpid_config_command(" add queue " + qname)
# print foo
ret = os.system(foo)
self.assertEqual(ret, 0)
self.helper_find_queue(qname, True)
def helper_destroy_queue(self, qname):
- foo = self.command(" del queue " + qname)
+ foo = self.qpid_config_command(" del queue " + qname)
# print foo
ret = os.system(foo)
self.assertEqual(ret, 0)
@@ -162,14 +240,14 @@ class CliTests(TestBase010):
self.helper_create_queue(qname)
# now bind the queue to the xchg
- foo = self.command(" bind " + xchgname + " " + qname +
+ foo = self.qpid_config_command(" bind " + xchgname + " " + qname +
" key all foo=bar baz=quux")
# print foo
ret = os.system(foo)
self.assertEqual(ret, 0)
# he likes it, mikey. Ok, now tear it all down. first the binding
- ret = os.system(self.command(" unbind " + xchgname + " " + qname +
+ ret = os.system(self.qpid_config_command(" unbind " + xchgname + " " + qname +
" key"))
self.assertEqual(ret, 0)
@@ -179,7 +257,7 @@ class CliTests(TestBase010):
# then the exchange
self.helper_destroy_exchange(xchgname)
- # test the bind-queue-xml-filter functionality
+
def test_qpid_config_xml(self):
self.startQmf();
qmf = self.qmf
@@ -193,13 +271,13 @@ class CliTests(TestBase010):
self.helper_create_queue(qname)
# now bind the queue to the xchg
- foo = self.command("-f test.xquery bind " + xchgname + " " + qname)
+ foo = self.qpid_config_command("-f test.xquery bind " + xchgname + " " + qname)
# print foo
ret = os.system(foo)
self.assertEqual(ret, 0)
# he likes it, mikey. Ok, now tear it all down. first the binding
- ret = os.system(self.command(" unbind " + xchgname + " " + qname +
+ ret = os.system(self.qpid_config_command(" unbind " + xchgname + " " + qname +
" key"))
self.assertEqual(ret, 0)
@@ -214,7 +292,7 @@ class CliTests(TestBase010):
qmf = self.qmf
qname = "test_qpid_config"
- ret = os.system(self.command(" add queue --durable " + qname))
+ ret = os.system(self.qpid_config_command(" add queue --durable " + qname))
self.assertEqual(ret, 0)
queues = qmf.getObjects(_class="queue")
found = False
@@ -224,7 +302,7 @@ class CliTests(TestBase010):
found = True
self.assertEqual(found, True)
- ret = os.system(self.command(" del queue " + qname))
+ ret = os.system(self.qpid_config_command(" del queue " + qname))
self.assertEqual(ret, 0)
queues = qmf.getObjects(_class="queue")
found = False
@@ -240,7 +318,7 @@ class CliTests(TestBase010):
qName = "testqalt"
altName = "amq.direct"
- ret = os.system(self.command(" add exchange topic %s --alternate-exchange=%s" % (exName, altName)))
+ ret = os.system(self.qpid_config_command(" add exchange topic %s --alternate-exchange=%s" % (exName, altName)))
self.assertEqual(ret, 0)
exchanges = qmf.getObjects(_class="exchange")
@@ -256,7 +334,7 @@ class CliTests(TestBase010):
self.assertEqual(exchange._altExchange_.name, altName)
self.assertEqual(found, True)
- ret = os.system(self.command(" add queue %s --alternate-exchange=%s" % (qName, altName)))
+ ret = os.system(self.qpid_config_command(" add queue %s --alternate-exchange=%s" % (qName, altName)))
self.assertEqual(ret, 0)
queues = qmf.getObjects(_class="queue")
@@ -285,6 +363,25 @@ class CliTests(TestBase010):
found = True
self.assertEqual(found, True)
+ def test_qpid_route_api(self):
+ self.startQmf();
+ qmf = self.qmf
+
+ ret = self.qpid_route_api("dynamic add "
+ + "guest/guest@localhost:"+str(self.broker.port) + " "
+ + str(self.remote_host())+":"+str(self.remote_port()) + " "
+ +"amq.direct")
+
+ self.assertEqual(ret, 0)
+
+ links = qmf.getObjects(_class="link")
+ found = False
+ for link in links:
+ if link.port == self.remote_port():
+ found = True
+ self.assertEqual(found, True)
+
+
def getProperty(self, msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)
@@ -296,5 +393,14 @@ class CliTests(TestBase010):
return headers[name]
return None
- def command(self, arg = ""):
+ def qpid_config_command(self, arg = ""):
return self.cli_dir() + "/qpid-config -a localhost:%d" % self.broker.port + " " + arg
+
+ def qpid_config_api(self, arg = ""):
+ script = import_script(checkenv("QPID_CONFIG_EXEC"))
+ broker = ["-a", "localhost:"+str(self.broker.port)]
+ return script.main(broker + arg.split())
+
+ def qpid_route_api(self, arg = ""):
+ script = import_script(checkenv("QPID_ROUTE_EXEC"))
+ return script.main(arg.split())
diff --git a/tools/src/py/qpid-cluster-store b/tools/src/py/qpid-cluster-store
index 8cbfa5505b..3541b6679c 100755
--- a/tools/src/py/qpid-cluster-store
+++ b/tools/src/py/qpid-cluster-store
@@ -19,6 +19,7 @@
# under the License.
#
+
from qpid.datatypes import uuid4, UUID, parseUUID
import optparse, os.path, sys, string
@@ -61,8 +62,8 @@ class ClusterStoreStatus:
self.shutdown_id = uuid4()
self.write()
-def main():
- opts, args = op.parse_args()
+def main(argv=None):
+ opts, args = op.parse_args(args=argv)
if len(args) != 1: op.error("incorrect number of arguments")
try: status = ClusterStoreStatus(args[0]+"/cluster/store.status")
except Exception,e: print e; return 1
@@ -70,4 +71,5 @@ def main():
if opts.mark_clean: status.mark_clean(); print status
return 0
-if __name__ == "__main__": sys.exit(main())
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config
index 28581daae9..a2358a0abd 100755
--- a/tools/src/py/qpid-config
+++ b/tools/src/py/qpid-config
@@ -25,25 +25,29 @@ import sys
import locale
from qmf.console import Session
-_recursive = False
-_host = "localhost"
-_connTimeout = 10
-_altern_ex = None
-_passive = False
-_durable = False
-_clusterDurable = False
-_if_empty = True
-_if_unused = True
-_fileCount = 8
-_fileSize = 24
-_maxQueueSize = None
-_maxQueueCount = None
-_limitPolicy = None
-_order = None
-_msgSequence = False
-_ive = False
-_eventGeneration = None
-_file = None
+class Config:
+ def __init__(self):
+ self._recursive = False
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._altern_ex = None
+ self._passive = False
+ self._durable = False
+ self._clusterDurable = False
+ self._if_empty = True
+ self._if_unused = True
+ self._fileCount = 8
+ self._fileSize = 24
+ self._maxQueueSize = None
+ self._maxQueueCount = None
+ self._limitPolicy = None
+ self._order = None
+ self._msgSequence = False
+ self._ive = False
+ self._eventGeneration = None
+ self._file = None
+
+config = Config()
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
@@ -57,6 +61,176 @@ MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+class JHelpFormatter(IndentedHelpFormatter):
+ """Format usage and description without stripping newlines from usage strings
+ """
+
+ def format_usage(self, usage):
+ return usage
+
+
+ def format_description(self, description):
+ if description:
+ return description + "\n"
+ else:
+ return ""
+
+def Usage():
+ print "qpid-config: invalid arguments. Try $ qpid-config --help"
+ exit(-1)
+
+def OptionsAndArguments(argv):
+ """ Set global variables for options, return arguments """
+
+ global config
+
+ usage = """
+ Usage: qpid-config [OPTIONS]
+ qpid-config [OPTIONS] exchanges [filter-string]
+ qpid-config [OPTIONS] queues [filter-string]
+ qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]
+ qpid-config [OPTIONS] del exchange <name>
+ qpid-config [OPTIONS] add queue <name> [AddQueueOptions]
+ qpid-config [OPTIONS] del queue <name> [DelQueueOptions]
+ qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]
+ <for type xml> [-f -|filename]
+ <for type header> [all|any] k1=v1 [, k2=v2...]
+ qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"""
+
+ description = """
+ ADDRESS syntax:
+
+ [username/password@] hostname
+ ip-address [:<port>]
+
+ Examples:
+
+ $ qpid-config add queue q
+ $ qpid-config add exchange direct d localhost:5672
+ $ qpid-config exchanges 10.1.1.7:10000
+ $ qpid-config queues guest/guest@broker-host:10000
+
+ Add Exchange <type> values:
+
+ direct Direct exchange for point-to-point communication
+ fanout Fanout exchange for broadcast communication
+ topic Topic exchange that routes messages using binding keys with wildcards
+ headers Headers exchange that matches header fields against the binding keys
+ xml XML Exchange - allows content filtering using an XQuery
+
+
+ Queue Limit Actions
+
+ none (default) - Use broker's default policy
+ reject - Reject enqueued messages
+ flow-to-disk - Page messages to disk
+ ring - Replace oldest unacquired message with new
+ ring-strict - Replace oldest message, reject if oldest is acquired
+
+ Queue Ordering Policies
+
+ fifo (default) - First in, first out
+ lvq - Last Value Queue ordering, allows queue browsing
+ lvq-no-browse - Last Value Queue ordering, browsing clients may lose data"""
+
+ parser = OptionParser(usage=usage,
+ description=description,
+ formatter=JHelpFormatter())
+
+ group1 = OptionGroup(parser, "General Options")
+ group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
+ group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list")
+ group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="ADDRESS", help="Maximum time to wait for broker connection (in seconds)")
+ parser.add_option_group(group1)
+
+ group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
+ group2.add_option("--alternate-exchange", action="store", type="string", metavar="NAME", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
+ group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.")
+ group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
+ parser.add_option_group(group2)
+
+ group3 = OptionGroup(parser, "Options for Adding Queues")
+ group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node")
+ group3.add_option("--file-count", action="store", type="int", default=8, metavar="N", help="Number of files in queue's persistence journal")
+ group3.add_option("--file-size", action="store", type="int", default=24, metavar="N", help="File size in pages (64Kib/page)")
+ group3.add_option("--max-queue-size", action="store", type="int", metavar="N", help="Number of files in queue's persistence journal")
+ group3.add_option("--max-queue-count", action="store", type="int", metavar="N", help="Number of files in queue's persistence journal")
+ group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="CHOICE", help="Action to take when queue limit is reached")
+ group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="CHOICE", help="Queue ordering policy")
+ group3.add_option("--generate-queue-events", action="store", type="int", metavar="N", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")
+ # no option for declaring an exclusive queue - which can only be used by the session that creates it.
+ parser.add_option_group(group3)
+
+ group4 = OptionGroup(parser, "Options for Adding Exchanges")
+ group4.add_option("--sequence", action="store_true", help="Exchange will insert a 'qpid.msg_sequence' field in the message header")
+ group4.add_option("--ive", action="store_true", help="Exchange will behave as an 'initial-value-exchange', keeping a reference to the last message forwarded and enqueuing that message to newly bound queues.")
+ parser.add_option_group(group4)
+
+ group5 = OptionGroup(parser, "Options for Deleting Queues")
+ group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty")
+ group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty")
+ group5.add_option("--force-if-not-used", action="store_true", help="Force delete of queue even if it's currently used")
+ parser.add_option_group(group5)
+
+ group6 = OptionGroup(parser, "Options for Declaring Bindings")
+ group6.add_option("-f", "--file", action="store", type="string", metavar="FILE.xq", help="For XML Exchange bindings - specifies the name of a file containing an XQuery.")
+ parser.add_option_group(group6)
+
+ opts, encArgs = parser.parse_args(args=argv)
+
+ try:
+ encoding = locale.getpreferredencoding()
+ args = [a.decode(encoding) for a in encArgs]
+ except:
+ args = encArgs
+
+ if opts.bindings:
+ config._recursive = True
+ if opts.broker_addr:
+ config._host = opts.broker_addr
+ if opts.timeout:
+ config._connTimeout = opts.timeout
+ if config._connTimeout == 0:
+ config._connTimeout = None
+ if opts.alternate_exchange:
+ config._altern_ex = opts.alternate_exchange
+ if opts.passive:
+ config._passive = True
+ if opts.durable:
+ config._durable = True
+ if opts.cluster_durable:
+ config._clusterDurable = True
+ if opts.file:
+ config._file = opts.file
+ if opts.file_count:
+ config._fileCount = opts.file_count
+ if opts.file_size:
+ config._fileSize = opts.file_size
+ if opts.max_queue_size:
+ config._maxQueueSize = opts.max_queue_size
+ if opts.max_queue_count:
+ config._maxQueueCount = opts.max_queue_count
+ if opts.limit_policy:
+ config._limitPolicy = opts.limit_policy
+ if opts.order:
+ config._order = opts.order
+ if opts.sequence:
+ config._msgSequence = True
+ if opts.ive:
+ config._ive = True
+ if opts.generate_queue_events:
+ config._eventGeneration = opts.generate_queue_events
+ if opts.force:
+ config._if_empty = False
+ config._if_unused = False
+ if opts.force_if_not_empty:
+ config._if_empty = False
+ if opts.force_if_not_used:
+ config._if_unused = False
+
+ return args
+
+
#
# helpers for the arg parsing in bind(). return multiple values; "ok"
# followed by the resultant args
@@ -67,13 +241,13 @@ QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
# passed to the xml binding.
#
def snarf_xquery_args():
- if not _file:
+ if not config._file:
print "Invalid args to bind xml: need an input file or stdin"
return [False]
- if _file == "-":
+ if config._file == "-":
res = sys.stdin.read()
else:
- f = open(_file) # let this signal if it can't find it
+ f = open(config._file) # let this signal if it can't find it
res = f.read()
f.close()
return [True, res]
@@ -106,7 +280,7 @@ class BrokerManager:
def SetBroker (self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
+ self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == '0':
@@ -131,12 +305,12 @@ class BrokerManager:
print
print " Total Queues: %d" % len (queues)
- _durable = 0
+ durable = 0
for queue in queues:
if queue.durable:
- _durable = _durable + 1
- print " durable: %d" % _durable
- print " non-durable: %d" % (len (queues) - _durable)
+ durable = durable + 1
+ print " durable: %d" % durable
+ print " non-durable: %d" % (len (queues) - durable)
def ExchangeList (self, filter):
exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
@@ -232,77 +406,77 @@ class BrokerManager:
def AddExchange (self, args):
if len (args) < 2:
- Usage(parser)
+ Usage()
etype = args[0]
ename = args[1]
declArgs = {}
- if _msgSequence:
+ if config._msgSequence:
declArgs[MSG_SEQUENCE] = 1
- if _ive:
+ if config._ive:
declArgs[IVE] = 1
- if _altern_ex != None:
- self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs)
+ if config._altern_ex != None:
+ self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
else:
- self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs)
+ self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=config._passive, durable=config._durable, arguments=declArgs)
def DelExchange (self, args):
if len (args) < 1:
- Usage(parser)
+ Usage()
ename = args[0]
self.broker.getAmqpSession().exchange_delete (exchange=ename)
def AddQueue (self, args):
if len (args) < 1:
- Usage(parser)
+ Usage()
qname = args[0]
declArgs = {}
- if _durable:
- declArgs[FILECOUNT] = _fileCount
- declArgs[FILESIZE] = _fileSize
-
- if _maxQueueSize:
- declArgs[MAX_QUEUE_SIZE] = _maxQueueSize
- if _maxQueueCount:
- declArgs[MAX_QUEUE_COUNT] = _maxQueueCount
- if _limitPolicy:
- if _limitPolicy == "none":
+ if config._durable:
+ declArgs[FILECOUNT] = config._fileCount
+ declArgs[FILESIZE] = config._fileSize
+
+ if config._maxQueueSize:
+ declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize
+ if config._maxQueueCount:
+ declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount
+ if config._limitPolicy:
+ if config._limitPolicy == "none":
pass
- elif _limitPolicy == "reject":
+ elif config._limitPolicy == "reject":
declArgs[POLICY_TYPE] = "reject"
- elif _limitPolicy == "flow-to-disk":
+ elif config._limitPolicy == "flow-to-disk":
declArgs[POLICY_TYPE] = "flow_to_disk"
- elif _limitPolicy == "ring":
+ elif config._limitPolicy == "ring":
declArgs[POLICY_TYPE] = "ring"
- elif _limitPolicy == "ring-strict":
+ elif config._limitPolicy == "ring-strict":
declArgs[POLICY_TYPE] = "ring_strict"
- if _clusterDurable:
+ if config._clusterDurable:
declArgs[CLUSTER_DURABLE] = 1
- if _order:
- if _order == "fifo":
+ if config._order:
+ if config._order == "fifo":
pass
- elif _order == "lvq":
+ elif config._order == "lvq":
declArgs[LVQ] = 1
- elif _order == "lvq-no-browse":
+ elif config._order == "lvq-no-browse":
declArgs[LVQNB] = 1
- if _eventGeneration:
- declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration
+ if config._eventGeneration:
+ declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration
- if _altern_ex != None:
- self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs)
+ if config._altern_ex != None:
+ self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
else:
- self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs)
+ self.broker.getAmqpSession().queue_declare (queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs)
def DelQueue (self, args):
if len (args) < 1:
- Usage(parser)
+ Usage()
qname = args[0]
- self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused)
+ self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=config._if_empty, if_unused=config._if_unused)
def Bind (self, args):
if len (args) < 2:
- Usage(parser)
+ Usage()
ename = args[0]
qname = args[1]
key = ""
@@ -340,7 +514,7 @@ class BrokerManager:
def Unbind (self, args):
if len (args) < 2:
- Usage(parser)
+ Usage()
ename = args[0]
qname = args[1]
key = ""
@@ -366,236 +540,79 @@ def YN (bool):
return 'Y'
return 'N'
-class JHelpFormatter(IndentedHelpFormatter):
- """Format usage and description without stripping newlines from usage strings
- """
-
- def format_usage(self, usage):
- return usage
+def main(argv=None):
+ args = OptionsAndArguments(argv)
+ bm = BrokerManager ()
- def format_description(self, description):
- if description:
- return description + "\n"
+ try:
+ bm.SetBroker(config._host)
+ if len(args) == 0:
+ bm.Overview ()
else:
- return ""
-
-def Usage(parser):
- parser.print_usage()
- exit(-1)
-
-##
-## Main Program
-##
-
-usage = """
-Usage: qpid-config [OPTIONS]
- qpid-config [OPTIONS] exchanges [filter-string]
- qpid-config [OPTIONS] queues [filter-string]
- qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]
- qpid-config [OPTIONS] del exchange <name>
- qpid-config [OPTIONS] add queue <name> [AddQueueOptions]
- qpid-config [OPTIONS] del queue <name> [DelQueueOptions]
- qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]
- <for type xml> [-f -|filename]
- <for type header> [all|any] k1=v1 [, k2=v2...]
- qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"""
-
-description = """
-ADDRESS syntax:
-
- [username/password@] hostname
- ip-address [:<port>]
-
-Examples:
-
-$ qpid-config add queue q
-$ qpid-config add exchange direct d localhost:5672
-$ qpid-config exchanges 10.1.1.7:10000
-$ qpid-config queues guest/guest@broker-host:10000
-
-Add Exchange <type> values:
-
- direct Direct exchange for point-to-point communication
- fanout Fanout exchange for broadcast communication
- topic Topic exchange that routes messages using binding keys with wildcards
- headers Headers exchange that matches header fields against the binding keys
- xml XML Exchange - allows content filtering using an XQuery
-
-
-Queue Limit Actions
-
- none (default) - Use broker's default policy
- reject - Reject enqueued messages
- flow-to-disk - Page messages to disk
- ring - Replace oldest unacquired message with new
- ring-strict - Replace oldest message, reject if oldest is acquired
-
-Queue Ordering Policies
-
- fifo (default) - First in, first out
- lvq - Last Value Queue ordering, allows queue browsing
- lvq-no-browse - Last Value Queue ordering, browsing clients may lose data"""
-
-parser = OptionParser(usage=usage,
- description=description,
- formatter=JHelpFormatter())
-
-group1 = OptionGroup(parser, "General Options")
-group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
-group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list")
-group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="ADDRESS", help="Maximum time to wait for broker connection (in seconds)")
-parser.add_option_group(group1)
-
-group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
-group2.add_option("--alternate-exchange", action="store", type="string", metavar="NAME", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
-group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.")
-group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
-parser.add_option_group(group2)
-
-group3 = OptionGroup(parser, "Options for Adding Queues")
-group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node")
-group3.add_option("--file-count", action="store", type="int", default=8, metavar="N", help="Number of files in queue's persistence journal")
-group3.add_option("--file-size", action="store", type="int", default=24, metavar="N", help="File size in pages (64Kib/page)")
-group3.add_option("--max-queue-size", action="store", type="int", metavar="N", help="Number of files in queue's persistence journal")
-group3.add_option("--max-queue-count", action="store", type="int", metavar="N", help="Number of files in queue's persistence journal")
-group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="CHOICE", help="Action to take when queue limit is reached")
-group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="CHOICE", help="Queue ordering policy")
-group3.add_option("--generate-queue-events", action="store", type="int", metavar="N", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")
-# no option for declaring an exclusive queue - which can only be used by the session that creates it.
-parser.add_option_group(group3)
-
-group4 = OptionGroup(parser, "Options for Adding Exchanges")
-group4.add_option("--sequence", action="store_true", help="Exchange will insert a 'qpid.msg_sequence' field in the message header")
-group4.add_option("--ive", action="store_true", help="Exchange will behave as an 'initial-value-exchange', keeping a reference to the last message forwarded and enqueuing that message to newly bound queues.")
-parser.add_option_group(group4)
-
-group5 = OptionGroup(parser, "Options for Deleting Queues")
-group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty")
-group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty")
-group5.add_option("--force-if-not-used", action="store_true", help="Force delete of queue even if it's currently used")
-parser.add_option_group(group5)
-
-group6 = OptionGroup(parser, "Options for Declaring Bindings")
-group6.add_option("-f", "--file", action="store", type="string", metavar="FILE.xq", help="For XML Exchange bindings - specifies the name of a file containing an XQuery.")
-parser.add_option_group(group6)
-
-opts, encArgs = parser.parse_args()
-
-try:
- encoding = locale.getpreferredencoding()
- args = [a.decode(encoding) for a in encArgs]
-except:
- args = encArgs
-
-if opts.bindings:
- _recursive = True
-if opts.broker_addr:
- _host = opts.broker_addr
-if opts.timeout:
- _connTimeout = opts.timeout
- if _connTimeout == 0:
- _connTimeout = None
-if opts.alternate_exchange:
- _altern_ex = opts.alternate_exchange
-if opts.passive:
- _passive = True
-if opts.durable:
- _durable = True
-if opts.cluster_durable:
- _clusterDurable = True
-if opts.file:
- _file = opts.file
-if opts.file_count:
- _fileCount = opts.file_count
-if opts.file_size:
- _fileSize = opts.file_size
-if opts.max_queue_size:
- _maxQueueSize = opts.max_queue_size
-if opts.max_queue_count:
- _maxQueueCount = opts.max_queue_count
-if opts.limit_policy:
- _limitPolicy = opts.limit_policy
-if opts.order:
- _order = opts.order
-if opts.sequence:
- _msgSequence = True
-if opts.ive:
- _ive = True
-if opts.generate_queue_events:
- _eventGeneration = opts.generate_queue_events
-if opts.force:
- _if_empty = False
- _if_unused = False
-if opts.force_if_not_empty:
- _if_empty = False
-if opts.force_if_not_used:
- _if_unused = False
-
-bm = BrokerManager ()
-
-try:
- bm.SetBroker(_host)
- if len(args) == 0:
- bm.Overview ()
- else:
- cmd = args[0]
- modifier = ""
- if len(args) > 1:
- modifier = args[1]
- if cmd == "exchanges":
- if _recursive:
- bm.ExchangeListRecurse (modifier)
- else:
- bm.ExchangeList (modifier)
- elif cmd == "queues":
- if _recursive:
- bm.QueueListRecurse (modifier)
- else:
- bm.QueueList (modifier)
- elif cmd == "add":
- if modifier == "exchange":
- bm.AddExchange (args[2:])
- elif modifier == "queue":
- bm.AddQueue (args[2:])
+ cmd = args[0]
+ modifier = ""
+ if len(args) > 1:
+ modifier = args[1]
+ if cmd == "exchanges":
+ if config._recursive:
+ bm.ExchangeListRecurse (modifier)
+ else:
+ bm.ExchangeList (modifier)
+ elif cmd == "queues":
+ if config._recursive:
+ bm.QueueListRecurse (modifier)
+ else:
+ bm.QueueList (modifier)
+ elif cmd == "add":
+ if modifier == "exchange":
+ bm.AddExchange (args[2:])
+ elif modifier == "queue":
+ bm.AddQueue (args[2:])
+ else:
+ Usage()
+ elif cmd == "del":
+ if modifier == "exchange":
+ bm.DelExchange (args[2:])
+ elif modifier == "queue":
+ bm.DelQueue (args[2:])
+ else:
+ Usage()
+ elif cmd == "bind":
+ bm.Bind (args[1:])
+ elif cmd == "unbind":
+ bm.Unbind (args[1:])
else:
- Usage(parser)
- elif cmd == "del":
- if modifier == "exchange":
- bm.DelExchange (args[2:])
- elif modifier == "queue":
- bm.DelQueue (args[2:])
- else:
- Usage(parser)
- elif cmd == "bind":
- bm.Bind (args[1:])
- elif cmd == "unbind":
- bm.Unbind (args[1:])
- else:
- Usage(parser)
-except KeyboardInterrupt:
- print
-except IOError, e:
- print e
- bm.Disconnect()
- sys.exit(1)
-except SystemExit, e:
- bm.Disconnect()
- sys.exit(1)
-except Exception,e:
- if e.__class__.__name__ != "Timeout":
- # ignore Timeout exception, handle in the loop below
- print "Failed: %s: %s" % (e.__class__.__name__, e)
+ Usage()
+ except KeyboardInterrupt:
+ print
+ except IOError, e:
+ print e
bm.Disconnect()
- sys.exit(1)
-
-while True:
- # some commands take longer than the default amqp timeout to complete,
- # so attempt to disconnect until successful, ignoring Timeouts
- try:
+ return 1
+ except SystemExit, e:
bm.Disconnect()
- break
- except Exception, e:
+ return 1
+ except Exception,e:
if e.__class__.__name__ != "Timeout":
+ # ignore Timeout exception, handle in the loop below
print "Failed: %s: %s" % (e.__class__.__name__, e)
- sys.exit(1)
+ bm.Disconnect()
+ return 1
+
+ while True:
+ # some commands take longer than the default amqp timeout to complete,
+ # so attempt to disconnect until successful, ignoring Timeouts
+ try:
+ bm.Disconnect()
+ break
+ except Exception, e:
+ if e.__class__.__name__ != "Timeout":
+ print "Failed: %s: %s" % (e.__class__.__name__, e)
+ return 1
+
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
+
diff --git a/tools/src/py/qpid-printevents b/tools/src/py/qpid-printevents
index e14d85e12a..5da74ca9ef 100755
--- a/tools/src/py/qpid-printevents
+++ b/tools/src/py/qpid-printevents
@@ -20,7 +20,8 @@
#
import os
-import optparse
+import optparse
+from optparse import IndentedHelpFormatter
import sys
import socket
from time import time, strftime, gmtime, sleep
@@ -39,20 +40,43 @@ class EventConsole(Console):
print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
sys.stdout.flush()
-##
-## Main Program
-##
-def main():
- _usage = "%prog [options] [broker-addr]..."
- _description = \
-"""Collect and print events from one or more Qpid message brokers. If no broker-addr is
-supplied, %prog will connect to 'localhost:5672'.
-broker-addr is of the form: [username/password@] hostname | ip-address [:<port>]
-ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost
+class JHelpFormatter(IndentedHelpFormatter):
+ """Format usage and description without stripping newlines from usage strings
+ """
+
+ def format_usage(self, usage):
+ return usage
+
+ def format_description(self, description):
+ if description:
+ return description + "\n"
+ else:
+ return ""
+
+_usage = "%prog [options] [broker-addr]..."
+
+_description = \
+"""
+Collect and print events from one or more Qpid message brokers.
+
+If no broker-addr is supplied, %prog connects to 'localhost:5672'.
+
+[broker-addr] syntax:
+
+ [username/password@] hostname
+ ip-address [:<port>]
+
+Examples:
+
+$ %prog localhost:5672
+$ %prog 10.1.1.7:10000
+$ %prog guest/guest@broker-host:10000
"""
- p = optparse.OptionParser(usage=_usage, description=_description)
- options, arguments = p.parse_args()
+def main(argv=None):
+ p = optparse.OptionParser(usage=_usage, description=_description, formatter=JHelpFormatter())
+
+ options, arguments = p.parse_args(args=argv)
if len(arguments) == 0:
arguments.append("localhost")
@@ -74,5 +98,4 @@ ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost
session.delBroker(b)
if __name__ == '__main__':
- main()
-
+ sys.exit(main())
diff --git a/tools/src/py/qpid-queue-stats b/tools/src/py/qpid-queue-stats
index 3b8a0dcb19..6c737a080e 100755
--- a/tools/src/py/qpid-queue-stats
+++ b/tools/src/py/qpid-queue-stats
@@ -124,12 +124,12 @@ class BrokerManager(Console):
##
## Main Program
##
-def main():
+def main(argv=None):
p = optparse.OptionParser()
p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost')
p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show')
- options, arguments = p.parse_args()
+ options, arguments = p.parse_args(args=argv)
host = options.broker_address
filter = []
@@ -142,5 +142,5 @@ def main():
bm.Display()
if __name__ == '__main__':
- main()
+ sys.exit(main())
diff --git a/tools/src/py/qpid-route b/tools/src/py/qpid-route
index db0fca679e..3674ed7913 100755
--- a/tools/src/py/qpid-route
+++ b/tools/src/py/qpid-route
@@ -26,14 +26,113 @@ import os
import locale
from qmf.console import Session, BrokerURL
-_verbose = False
-_quiet = False
-_durable = False
-_dellink = False
-_srclocal = False
-_transport = "tcp"
-_ack = 0
-_connTimeout = 10
+class Config:
+ def __init__(self):
+ self._verbose = False
+ self._quiet = False
+ self._durable = False
+ self._dellink = False
+ self._srclocal = False
+ self._transport = "tcp"
+ self._ack = 0
+ self._connTimeout = 10
+
+config = Config()
+
+class JHelpFormatter(IndentedHelpFormatter):
+ """Format usage and description without stripping newlines from usage strings
+ """
+
+ def format_usage(self, usage):
+ return usage
+
+
+ def format_description(self, description):
+ if description:
+ return description + "\n"
+ else:
+ return ""
+
+def usage(parser):
+ parser.print_help()
+ exit(-1)
+
+usage = """
+Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]
+ qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>
+
+ qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism]
+ qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>
+ qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>
+ qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>
+ qpid-route [OPTIONS] route list [<dest-broker>]
+ qpid-route [OPTIONS] route flush [<dest-broker>]
+ qpid-route [OPTIONS] route map [<broker>]
+
+ qpid-route [OPTIONS] link add <dest-broker> <src-broker>
+ qpid-route [OPTIONS] link del <dest-broker> <src-broker>
+ qpid-route [OPTIONS] link list [<dest-broker>]"""
+
+description = """
+ADDRESS syntax:
+
+ [username/password@] hostname
+ ip-address [:<port>]"""
+
+
+def OptionsAndArguments(argv):
+
+ parser = OptionParser(usage=usage,
+ description=description,
+ formatter=JHelpFormatter())
+
+ parser.add_option("--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
+ parser.add_option("-v", "--verbose", action="store_true", help="Verbose output")
+ parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings")
+ parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable")
+
+ parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link")
+ parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)")
+
+ parser.add_option("--ack", action="store", type="int", metavar="N", help="Acknowledge transfers over the bridge in batches of N")
+ parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
+
+ opts, encArgs = parser.parse_args(args=argv)
+
+ try:
+ encoding = locale.getpreferredencoding()
+ args = [a.decode(encoding) for a in encArgs]
+ except:
+ args = encArgs
+
+ if opts.timeout:
+ config._connTimeout = opts.timeout
+ if config._connTimeout == 0:
+ config._connTimeout = None
+
+ if opts.verbose:
+ config._verbose = True
+
+ if opts.quiet:
+ config._quiet = True
+
+ if opts.durable:
+ config._durable = True
+
+ if opts.del_empty_link:
+ config._dellink = True
+
+ if opts.src_local:
+ config._srclocal = true
+
+ if opts.transport:
+ config._transport = opts.transport
+
+ if opts.ack:
+ config._ack = opts.ack
+
+ return args
+
class RouteManager:
def __init__(self, localBroker):
@@ -41,7 +140,7 @@ class RouteManager:
self.local = BrokerURL(localBroker)
self.remote = None
self.qmf = Session()
- self.broker = self.qmf.addBroker(localBroker, _connTimeout)
+ self.broker = self.qmf.addBroker(localBroker, config._connTimeout)
self.broker._waitForStable()
self.agent = self.broker.getBrokerAgent()
@@ -73,10 +172,10 @@ class RouteManager:
broker = brokers[0]
link = self.getLink()
if link == None:
- res = broker.connect(self.remote.host, self.remote.port, _durable,
+ res = broker.connect(self.remote.host, self.remote.port, config._durable,
mech, self.remote.authName or "", self.remote.authPass or "",
- _transport)
- if _verbose:
+ config._transport)
+ if config._verbose:
print "Connect method returned:", res.status, res.text
def delLink(self, remoteBroker):
@@ -88,7 +187,7 @@ class RouteManager:
raise Exception("Link not found")
res = link.close()
- if _verbose:
+ if config._verbose:
print "Close method returned:", res.status, res.text
def listLinks(self):
@@ -119,7 +218,7 @@ class RouteManager:
if url.name() not in self.brokerList:
print " %s..." % url.name(),
try:
- b = self.qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout)
+ b = self.qmf.addBroker("%s:%d" % (link.host, link.port), config._connTimeout)
self.brokerList[url.name()] = b
added = True
print "Ok"
@@ -194,7 +293,7 @@ class RouteManager:
self.qmf.delBroker(b[1])
def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, mech="PLAIN", dynamic=False):
- if dynamic and _srclocal:
+ if dynamic and config._srclocal:
raise Exception("--src-local is not permitted on dynamic routes")
self.addLink(remoteBroker, mech)
@@ -206,16 +305,16 @@ class RouteManager:
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
- if not _quiet:
+ if not config._quiet:
raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
sys.exit(0)
- if _verbose:
+ if config._verbose:
print "Creating inter-broker binding..."
- res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack)
+ res = link.bridge(config._durable, exchange, exchange, routingKey, tag, excludes, False, config._srclocal, dynamic, config._ack)
if res.status != 0:
raise Exception(res.text)
- if _verbose:
+ if config._verbose:
print "Bridge method returned:", res.status, res.text
def addQueueRoute(self, remoteBroker, exchange, queue):
@@ -228,23 +327,23 @@ class RouteManager:
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
- if not _quiet:
+ if not config._quiet:
raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
sys.exit(0)
- if _verbose:
+ if config._verbose:
print "Creating inter-broker binding..."
- res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack)
+ res = link.bridge(config._durable, queue, exchange, "", "", "", True, config._srclocal, False, config._ack)
if res.status != 0:
raise Exception(res.text)
- if _verbose:
+ if config._verbose:
print "Bridge method returned:", res.status, res.text
def delQueueRoute(self, remoteBroker, exchange, queue):
self.remote = BrokerURL(remoteBroker)
link = self.getLink()
if link == None:
- if not _quiet:
+ if not config._quiet:
raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
sys.exit(0)
@@ -252,29 +351,29 @@ class RouteManager:
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
- if _verbose:
+ if config._verbose:
print "Closing bridge..."
res = bridge.close()
if res.status != 0:
raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
- if len(bridges) == 1 and _dellink:
+ if len(bridges) == 1 and config._dellink:
link = self.getLink()
if link == None:
sys.exit(0)
- if _verbose:
+ if config._verbose:
print "Last bridge on link, closing link..."
res = link.close()
if res.status != 0:
raise Exception("Error closing link: %d - %s" % (res.status, res.text))
sys.exit(0)
- if not _quiet:
+ if not config._quiet:
raise Exception("Route not found")
def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
self.remote = BrokerURL(remoteBroker)
link = self.getLink()
if link == None:
- if not _quiet:
+ if not config._quiet:
raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
sys.exit(0)
@@ -282,22 +381,22 @@ class RouteManager:
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
and bridge.dynamic == dynamic:
- if _verbose:
+ if config._verbose:
print "Closing bridge..."
res = bridge.close()
if res.status != 0:
raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
- if len(bridges) == 1 and _dellink:
+ if len(bridges) == 1 and config._dellink:
link = self.getLink()
if link == None:
sys.exit(0)
- if _verbose:
+ if config._verbose:
print "Last bridge on link, closing link..."
res = link.close()
if res.status != 0:
raise Exception("Error closing link: %d - %s" % (res.status, res.text))
return
- if not _quiet:
+ if not config._quiet:
raise Exception("Route not found")
def listRoutes(self):
@@ -322,7 +421,7 @@ class RouteManager:
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
- if _verbose:
+ if config._verbose:
myLink = None
for link in links:
if bridge.linkRef == link.getObjectId():
@@ -333,18 +432,18 @@ class RouteManager:
res = bridge.close()
if res.status != 0:
print "Error: %d - %s" % (res.status, res.text)
- elif _verbose:
+ elif config._verbose:
print "Ok"
- if _dellink:
+ if config._dellink:
links = self.qmf.getObjects(_class="link")
for link in links:
- if _verbose:
+ if config._verbose:
print "Deleting Link: %s:%d... " % (link.host, link.port),
res = link.close()
if res.status != 0:
print "Error: %d - %s" % (res.status, res.text)
- elif _verbose:
+ elif config._verbose:
print "Ok"
class RoutePair:
@@ -374,190 +473,104 @@ def YN(val):
return 'Y'
return 'N'
-##
-## Main Program
-##
-
-
-class JHelpFormatter(IndentedHelpFormatter):
- """Format usage and description without stripping newlines from usage strings
- """
-
- def format_usage(self, usage):
- return usage
-
-
- def format_description(self, description):
- if description:
- return description + "\n"
- else:
- return ""
-
-def usage(parser):
- parser.print_help()
- exit(-1)
-
-usage = """
-Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]
- qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>
-
- qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism]
- qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>
- qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>
- qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>
- qpid-route [OPTIONS] route list [<dest-broker>]
- qpid-route [OPTIONS] route flush [<dest-broker>]
- qpid-route [OPTIONS] route map [<broker>]
-
- qpid-route [OPTIONS] link add <dest-broker> <src-broker>
- qpid-route [OPTIONS] link del <dest-broker> <src-broker>
- qpid-route [OPTIONS] link list [<dest-broker>]"""
-
-description = """
-ADDRESS syntax:
-
- [username/password@] hostname
- ip-address [:<port>]"""
-
-parser = OptionParser(usage=usage,
- description=description,
- formatter=JHelpFormatter())
-
-parser.add_option("--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
-parser.add_option("-v", "--verbose", action="store_true", help="Verbose output")
-parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings")
-parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable")
-
-parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link")
-parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)")
-
-parser.add_option("--ack", action="store", type="int", metavar="N", help="Acknowledge transfers over the bridge in batches of N")
-parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
-
-opts, encArgs = parser.parse_args()
-
-try:
- encoding = locale.getpreferredencoding()
- args = [a.decode(encoding) for a in encArgs]
-except:
- args = encArgs
-
-if opts.timeout:
- _connTimeout = opts.timeout
- if _connTimeout == 0:
- _connTimeout = None
-
-if opts.verbose:
- _verbose = True
-
-if opts.quiet:
- _quiet = True
-
-if opts.durable:
- _durable = True
-
-if opts.del_empty_link:
- _dellink = True
-if opts.src_local:
- _srclocal = true
+def main(argv=None):
-if opts.transport:
- _transport = opts.transport
+ args = OptionsAndArguments(argv)
+ nargs = len(args)
+ if nargs < 2:
+ usage(parser)
-if opts.ack:
- _ack = opts.ack
-
-nargs = len(args)
-if nargs < 2:
- usage(parser)
-
-if nargs == 2:
- localBroker = socket.gethostname()
-else:
- if _srclocal:
- localBroker = args[3]
- remoteBroker = args[2]
+ if nargs == 2:
+ localBroker = socket.gethostname()
else:
- localBroker = args[2]
- if nargs > 3:
- remoteBroker = args[3]
-
-group = args[0]
-cmd = args[1]
-
-rm = None
-try:
- rm = RouteManager(localBroker)
- if group == "link":
- if cmd == "add":
- if nargs != 4:
- usage(parser)
- rm.addLink(remoteBroker)
- elif cmd == "del":
- if nargs != 4:
- usage(parser)
- rm.delLink(remoteBroker)
- elif cmd == "list":
- rm.listLinks()
-
- elif group == "dynamic":
- if cmd == "add":
- if nargs < 5 or nargs > 7:
- usage(parser)
-
- tag = ""
- excludes = ""
- mech = "PLAIN"
- if nargs > 5: tag = args[5]
- if nargs > 6: excludes = args[6]
- rm.addRoute(remoteBroker, args[4], "", tag, excludes, mech, dynamic=True)
- elif cmd == "del":
- if nargs != 5:
- usage(parser)
+ if config._srclocal:
+ localBroker = args[3]
+ remoteBroker = args[2]
+ else:
+ localBroker = args[2]
+ if nargs > 3:
+ remoteBroker = args[3]
+
+ group = args[0]
+ cmd = args[1]
+
+ rm = None
+ try:
+ rm = RouteManager(localBroker)
+ if group == "link":
+ if cmd == "add":
+ if nargs != 4:
+ usage(parser)
+ rm.addLink(remoteBroker)
+ elif cmd == "del":
+ if nargs != 4:
+ usage(parser)
+ rm.delLink(remoteBroker)
+ elif cmd == "list":
+ rm.listLinks()
+
+ elif group == "dynamic":
+ if cmd == "add":
+ if nargs < 5 or nargs > 7:
+ usage(parser)
+
+ tag = ""
+ excludes = ""
+ mech = "PLAIN"
+ if nargs > 5: tag = args[5]
+ if nargs > 6: excludes = args[6]
+ rm.addRoute(remoteBroker, args[4], "", tag, excludes, mech, dynamic=True)
+ elif cmd == "del":
+ if nargs != 5:
+ usage(parser)
+ else:
+ rm.delRoute(remoteBroker, args[4], "", dynamic=True)
+
+ elif group == "route":
+ if cmd == "add":
+ if nargs < 6 or nargs > 9:
+ usage(parser)
+
+ tag = ""
+ excludes = ""
+ mech = "PLAIN"
+ if nargs > 6: tag = args[6]
+ if nargs > 7: excludes = args[7]
+ if nargs > 8: mech = args[8]
+ rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, mech, dynamic=False)
+ elif cmd == "del":
+ if nargs != 6:
+ usage(parser)
+ rm.delRoute(remoteBroker, args[4], args[5], dynamic=False)
+ elif cmd == "map":
+ rm.mapRoutes()
else:
- rm.delRoute(remoteBroker, args[4], "", dynamic=True)
-
- elif group == "route":
- if cmd == "add":
- if nargs < 6 or nargs > 9:
- usage(parser)
+ if cmd == "list":
+ rm.listRoutes()
+ elif cmd == "flush":
+ rm.clearAllRoutes()
+ else:
+ usage(parser)
- tag = ""
- excludes = ""
- mech = "PLAIN"
- if nargs > 6: tag = args[6]
- if nargs > 7: excludes = args[7]
- if nargs > 8: mech = args[8]
- rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, mech, dynamic=False)
- elif cmd == "del":
+ elif group == "queue":
if nargs != 6:
usage(parser)
- rm.delRoute(remoteBroker, args[4], args[5], dynamic=False)
- elif cmd == "map":
- rm.mapRoutes()
- else:
- if cmd == "list":
- rm.listRoutes()
- elif cmd == "flush":
- rm.clearAllRoutes()
+ if cmd == "add":
+ rm.addQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
+ elif cmd == "del":
+ rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
else:
usage(parser)
- elif group == "queue":
- if nargs != 6:
- usage(parser)
- if cmd == "add":
- rm.addQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
- elif cmd == "del":
- rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
- else:
- usage(parser)
+ except Exception,e:
+ if rm:
+ rm.disconnect() # try to release broker resources
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ return 1
-except Exception,e:
- if rm:
- rm.disconnect() # try to release broker resources
- print "Failed: %s - %s" % (e.__class__.__name__, e)
- sys.exit(1)
+ rm.disconnect()
+ return 0
-rm.disconnect()
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat
index 9229fbec29..150c918f6d 100755
--- a/tools/src/py/qpid-stat
+++ b/tools/src/py/qpid-stat
@@ -21,6 +21,7 @@
import os
from optparse import OptionParser, OptionGroup
+from time import sleep ### debug
import sys
import locale
import socket
@@ -28,13 +29,68 @@ import re
from qmf.console import Session, Console
from qpid.disp import Display, Header, Sorter
-_host = "localhost"
-_connTimeout = 10
-_types = ""
-_limit = 50
-_increasing = False
-_sortcol = None
-_cluster_detail = False
+class Config:
+ def __init__(self):
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._types = ""
+ self._limit = 50
+ self._increasing = False
+ self._sortcol = None
+ self._cluster_detail = False
+
+config = Config()
+
+def OptionsAndArguments(argv):
+ """ Set global variables for options, return arguments """
+
+ global config
+
+ parser = OptionParser(usage="usage: %prog [options] BROKER",
+ description="Example: $ qpid-stat -q broker-host:10000")
+
+ group1 = OptionGroup(parser, "General Options")
+ group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
+
+ parser.add_option_group(group1)
+
+ group2 = OptionGroup(parser, "Display Options")
+ group2.add_option("-b", "--broker", help="Show Brokers",
+ action="store_const", const="b", dest="show")
+ group2.add_option("-c", "--connections", help="Show Connections",
+ action="store_const", const="c", dest="show")
+ group2.add_option("-e", "--exchanges", help="Show Exchanges",
+ action="store_const", const="e", dest="show")
+ group2.add_option("-q", "--queues", help="Show Queues",
+ action="store_const", const="q", dest="show")
+ group2.add_option("-u", "--subscriptions", help="Show Subscriptions",
+ action="store_const", const="u", dest="show")
+ group2.add_option("-S", "--sort-by", metavar="COLNAME",
+ help="Sort by column name")
+ group2.add_option("-I", "--increasing", action="store_true", default=False,
+ help="Sort by increasing value (default = decreasing)")
+ group2.add_option("-L", "--limit", default=50, metavar="NUM",
+ help="Limit output to NUM rows")
+ group2.add_option("-C", "--cluster", action="store_true", default=False,
+ help="Display per-broker cluster detail.")
+ parser.add_option_group(group2)
+
+ opts, args = parser.parse_args(args=argv)
+
+ if not opts.show:
+ parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help")
+
+ config._types = opts.show
+ config._sortcol = opts.sort_by
+ config._connTimeout = opts.timeout
+ config._increasing = opts.increasing
+ config._limit = opts.limit
+ config._cluster_detail = opts.cluster
+
+ if args:
+ config._host = args[0]
+
+ return args
class IpAddr:
def __init__(self, text):
@@ -125,7 +181,7 @@ class BrokerManager(Console):
def SetBroker(self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
+ self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == '0':
@@ -157,7 +213,7 @@ class BrokerManager(Console):
def _getHostList(self, urlList):
hosts = []
- hostAddr = IpAddr(_host)
+ hostAddr = IpAddr(config._host)
for url in urlList:
if url.find("amqp:") != 0:
raise Exception("Invalid URL 1")
@@ -223,8 +279,8 @@ class BrokerManager(Console):
len(broker.exchanges), len(broker.queues))
rows.append(row)
title = "Brokers"
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
@@ -263,8 +319,8 @@ class BrokerManager(Console):
title = "Connections"
if self.cluster:
title += " for cluster '%s'" % self.cluster.clusterName
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
@@ -309,8 +365,8 @@ class BrokerManager(Console):
title = "Exchanges"
if self.cluster:
title += " for cluster '%s'" % self.cluster.clusterName
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
@@ -356,8 +412,8 @@ class BrokerManager(Console):
title = "Queues"
if self.cluster:
title += " for cluster '%s'" % self.cluster.clusterName
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
@@ -403,8 +459,8 @@ class BrokerManager(Console):
title = "Subscriptions"
if self.cluster:
title += " for cluster '%s'" % self.cluster.clusterName
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
@@ -419,7 +475,7 @@ class BrokerManager(Console):
elif main == 'u': self.displaySubscriptions(subs)
def display(self):
- if _cluster_detail or _types[0] == 'b':
+ if config._cluster_detail or config._types[0] == 'b':
# always show cluster detail when dumping broker stats
self._getCluster()
if self.cluster:
@@ -427,77 +483,36 @@ class BrokerManager(Console):
hostList = self._getHostList(memberList)
self.qmf.delBroker(self.broker)
self.broker = None
- if _host.find("@") > 0:
- authString = _host.split("@")[0] + "@"
+ if config._host.find("@") > 0:
+ authString = config._host.split("@")[0] + "@"
else:
authString = ""
for host in hostList:
- b = self.qmf.addBroker(authString + host, _connTimeout)
+ b = self.qmf.addBroker(authString + host, config._connTimeout)
self.brokers.append(Broker(self.qmf, b))
else:
self.brokers.append(Broker(self.qmf, self.broker))
- self.displayMain(_types[0], _types[1:])
-
-
-##
-## Main Program
-##
-
-parser = OptionParser(usage="usage: %prog [options] BROKER",
- description="Example: $ qpid-stat -q broker-host:10000")
-
-group1 = OptionGroup(parser, "General Options")
-group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
-parser.add_option_group(group1)
-
-group2 = OptionGroup(parser, "Display Options")
-group2.add_option("-b", "--broker", help="Show Brokers",
- action="store_const", const="b", dest="show")
-group2.add_option("-c", "--connections", help="Show Connections",
- action="store_const", const="c", dest="show")
-group2.add_option("-e", "--exchanges", help="Show Exchanges",
- action="store_const", const="e", dest="show")
-group2.add_option("-q", "--queues", help="Show Queues",
- action="store_const", const="q", dest="show")
-group2.add_option("-u", "--subscriptions", help="Show Subscriptions",
- action="store_const", const="u", dest="show")
-group2.add_option("-S", "--sort-by", metavar="COLNAME",
- help="Sort by column name")
-group2.add_option("-I", "--increasing", action="store_true", default=False,
- help="Sort by increasing value (default = decreasing)")
-group2.add_option("-L", "--limit", default=50, metavar="NUM",
- help="Limit output to NUM rows")
-group2.add_option("-C", "--cluster", action="store_true", default=False,
- help="Display per-broker cluster detail.")
-parser.add_option_group(group2)
-
-opts, args = parser.parse_args()
-
-if not opts.show:
- parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help")
-
-_types = opts.show
-_sortcol = opts.sort_by
-_connTimeout = opts.timeout
-_increasing = opts.increasing
-_limit = opts.limit
-_cluster_detail = opts.cluster
-
-if args:
- _host = args[0]
-
-bm = BrokerManager()
-
-try:
- bm.SetBroker(_host)
- bm.display()
-except KeyboardInterrupt:
- print
-except Exception,e:
- print "Failed: %s - %s" % (e.__class__.__name__, e)
+ self.displayMain(config._types[0], config._types[1:])
+
+
+def main(argv=None):
+
+ args = OptionsAndArguments(argv)
+ bm = BrokerManager()
+
+ try:
+ bm.SetBroker(config._host)
+ bm.display()
+ bm.Disconnect()
+ return 0
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
bm.Disconnect() # try to deallocate brokers
- raise # FIXME aconway 2010-03-03:
- sys.exit(1)
+ return 1
-bm.Disconnect()
+if __name__ == "__main__":
+ sys.exit(main())