summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-19 15:03:16 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-19 15:03:16 +0000
commit81584c84fadc886b0ad53dceb479073e56bf8cdd (patch)
treef48206d10d52fdbb5a4ce93ec8068f0de4fbc9f5 /tools
parentccd0e27fdf0c5a90a7f85099dac4f63dbd7a5d15 (diff)
downloadqpid-python-81584c84fadc886b0ad53dceb479073e56bf8cdd.tar.gz
QPID-2935: merge producer flow control (C++ broker).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1072356 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tools')
-rwxr-xr-xtools/src/py/qpid-config38
1 files changed, 38 insertions, 0 deletions
diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config
index 04b31e98ed..9ff405541c 100755
--- a/tools/src/py/qpid-config
+++ b/tools/src/py/qpid-config
@@ -97,6 +97,10 @@ class Config:
self._eventGeneration = None
self._file = None
self._sasl_mechanism = None
+ self._flowStopCount = None
+ self._flowResumeCount = None
+ self._flowStopSize = None
+ self._flowResumeSize = None
config = Config()
@@ -111,6 +115,10 @@ LVQNB = "qpid.last_value_queue_no_browse"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+FLOW_STOP_COUNT = "qpid.flow_stop_count"
+FLOW_RESUME_COUNT = "qpid.flow_resume_count"
+FLOW_STOP_SIZE = "qpid.flow_stop_size"
+FLOW_RESUME_SIZE = "qpid.flow_resume_size"
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -162,6 +170,14 @@ def OptionsAndArguments(argv):
group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")
+ group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>",
+ help="Turn on sender flow control when the number of queued bytes exceeds this value.")
+ group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>",
+ help="Turn off sender flow control when the number of queued bytes drops below this value.")
+ group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>",
+ help="Turn on sender flow control when the number of queued messages exceeds this value.")
+ group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
+ help="Turn off sender flow control when the number of queued messages drops below this value.")
# no option for declaring an exclusive queue - which can only be used by the session that creates it.
parser.add_option_group(group3)
@@ -233,6 +249,14 @@ def OptionsAndArguments(argv):
config._if_unused = False
if opts.sasl_mechanism:
config._sasl_mechanism = opts.sasl_mechanism
+ if opts.flow_stop_size:
+ config._flowStopSize = opts.flow_stop_size
+ if opts.flow_resume_size:
+ config._flowResumeSize = opts.flow_resume_size
+ if opts.flow_stop_count:
+ config._flowStopCount = opts.flow_stop_count
+ if opts.flow_resume_count:
+ config._flowResumeCount = opts.flow_resume_count
return args
@@ -391,6 +415,10 @@ class BrokerManager:
if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION],
if q.altExchange:
print "--alternate-exchange=%s" % q._altExchange_.name,
+ if FLOW_STOP_SIZE in args: print "--flow-stop-size=%d" % args[FLOW_STOP_SIZE],
+ if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%d" % args[FLOW_RESUME_SIZE],
+ if FLOW_STOP_COUNT in args: print "--flow-stop-count=%d" % args[FLOW_STOP_COUNT],
+ if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%d" % args[FLOW_RESUME_COUNT],
print
def QueueListRecurse(self, filter):
@@ -468,11 +496,21 @@ class BrokerManager:
if config._eventGeneration:
declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration
+ if config._flowStopSize:
+ declArgs[FLOW_STOP_SIZE] = config._flowStopSize
+ if config._flowResumeSize:
+ declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize
+ if config._flowStopCount:
+ declArgs[FLOW_STOP_COUNT] = config._flowStopCount
+ if config._flowResumeCount:
+ declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount
+
if config._altern_ex != None:
self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
else:
self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs)
+
def DelQueue(self, args):
if len(args) < 1:
Usage()