#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import optparse, sys, time, os, re, math from qpid.messaging import Connection from qpid.messaging import Message as QpidMessage from qpid.util import URL from qpidtoollibs.broker import BrokerAgent from qpidtoollibs.config import parse_qpidd_conf try: from uuid import uuid4 except ImportError: from qpid.datatypes import uuid4 # QMF address for the HA broker object. HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" # Define these defaults here rather than in add_option because we want # to use qpidd.conf for defaults if --config is specified and # these defaults otherwise: DEFAULTS = { "broker":"0.0.0.0", "timeout":10.0} class ExitStatus(Exception): """Raised if a command want's a non-0 exit status from the script""" def __init__(self, status): self.status = status def find_qpidd_conf(): """Return the path to the local qpid.conf file or None if it is not found""" p = os.path prefix, bin = p.split(p.dirname(__file__)) if bin == "bin": # Installed in a standard place. conf = p.join(prefix, "etc", "qpid", "qpidd.conf") if p.isfile(conf): return conf return None class Command(object): """ Common options and logic for all commands. Subclasses provide additional options and execution logic. """ commands = [] def __init__(self, name, help, arg_names=[], connect_agent=True): """@param connect_agent true if we should establish a QMF agent connection""" Command.commands.append(self) self.name = name self.connect_agent = connect_agent self.arg_names = arg_names usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help) self.help = help self.op=optparse.OptionParser(usage) common = optparse.OptionGroup(self.op, "Broker connection options") def help_default(what): return " (Default %s)"%DEFAULTS[what] common.add_option("-b", "--broker", metavar="
", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:]"+help_default("broker")) common.add_option("--timeout", type="float", metavar="", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout")) common.add_option("--sasl-mechanism", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override") common.add_option("--ssl-certificate", metavar="", help="Client SSL certificate (PEM Format)") common.add_option("--ssl-key", metavar="", help="Client SSL private key (PEM Format)") common.add_option("--config", metavar="", help="Read default connection configuration from the qpidd.conf broker configuration file. Defaults are overridden by command-line options.)") self.op.add_option_group(common) def connect(self, opts): conn_options = {} if not opts.broker: opts.broker = DEFAULTS["broker"] # If we are connecting locally, use local qpidd.conf by default if not opts.config: opts.config = find_qpidd_conf() url = URL(opts.broker) if opts.config: # Use broker config file for defaults config = parse_qpidd_conf(opts.config) if not url.user: url.user = config.get("ha-username") if not url.password: url.password = config.get("ha-password") if not url.port: url.port = config.get("port") opts.broker = str(url) if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism") if not opts.timeout: timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval") if timeout: opts.timeout = float(timeout) else: # Use DEFAULTS if not opts.timeout: opts.timeout = DEFAULTS["timeout"] if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = opts.sasl_mechanism if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate if opts.ssl_key: if not opts.ssl_certificate: self.op.error("missing '--ssl-certificate' (required by '--ssl-key')") conn_options['ssl_keyfile'] = opts.ssl_key conn_options['client_properties'] = {'qpid.ha-admin' : 1} if opts.timeout: conn_options['timeout'] = opts.timeout conn_options['heartbeat'] = int(math.ceil(opts.timeout/2)) connection = Connection.establish(opts.broker, **conn_options) qmf_broker = self.connect_agent and BrokerAgent(connection) ha_broker = self.connect_agent and qmf_broker.getHaBroker() return (connection, qmf_broker, ha_broker) def all_brokers(self, ha_broker, opts, func): """@return: List of (broker_addr, ha_broker) for all brokers in the cluster. Returns (broker_addr, Exception) if an exception is raised accessing a broker. """ # The brokersUrl setting is not in python URL format, simpler parsing here. result = [] brokers = filter(None, re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl).split(",")) if brokers and opts.all: if "@" in opts.broker: userpass = opts.broker.split("@")[0] else: userpass = None for b in brokers: if userpass and not "@" in b: opts.broker = userpass+"@"+b else: opts.broker = b try: connection, qmf_broker, ha_broker = self.connect(opts) func(ha_broker, b) except Exception,e: func(ha_broker, b, e) else: func(ha_broker) def execute(self, args): opts, args = self.op.parse_args(args) if len(args) != len(self.arg_names)+1: self.op.print_help() raise Exception("Wrong number of arguments") self.connection, qmf_broker, ha_broker = self.connect(opts) if self.connect_agent and not ha_broker: raise Exception("HA module is not loaded on broker at %s" % opts.broker) try: self.do_execute(qmf_broker, ha_broker, opts, args) finally: self.connection.close() def do_execute(self, qmf_broker, opts, args): raise Exception("Command '%s' is not yet implemented"%self.name) class ManagerCommand(Command): """ Base for commands that should only be used by a cluster manager tool that ensures cluster consistency. """ manager_commands = [] # Cluster manager commands def __init__(self, name, help, arg_names=[], connect_agent=True): """@param connect_agent true if we should establish a QMF agent connection""" super(ManagerCommand, self).__init__(name, "[Cluster manager only] "+help, arg_names, connect_agent) self.commands.remove(self) # Not a user command self.manager_commands.append(self) class PingCmd(Command): def __init__(self): Command.__init__(self, "ping","Check if the broker is alive and responding", connect_agent=False) def do_execute(self, qmf_broker, ha_broker, opts, args): self.connection.session() # Make sure we can establish a session. PingCmd() class PromoteCmd(ManagerCommand): def __init__(self): super(PromoteCmd, self).__init__("promote", "Promote a backup broker to primary. This command should *only* be used by a cluster manager (such as rgmanager) that ensures only one broker is primary at a time. Promoting more than one broker to primary at the same time will make the cluster inconsistent and will cause data loss and unexpected behavior.") def do_execute(self, qmf_broker, ha_broker, opts, args): qmf_broker._method("promote", {}, HA_BROKER, timeout=opts.timeout) PromoteCmd() class StatusCmd(Command): def __init__(self): Command.__init__(self, "status", "Print HA status") self.op.add_option( "--expect", metavar="", help="Don't print status. Return 0 if it matches , 1 otherwise") self.op.add_option( "--is-primary", action="store_true", default=False, help="Don't print status. Return 0 if the broker is primary, 1 otherwise") self.op.add_option( "--all", action="store_true", default=False, help="Print status for all brokers in the cluster") def do_execute(self, qmf_broker, ha_broker, opts, args): if opts.is_primary: if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1) return if opts.expect: if opts.expect != ha_broker.status: raise ExitStatus(1) return def status(hb, b=None, ex=None): if ex: print b, ex elif b: print b, hb.status else: print hb.status self.all_brokers(ha_broker, opts, status) StatusCmd() class ReplicateCmd(Command): def __init__(self): Command.__init__(self, "replicate", "Set up replication from on to on the current broker.", ["", ""]) def do_execute(self, qmf_broker, ha_broker, opts, args): qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER, timeout=opts.timeout) ReplicateCmd() class QueryCmd(Command): def __init__(self): Command.__init__(self, "query", "Print HA configuration and status") self.op.add_option( "--all", action="store_true", default=False, help="Print configuration and status for all brokers in the cluster") def do_execute(self, qmf_broker, ha_broker, opts, args): def query(hb, b=None, ex=None): if ex: print "%s %s\n" % (b, ex) else: if b: print "%-20s %s"%("Address:", b) for x in [("Status:", hb.status), ("Broker ID:", hb.systemId), ("Brokers URL:", hb.brokersUrl), ("Public URL:", hb.publicUrl), ("Replicate: ", hb.replicateDefault) ]: print "%-20s %s"%x if b: print self.all_brokers(ha_broker, opts, query) QueryCmd() def print_usage(prog): print "usage: %s []\n\nCommands are:\n"%prog for cmd in Command.commands: print " %-12s %s."%(cmd.name, cmd.help.split(".")[0]) print "\nFor help with a command type: %s --help\n"%prog def find_command(args, commands): """Find a command among the arguments and options""" for arg in args: cmds = [cmd for cmd in commands if cmd.name == arg] if cmds: return cmds[0] return None def main_except(argv): """This version of main raises exceptions""" args = argv[1:] commands = Command.commands if "--cluster-manager" in args: commands += ManagerCommand.manager_commands args.remove("--cluster-manager") if len(args) and args[0] in ['help', '--help', '-help', '-h', 'help-all', '--help-all']: if 'help-all' in args[0]: for c in commands: c.op.print_help(); print else: print_usage(os.path.basename(argv[0])); else: command = find_command(args, commands) if command: command.execute(args) else: # Check for attempt to use a manager command without --cluster-manager command = find_command(args, ManagerCommand.manager_commands) if command: message="""'%s' should only be called by the cluster manager. Incorrect use of '%s' will cause cluster malfunction. To call from a cluster manager use '%s --cluster-manager'. """ raise Exception(message%((command.name,)*3)) else: print_usage(os.path.basename(argv[0])); raise Exception("No valid command") def main(argv): try: main_except(argv) return 0 except ExitStatus, e: return e.status except Exception, e: print "%s: %s"%(type(e).__name__, e) return 1 if __name__ == "__main__": sys.exit(main(sys.argv))