#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# NOTE: print calls take a single parenthesised argument and exception
# handlers fetch the exception through sys.exc_info(), so this file parses
# under both Python 2 and Python 3 while producing the same Python 2 output.

import os
import getopt
import sys
import socket
import qpid
from threading import Condition
from qpid.management import managementClient
from qpid.peer import Closed
from qpid.client import Client
from time import sleep

# Default settings; the option parsing in the main program may override them.
_defspecpath = "/usr/share/amqp/amqp.0-10-preview.xml"
_specpath = _defspecpath
_recursive = False
_host = "localhost"
_durable = False
_fileCount = 8
_fileSize = 24

# Keys of the queue-declare arguments that carry the journal geometry.
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"


def Usage():
    """Print the command-line help text and exit with status 1."""
    lines = (
        "Usage: qpid-config [OPTIONS]",
        " qpid-config [OPTIONS] exchanges [filter-string]",
        " qpid-config [OPTIONS] queues [filter-string]",
        " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]",
        " qpid-config [OPTIONS] del exchange <name>",
        " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]",
        " qpid-config [OPTIONS] del queue <name>",
        " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]",
        " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]",
        "",
        "Options:",
        " -b [ --bindings ] Show bindings in queue or exchange list",
        " -a [ --broker-addr ] Address (localhost) Address of qpidd broker",
        " broker-addr is in the form: hostname | ip-address [:<port>]",
        " ex: localhost, 10.1.1.7:10000, broker-host:10000",
        " -s [ --spec-file] Path (" + _defspecpath + ")",
        " AMQP specification file",
        "",
        "Add Queue Options:",
        " --durable Queue is durable",
        " --file-count N (8) Number of files in queue's persistence journal",
        " --file-size N (24) File size in pages (64Kib/page)",
        "",
        "Add Exchange Options:",
        " --durable Exchange is durable",
        "",
    )
    for line in lines:
        print(line)
    sys.exit(1)


class Broker:
    """Broker address parsed from a "host" or "host:port" string.

    Attributes:
        host: result of socket.gethostbyname() on the host part.
        port: integer port; 5672 when the text contains no colon.
    """

    def __init__(self, text):
        """Parse *text* and resolve the host part.

        Raises ValueError when the port part is not an integer and
        socket.error when the host cannot be resolved.
        """
        colon = text.find(":")
        if colon == -1:
            host = text
            self.port = 5672
        else:
            host = text[:colon]
            self.port = int(text[colon + 1:])
        self.host = socket.gethostbyname(host)

    def name(self):
        """Return the address formatted as "host:port"."""
        return self.host + ":" + str(self.port)


class BrokerManager:
    """Runs the qpid-config commands against a single broker.

    Every public command connects to the broker first (ConnectToBroker)
    and then either prints a report or issues one AMQP operation.
    """

    def __init__(self):
        self.dest = None
        self.src = None
        self.broker = None

    def SetBroker(self, broker):
        """Remember the Broker instance that later commands connect to."""
        self.broker = broker

    def ConnectToBroker(self):
        """Open the AMQP client, channel 1 and the management channel.

        Loads the spec file named by the module-level _specpath and logs in
        as guest/guest.  On a socket error the error is printed and the
        process exits with status 1.
        """
        try:
            self.spec = qpid.spec.load(_specpath)
            self.client = Client(self.broker.host, self.broker.port, self.spec)
            self.client.start({"LOGIN": "guest", "PASSWORD": "guest"})
            self.channel = self.client.channel(1)
            self.mclient = managementClient(self.spec)
            self.mchannel = self.mclient.addChannel(self.channel)
        except socket.error:
            print("Connect Error: %s" % (sys.exc_info()[1],))
            # sys.exit rather than the bare exit(), which only exists when
            # the site module has been loaded.
            sys.exit(1)

    def _connectStable(self):
        """Connect, call syncWaitForStable, and return (mclient, mchannel)."""
        self.ConnectToBroker()
        mc = self.mclient
        mch = self.mchannel
        mc.syncWaitForStable(mch)
        return mc, mch

    def Overview(self):
        """Print exchange counts per type and durable/non-durable queue counts."""
        mc, mch = self._connectStable()
        exchanges = mc.syncGetObjects(mch, "exchange")
        queues = mc.syncGetObjects(mch, "queue")

        print("Total Exchanges: %d" % len(exchanges))
        etype = {}
        for ex in exchanges:
            etype[ex.type] = etype.get(ex.type, 0) + 1
        for typ in etype:
            print("%15s: %d" % (typ, etype[typ]))

        print("")
        print(" Total Queues: %d" % len(queues))
        # Local counter; deliberately not named _durable so it cannot be
        # confused with the module-level --durable flag.
        durableCount = 0
        for queue in queues:
            if queue.durable:
                durableCount = durableCount + 1
        print(" durable: %d" % durableCount)
        print(" non-durable: %d" % (len(queues) - durableCount))

    def ExchangeList(self, filter):
        """Print type, binding count and name of each exchange matching *filter*."""
        mc, mch = self._connectStable()
        exchanges = mc.syncGetObjects(mch, "exchange")
        print("Type Bindings Exchange Name")
        print("=============================================")
        for ex in exchanges:
            if self.match(ex.name, filter):
                print("%-10s%5d %s" % (ex.type, ex.bindings, ex.name))

    def ExchangeListRecurse(self, filter):
        """Print each exchange matching *filter* followed by its bindings."""
        mc, mch = self._connectStable()
        exchanges = mc.syncGetObjects(mch, "exchange")
        bindings = mc.syncGetObjects(mch, "binding")
        queues = mc.syncGetObjects(mch, "queue")
        for ex in exchanges:
            if not self.match(ex.name, filter):
                continue
            print("Exchange '%s' (%s)" % (ex.name, ex.type))
            for bind in bindings:
                if bind.exchangeRef != ex.id:
                    continue
                # Shown when the binding refers to a queue that is not in
                # the queue list, so the line never ends in a blank name.
                qname = "<unknown>"
                queue = self.findById(queues, bind.queueRef)
                if queue is not None:
                    qname = queue.name
                print(" bind [%s] => %s" % (bind.bindingKey, qname))

    def QueueList(self, filter):
        """Print one line per queue whose name matches *filter*.

        Durable queues that carry both journal arguments are printed with
        their store size (file count x file size).  All other queues are
        printed without it, and only when --durable was not given.
        """
        mc, mch = self._connectStable()
        queues = mc.syncGetObjects(mch, "queue")
        # NOTE(review): the journal objects are fetched but never used below;
        # the call is kept so the exchange with the broker is unchanged.
        journals = mc.syncGetObjects(mch, "journal")
        print(" Store Size")
        print("Durable AutoDel Excl Bindings (files x file pages) Queue Name")
        print("===========================================================================================")
        for q in queues:
            if not self.match(q.name, filter):
                continue
            args = q.arguments
            if q.durable and FILESIZE in args and FILECOUNT in args:
                fs = int(args[FILESIZE])
                fc = int(args[FILECOUNT])
                print("%4c%9c%7c%10d%11dx%-14d%s" %
                      (YN(q.durable), YN(q.autoDelete), YN(q.exclusive),
                       q.bindings, fc, fs, q.name))
            elif not _durable:
                print("%4c%9c%7c%10d %s" %
                      (YN(q.durable), YN(q.autoDelete), YN(q.exclusive),
                       q.bindings, q.name))

    def QueueListRecurse(self, filter):
        """Print each queue matching *filter* followed by its bindings."""
        mc, mch = self._connectStable()
        exchanges = mc.syncGetObjects(mch, "exchange")
        bindings = mc.syncGetObjects(mch, "binding")
        queues = mc.syncGetObjects(mch, "queue")
        for queue in queues:
            if not self.match(queue.name, filter):
                continue
            print("Queue '%s'" % queue.name)
            for bind in bindings:
                if bind.queueRef != queue.id:
                    continue
                # Shown when the binding refers to an exchange that is not
                # in the exchange list.
                ename = "<unknown>"
                ex = self.findById(exchanges, bind.exchangeRef)
                if ex is not None:
                    ename = ex.name
                    if ename == "":
                        # Make an exchange with an empty name visible.
                        ename = "''"
                print(" bind [%s] => %s" % (bind.bindingKey, ename))

    def AddExchange(self, args):
        """Declare an exchange; *args* is [type, name].

        The exchange is durable when --durable was given.
        """
        if len(args) < 2:
            Usage()
        self.ConnectToBroker()
        etype = args[0]
        ename = args[1]
        try:
            self.channel.exchange_declare(exchange=ename, type=etype,
                                          durable=_durable)
        except Closed:
            print("Failed: %s" % (sys.exc_info()[1],))

    def DelExchange(self, args):
        """Delete the exchange named by args[0]."""
        if len(args) < 1:
            Usage()
        self.ConnectToBroker()
        ename = args[0]
        try:
            self.channel.exchange_delete(exchange=ename)
        except Closed:
            print("Failed: %s" % (sys.exc_info()[1],))

    def AddQueue(self, args):
        """Declare the queue named by args[0].

        When --durable was given, the journal file count and file size are
        passed as declare arguments.
        """
        if len(args) < 1:
            Usage()
        self.ConnectToBroker()
        qname = args[0]
        declArgs = {}
        if _durable:
            declArgs[FILECOUNT] = _fileCount
            declArgs[FILESIZE] = _fileSize
        try:
            self.channel.queue_declare(queue=qname, durable=_durable,
                                       arguments=declArgs)
        except Closed:
            print("Failed: %s" % (sys.exc_info()[1],))

    def DelQueue(self, args):
        """Delete the queue named by args[0]."""
        if len(args) < 1:
            Usage()
        self.ConnectToBroker()
        qname = args[0]
        try:
            self.channel.queue_delete(queue=qname)
        except Closed:
            print("Failed: %s" % (sys.exc_info()[1],))

    def Bind(self, args):
        """Bind a queue to an exchange; *args* is [exchange, queue, [key]].

        The binding key defaults to the empty string.
        """
        if len(args) < 2:
            Usage()
        self.ConnectToBroker()
        ename = args[0]
        qname = args[1]
        key = ""
        if len(args) > 2:
            key = args[2]
        try:
            self.channel.queue_bind(queue=qname, exchange=ename,
                                    routing_key=key)
        except Closed:
            print("Failed: %s" % (sys.exc_info()[1],))

    def Unbind(self, args):
        """Remove a binding; *args* is [exchange, queue, [key]].

        The binding key defaults to the empty string.
        """
        if len(args) < 2:
            Usage()
        self.ConnectToBroker()
        ename = args[0]
        qname = args[1]
        key = ""
        if len(args) > 2:
            key = args[2]
        try:
            self.channel.queue_unbind(queue=qname, exchange=ename,
                                      routing_key=key)
        except Closed:
            print("Failed: %s" % (sys.exc_info()[1],))

    def findById(self, items, id):
        """Return the first element of *items* whose .id equals *id*, else None."""
        for item in items:
            if item.id == id:
                return item
        return None

    def match(self, name, filter):
        """Return True when *filter* is empty or is a substring of *name*."""
        if filter == "":
            return True
        return name.find(filter) != -1


def YN(bool):
    """Return 'Y' for a true value and 'N' otherwise."""
    if bool:
        return 'Y'
    return 'N'


##
## Main Program
##

try:
    longOpts = ("durable", "spec-file=", "bindings", "broker-addr=",
                "file-count=", "file-size=")
    (optlist, cargs) = getopt.gnu_getopt(sys.argv[1:], "s:a:b", longOpts)
except getopt.GetoptError:
    # Only option-parsing errors lead to the usage text; a bare except would
    # also swallow KeyboardInterrupt and SystemExit.
    Usage()

try:
    for opt in optlist:
        if opt[0] == "-s" or opt[0] == "--spec-file":
            _specpath = opt[1]
        if opt[0] == "-b" or opt[0] == "--bindings":
            _recursive = True
        if opt[0] == "-a" or opt[0] == "--broker-addr":
            _host = opt[1]
        if opt[0] == "--durable":
            _durable = True
        if opt[0] == "--file-count":
            _fileCount = int(opt[1])
        if opt[0] == "--file-size":
            _fileSize = int(opt[1])
except ValueError:
    # Non-numeric --file-count / --file-size value.
    Usage()

nargs = len(cargs)
bm = BrokerManager()

try:
    bm.SetBroker(Broker(_host))
except (ValueError, socket.error):
    # Bad port number or unresolvable host in --broker-addr.
    print("Invalid broker address '%s': %s" % (_host, sys.exc_info()[1]))
    sys.exit(1)

if nargs == 0:
    bm.Overview()
else:
    cmd = cargs[0]
    modifier = ""
    if nargs > 1:
        modifier = cargs[1]
    # The list commands are recognised by their first letter only;
    # startswith() also copes with an empty command string.
    if cmd.startswith('e'):
        if _recursive:
            bm.ExchangeListRecurse(modifier)
        else:
            bm.ExchangeList(modifier)
    elif cmd.startswith('q'):
        if _recursive:
            bm.QueueListRecurse(modifier)
        else:
            bm.QueueList(modifier)
    elif cmd == "add":
        if modifier == "exchange":
            bm.AddExchange(cargs[2:])
        elif modifier == "queue":
            bm.AddQueue(cargs[2:])
        else:
            Usage()
    elif cmd == "del":
        if modifier == "exchange":
            bm.DelExchange(cargs[2:])
        elif modifier == "queue":
            bm.DelQueue(cargs[2:])
        else:
            Usage()
    elif cmd == "bind":
        bm.Bind(cargs[1:])
    elif cmd == "unbind":
        bm.Unbind(cargs[1:])
    else:
        Usage()