From 8404c0cd50291445fb57736bb3314bd0d75bebba Mon Sep 17 00:00:00 2001 From: "Darryl L. Pierce" Date: Tue, 12 Mar 2013 13:21:20 +0000 Subject: QPID-4115: Moved the Python send and receive apps. Instead of being API examples, they are actually tools that belong with the other tool apps. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1455522 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/examples/README.txt | 5 - qpid/python/examples/api/receive | 194 --------------------------- qpid/python/examples/api/send | 281 --------------------------------------- qpid/tools/src/py/qpid-receive | 194 +++++++++++++++++++++++++++ qpid/tools/src/py/qpid-send | 281 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 475 insertions(+), 480 deletions(-) delete mode 100755 qpid/python/examples/api/receive delete mode 100755 qpid/python/examples/api/send create mode 100755 qpid/tools/src/py/qpid-receive create mode 100755 qpid/tools/src/py/qpid-send diff --git a/qpid/python/examples/README.txt b/qpid/python/examples/README.txt index 3a3e421a1e..4395160fec 100644 --- a/qpid/python/examples/README.txt +++ b/qpid/python/examples/README.txt @@ -14,11 +14,6 @@ api/spout -- A simple messaging client that sends messages to the target specified on the command line. -api/send -- Sends messages to a specified queue. - -api/receive -- Receives messages from a specified queue. - Use with the send example above. - api/server -- An example server that process incoming messages and sends replies. diff --git a/qpid/python/examples/api/receive b/qpid/python/examples/api/receive deleted file mode 100755 index f14df277ac..0000000000 --- a/qpid/python/examples/api/receive +++ /dev/null @@ -1,194 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, sys, time -import statistics -from qpid.messaging import * - -SECOND = 1000 -TIME_SEC = 1000000000 - -op = optparse.OptionParser(usage="usage: %prog [options]", description="Drains messages from the specified address") -op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to") -op.add_option("-a", "--address", type="str", help="address to receive from") -op.add_option("--connection-options", default={}, help="options for the connection") -op.add_option("-m", "--messages", default=0, type="int", help="stop after N messages have been received, 0 means no limit") -op.add_option("--timeout", default=0, type="int", help="timeout in seconds to wait before exiting") -op.add_option("-f", "--forever", default=False, action="store_true", help="ignore timeout and wait forever") -op.add_option("--ignore-duplicates", default=False, action="store_true", help="Detect and ignore duplicates (by checking 'sn' header)") -op.add_option("--verify-sequence", default=False, action="store_true", help="Verify there are no gaps in the message sequence (by checking 'sn' header)") -op.add_option("--check-redelivered", default=False, action="store_true", help="Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") -op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue") -op.add_option("--ack-frequency", default=100, type="int", help="Ack frequency (0 implies none of the messages will get accepted)") -op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)") -op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)") -op.add_option("--print-content", type="str", default="yes", help="print out message content") -op.add_option("--print-headers", type="str", default="no", help="print out message headers") -op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover") -op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics") -op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages") -op.add_option("--report-header", type="str", default="yes", help="Headers on report") -op.add_option("--ready-address", type="str", help="send a message to this address when ready to receive") -op.add_option("--receive-rate", default=0, type="int", help="Receive at rate of N messages/second. 0 means receive as fast as possible") -#op.add_option("--help", default=False, action="store_true", help="print this usage statement") - -def getTimeout(timeout, forever): - if forever: - return None - else: - return SECOND*timeout - - -EOS = "eos" -SN = "sn" - -# Check for duplicate or dropped messages by sequence number -class SequenceTracker: - def __init__(self, opts): - self.opts = opts - self.lastSn = 0 - - # Return True if the message should be procesed, false if it should be ignored. - def track(self, message): - if not(self.opts.verify_sequence) or (self.opts.ignore_duplicates): - return True - sn = message.properties[SN] - duplicate = (sn <= lastSn) - dropped = (sn > lastSn+1) - if self.opts.verify_sequence and dropped: - raise Exception("Gap in sequence numbers %s-%s" %(lastSn, sn)) - ignore = (duplicate and self.opts.ignore_duplicates) - if ignore and self.opts.check_redelivered and (not msg.redelivered): - raise Exception("duplicate sequence number received, message not marked as redelivered!") - if not duplicate: - lastSn = sn - return (not(ignore)) - - -def main(): - opts, args = op.parse_args() - if not opts.address: - raise Exception("Address must be specified!") - - broker = opts.broker - address = opts.address - connection = Connection(opts.broker, **opts.connection_options) - - try: - connection.open() - if opts.failover_updates: - auto_fetch_reconnect_urls(connection) - session = connection.session(transactional=(opts.tx)) - receiver = session.receiver(opts.address) - if opts.capacity > 0: - receiver.capacity = opts.capacity - msg = Message() - count = 0 - txCount = 0 - sequenceTracker = SequenceTracker(opts) - timeout = getTimeout(opts.timeout, opts.forever) - done = False - stats = statistics.ThroughputAndLatency() - reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats) - - if opts.ready_address is not None: - session.sender(opts.ready_address).send(msg) - if opts.tx > 0: - session.commit() - # For receive rate calculation - start = time.time()*TIME_SEC - interval = 0 - if opts.receive_rate > 0: - interval = TIME_SEC / opts.receive_rate - - replyTo = {} # a dictionary of reply-to address -> sender mapping - - while (not done): - try: - msg = receiver.fetch(timeout=timeout) - reporter.message(msg) - if sequenceTracker.track(msg): - if msg.content == EOS: - done = True - else: - count+=1 - if opts.print_headers == "yes": - if msg.subject is not None: - print "Subject: %s" %msg.subject - if msg.reply_to is not None: - print "ReplyTo: %s" %msg.reply_to - if msg.correlation_id is not None: - print "CorrelationId: %s" %msg.correlation_id - if msg.user_id is not None: - print "UserId: %s" %msg.user_id - if msg.ttl is not None: - print "TTL: %s" %msg.ttl - if msg.priority is not None: - print "Priority: %s" %msg.priority - if msg.durable: - print "Durable: true" - if msg.redelivered: - print "Redelivered: true" - print "Properties: %s" %msg.properties - print - if opts.print_content == "yes": - print msg.content - if (opts.messages > 0) and (count >= opts.messages): - done = True - # end of "if sequenceTracker.track(msg):" - if (opts.tx > 0) and (count % opts.tx == 0): - txCount+=1 - if (opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0): - session.rollback() - else: - session.commit() - elif (opts.ack_frequency > 0) and (count % opts.ack_frequency == 0): - session.acknowledge() - if msg.reply_to is not None: # Echo message back to reply-to address. - if msg.reply_to not in replyTo: - replyTo[msg.reply_to] = session.sender(msg.reply_to) - replyTo[msg.reply_to].capacity = opts.capacity - replyTo[msg.reply_to].send(msg) - if opts.receive_rate > 0: - delay = start + count*interval - time.time()*TIME_SEC - if delay > 0: - time.sleep(delay) - # Clear out message properties & content for next iteration. - msg = Message() - except Empty: # no message fetched => break the while cycle - break - # end of while cycle - if opts.report_total: - reporter.report() - if opts.tx > 0: - txCount+=1 - if opts.rollback_frequency and (txCount % opts.rollback_frequency == 0): - session.rollback() - else: - session.commit() - else: - session.acknowledge() - session.close() - connection.close() - except Exception,e: - print e - connection.close() - -if __name__ == "__main__": main() diff --git a/qpid/python/examples/api/send b/qpid/python/examples/api/send deleted file mode 100755 index b0105e41a6..0000000000 --- a/qpid/python/examples/api/send +++ /dev/null @@ -1,281 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, random, os, time, uuid -from qpid.messaging import * -import statistics - -EOS = "eos" -SN = "sn" -TS = "ts" - -TIME_SEC = 1000000000 -SECOND = 1000 - -def nameval(st): - idx = st.find("=") - if idx >= 0: - name = st[0:idx] - value = st[idx+1:] - else: - name = st - value = None - return name, value - - -op = optparse.OptionParser(usage="usage: %prog [options]", description="Spouts messages to the specified address") -op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to") -op.add_option("-a", "--address", type="str", help="address to send to") -op.add_option("--connection-options", default={}, help="options for the connection") -op.add_option("-m", "--messages", default=1, type="int", help="stop after N messages have been sent, 0 means no limit") -op.add_option("-i", "--id", type="str", help="use the supplied id instead of generating one") -op.add_option("--reply-to", type="str", help="specify reply-to address") -op.add_option("--send-eos", default=0, type="int", help="Send N EOS messages to mark end of input") -op.add_option("--durable", default=False, action="store_true", help="Mark messages as durable") -op.add_option("--ttl", default=0, type="int", help="Time-to-live for messages, in milliseconds") -op.add_option("--priority", default=0, type="int", help="Priority for messages (higher value implies higher priority)") -op.add_option("-P", "--property", default=[], action="append", type="str", help="specify message property") -op.add_option("--correlation-id", type="str", help="correlation-id for message") -op.add_option("--user-id", type="str", help="userid for message") -op.add_option("--content-string", type="str", help="use CONTENT as message content") -op.add_option("--content-size", default=0, type="int", help="create an N-byte message content") -op.add_option("-M", "--content-map", default=[], action="append", type="str", help="specify entry for map content") -op.add_option("--content-stdin", default=False, action="store_true", help="read message content from stdin, one line per message") -op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue") -op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)") -op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)") -op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover") -op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics") -op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages") -op.add_option("--report-header", type="str", default="yes", help="Headers on report") -op.add_option("--send-rate", default=0, type="int", help="Send at rate of N messages/second. 0 means send as fast as possible") -op.add_option("--flow-control", default=0, type="int", help="Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.") -op.add_option("--sequence", type="str", default="yes", help="Add a sequence number messages property (required for duplicate/lost message detection)") -op.add_option("--timestamp", type="str", default="yes", help="Add a time stamp messages property (required for latency measurement)") -op.add_option("--group-key", type="str", help="Generate groups of messages using message header 'KEY' to hold the group identifier") -op.add_option("--group-prefix", default="GROUP-", type="str", help="Generate group identifers with 'STRING' prefix (if group-key specified)") -op.add_option("--group-size", default=10, type="int", help="Number of messages per a group (if group-key specified)") -op.add_option("--group-randomize-size", default=False, action="store_true", help="Randomize the number of messages per group to [1...group-size] (if group-key specified)") -op.add_option("--group-interleave", default=1, type="int", help="Simultaineously interleave messages from N different groups (if group-key specified)") - - -class ContentGenerator: - def setContent(self, msg): - return - -class GetlineContentGenerator(ContentGenerator): - def setContent(self, msg): - content = sys.stdin.readline() - got = (not line) - if (got): - msg.content = content - return got - -class FixedContentGenerator(ContentGenerator): - def __init__(self, content=None): - self.content = content - - def setContent(self, msg): - msg.content = self.content - return True - -class MapContentGenerator(ContentGenerator): - def __init__(self, opts=None): - self.opts = opts - - def setContent(self, msg): - self.content = {} - for e in self.opts.content_map: - name, val = nameval(p) - content[name] = val - msg.content = self.content - return True - - -# tag each generated message with a group identifer -class GroupGenerator: - def __init__(self, key, prefix, size, randomize, interleave): - groupKey = key - groupPrefix = prefix - groupSize = size - randomizeSize = randomize - groupSuffix = 0 - if (randomize > 0): - random.seed(os.getpid()) - - for i in range(0, interleave): - self.newGroup() - current = 0 - - def setGroupInfo(self, msg): - if (current == len(groups)): - current = 0 - my_group = groups[current] - msg.properties[groupKey] = my_group[id]; - # print "SENDING GROUPID=[%s]\n" % my_group[id] - my_group[count]=my_group[count]+1 - if (my_group[count] == my_group[size]): - self.newGroup() - del groups[current] - else: - current+=1 - - def newGroup(self): - groupId = "%s%s" % (groupPrefix, groupSuffix) - groupSuffix+=1 - size = groupSize - if (randomizeSize == True): - size = random.randint(1,groupSize) - # print "New group: GROUPID=["%s] size=%s" % (groupId, size) - groups.append({'id':groupId, 'size':size, 'count':0}) - - - -def main(): - opts, args = op.parse_args() - if not opts.address: - raise Exception("Address must be specified!") - - broker = opts.broker - address = opts.address - connection = Connection(opts.broker, **opts.connection_options) - - try: - connection.open() - if (opts.failover_updates): - auto_fetch_reconnect_urls(connection) - session = connection.session(transactional=(opts.tx)) - sender = session.sender(opts.address) - if (opts.capacity>0): - sender.capacity = opts.capacity - sent = 0 - txCount = 0 - stats = statistics.Throughput() - reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats) - - contentGen = ContentGenerator() - content = "" # auxiliary variable for determining content type of message - needs to be changed to {} for Map message - if opts.content_stdin: - opts.messages = 0 # Don't limit number of messages sent. - contentGen = GetlineContentGenerator() - elif opts.content_map is not None: - contentGen = MapContentGenerator(opts) - content = {} - elif opts.content_size is not None: - contentGen = FixedContentGenerator('X' * opts.content_size) - else: - contentGen = FixedContentGenerator(opts.content_string) - if opts.group_key is not None: - groupGen = GroupGenerator(opts.group_key, opts.group_prefix, opts.group_size, opts.group_random_size, opts.group_interleave) - - msg = Message(content=content) - msg.durable = opts.durable - if opts.ttl: - msg.ttl = opts.ttl/1000.0 - if opts.priority: - msg.priority = opts.priority - if opts.reply_to is not None: - if opts.flow_control > 0: - raise Exception("Can't use reply-to and flow-control together") - msg.reply_to = opts.reply_to - if opts.user_id is not None: - msg.user_id = opts.user_id - if opts.correlation_id is not None: - msg.correlation_id = opts.correlation_id - for p in opts.property: - name, val = nameval(p) - msg.properties[name] = val - - start = time.time()*TIME_SEC - interval = 0 - if opts.send_rate > 0: - interval = TIME_SEC/opts.send_rate - - flowControlAddress = "flow-" + str(uuid.uuid1()) + ";{create:always,delete:always}" - flowSent = 0 - if opts.flow_control > 0: - flowControlReceiver = session.receiver(flowControlAddress) - flowControlReceiver.capacity = 2 - - while (contentGen.setContent(msg) == True): - sent+=1 - if opts.sequence == "yes": - msg.properties[SN] = sent - - if opts.flow_control > 0: - if (sent % opts.flow_control == 0): - msg.reply_to = flowControlAddress - flowSent+=1 - else: - msg.reply_to = "" # Clear the reply address. - - if 'groupGen' in vars(): - groupGen.setGroupInfo(msg) - - if (opts.timestamp == "yes"): - msg.properties[TS] = int(time.time()*TIME_SEC) - sender.send(msg) - reporter.message(msg) - - if ((opts.tx > 0) and (sent % opts.tx == 0)): - txCount+=1 - if ((opts.rollbackFrequency > 0) and (txCount % opts.rollbackFrequency == 0)): - session.rollback() - else: - session.commit() - if ((opts.messages > 0) and (sent >= opts.messages)): - break - - if (opts.flow_control > 0) and (flowSent == 2): - flowControlReceiver.fetch(timeout=SECOND) - flowSent -= 1 - - if (opts.send_rate > 0): - delay = start + sent*interval - time.time()*TIME_SEC - if (delay > 0): - time.sleep(delay) - #end of while - - while flowSent > 0: - flowControlReceiver.fetch(timeout=SECOND) - flowSent -= 1 - - if (opts.report_total): - reporter.report() - for i in reversed(range(1,opts.send_eos+1)): - if (opts.sequence == "yes"): - sent+=1 - msg.properties[SN] = sent - msg.properties[EOS] = True #TODO (also in C++ client): add in ability to send digest or similar - sender.send(msg) - if ((opts.tx > 0) and (sent % opts.tx == 0)): - txCount+=1 - if ((opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0)): - session.rollback() - else: - session.commit() - session.sync() - session.close() - connection.close() - except Exception,e: - print e - connection.close() - -if __name__ == "__main__": main() diff --git a/qpid/tools/src/py/qpid-receive b/qpid/tools/src/py/qpid-receive new file mode 100755 index 0000000000..f14df277ac --- /dev/null +++ b/qpid/tools/src/py/qpid-receive @@ -0,0 +1,194 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, time +import statistics +from qpid.messaging import * + +SECOND = 1000 +TIME_SEC = 1000000000 + +op = optparse.OptionParser(usage="usage: %prog [options]", description="Drains messages from the specified address") +op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to") +op.add_option("-a", "--address", type="str", help="address to receive from") +op.add_option("--connection-options", default={}, help="options for the connection") +op.add_option("-m", "--messages", default=0, type="int", help="stop after N messages have been received, 0 means no limit") +op.add_option("--timeout", default=0, type="int", help="timeout in seconds to wait before exiting") +op.add_option("-f", "--forever", default=False, action="store_true", help="ignore timeout and wait forever") +op.add_option("--ignore-duplicates", default=False, action="store_true", help="Detect and ignore duplicates (by checking 'sn' header)") +op.add_option("--verify-sequence", default=False, action="store_true", help="Verify there are no gaps in the message sequence (by checking 'sn' header)") +op.add_option("--check-redelivered", default=False, action="store_true", help="Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") +op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue") +op.add_option("--ack-frequency", default=100, type="int", help="Ack frequency (0 implies none of the messages will get accepted)") +op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)") +op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)") +op.add_option("--print-content", type="str", default="yes", help="print out message content") +op.add_option("--print-headers", type="str", default="no", help="print out message headers") +op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover") +op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics") +op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages") +op.add_option("--report-header", type="str", default="yes", help="Headers on report") +op.add_option("--ready-address", type="str", help="send a message to this address when ready to receive") +op.add_option("--receive-rate", default=0, type="int", help="Receive at rate of N messages/second. 0 means receive as fast as possible") +#op.add_option("--help", default=False, action="store_true", help="print this usage statement") + +def getTimeout(timeout, forever): + if forever: + return None + else: + return SECOND*timeout + + +EOS = "eos" +SN = "sn" + +# Check for duplicate or dropped messages by sequence number +class SequenceTracker: + def __init__(self, opts): + self.opts = opts + self.lastSn = 0 + + # Return True if the message should be procesed, false if it should be ignored. + def track(self, message): + if not(self.opts.verify_sequence) or (self.opts.ignore_duplicates): + return True + sn = message.properties[SN] + duplicate = (sn <= lastSn) + dropped = (sn > lastSn+1) + if self.opts.verify_sequence and dropped: + raise Exception("Gap in sequence numbers %s-%s" %(lastSn, sn)) + ignore = (duplicate and self.opts.ignore_duplicates) + if ignore and self.opts.check_redelivered and (not msg.redelivered): + raise Exception("duplicate sequence number received, message not marked as redelivered!") + if not duplicate: + lastSn = sn + return (not(ignore)) + + +def main(): + opts, args = op.parse_args() + if not opts.address: + raise Exception("Address must be specified!") + + broker = opts.broker + address = opts.address + connection = Connection(opts.broker, **opts.connection_options) + + try: + connection.open() + if opts.failover_updates: + auto_fetch_reconnect_urls(connection) + session = connection.session(transactional=(opts.tx)) + receiver = session.receiver(opts.address) + if opts.capacity > 0: + receiver.capacity = opts.capacity + msg = Message() + count = 0 + txCount = 0 + sequenceTracker = SequenceTracker(opts) + timeout = getTimeout(opts.timeout, opts.forever) + done = False + stats = statistics.ThroughputAndLatency() + reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats) + + if opts.ready_address is not None: + session.sender(opts.ready_address).send(msg) + if opts.tx > 0: + session.commit() + # For receive rate calculation + start = time.time()*TIME_SEC + interval = 0 + if opts.receive_rate > 0: + interval = TIME_SEC / opts.receive_rate + + replyTo = {} # a dictionary of reply-to address -> sender mapping + + while (not done): + try: + msg = receiver.fetch(timeout=timeout) + reporter.message(msg) + if sequenceTracker.track(msg): + if msg.content == EOS: + done = True + else: + count+=1 + if opts.print_headers == "yes": + if msg.subject is not None: + print "Subject: %s" %msg.subject + if msg.reply_to is not None: + print "ReplyTo: %s" %msg.reply_to + if msg.correlation_id is not None: + print "CorrelationId: %s" %msg.correlation_id + if msg.user_id is not None: + print "UserId: %s" %msg.user_id + if msg.ttl is not None: + print "TTL: %s" %msg.ttl + if msg.priority is not None: + print "Priority: %s" %msg.priority + if msg.durable: + print "Durable: true" + if msg.redelivered: + print "Redelivered: true" + print "Properties: %s" %msg.properties + print + if opts.print_content == "yes": + print msg.content + if (opts.messages > 0) and (count >= opts.messages): + done = True + # end of "if sequenceTracker.track(msg):" + if (opts.tx > 0) and (count % opts.tx == 0): + txCount+=1 + if (opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0): + session.rollback() + else: + session.commit() + elif (opts.ack_frequency > 0) and (count % opts.ack_frequency == 0): + session.acknowledge() + if msg.reply_to is not None: # Echo message back to reply-to address. + if msg.reply_to not in replyTo: + replyTo[msg.reply_to] = session.sender(msg.reply_to) + replyTo[msg.reply_to].capacity = opts.capacity + replyTo[msg.reply_to].send(msg) + if opts.receive_rate > 0: + delay = start + count*interval - time.time()*TIME_SEC + if delay > 0: + time.sleep(delay) + # Clear out message properties & content for next iteration. + msg = Message() + except Empty: # no message fetched => break the while cycle + break + # end of while cycle + if opts.report_total: + reporter.report() + if opts.tx > 0: + txCount+=1 + if opts.rollback_frequency and (txCount % opts.rollback_frequency == 0): + session.rollback() + else: + session.commit() + else: + session.acknowledge() + session.close() + connection.close() + except Exception,e: + print e + connection.close() + +if __name__ == "__main__": main() diff --git a/qpid/tools/src/py/qpid-send b/qpid/tools/src/py/qpid-send new file mode 100755 index 0000000000..b0105e41a6 --- /dev/null +++ b/qpid/tools/src/py/qpid-send @@ -0,0 +1,281 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, random, os, time, uuid +from qpid.messaging import * +import statistics + +EOS = "eos" +SN = "sn" +TS = "ts" + +TIME_SEC = 1000000000 +SECOND = 1000 + +def nameval(st): + idx = st.find("=") + if idx >= 0: + name = st[0:idx] + value = st[idx+1:] + else: + name = st + value = None + return name, value + + +op = optparse.OptionParser(usage="usage: %prog [options]", description="Spouts messages to the specified address") +op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to") +op.add_option("-a", "--address", type="str", help="address to send to") +op.add_option("--connection-options", default={}, help="options for the connection") +op.add_option("-m", "--messages", default=1, type="int", help="stop after N messages have been sent, 0 means no limit") +op.add_option("-i", "--id", type="str", help="use the supplied id instead of generating one") +op.add_option("--reply-to", type="str", help="specify reply-to address") +op.add_option("--send-eos", default=0, type="int", help="Send N EOS messages to mark end of input") +op.add_option("--durable", default=False, action="store_true", help="Mark messages as durable") +op.add_option("--ttl", default=0, type="int", help="Time-to-live for messages, in milliseconds") +op.add_option("--priority", default=0, type="int", help="Priority for messages (higher value implies higher priority)") +op.add_option("-P", "--property", default=[], action="append", type="str", help="specify message property") +op.add_option("--correlation-id", type="str", help="correlation-id for message") +op.add_option("--user-id", type="str", help="userid for message") +op.add_option("--content-string", type="str", help="use CONTENT as message content") +op.add_option("--content-size", default=0, type="int", help="create an N-byte message content") +op.add_option("-M", "--content-map", default=[], action="append", type="str", help="specify entry for map content") +op.add_option("--content-stdin", default=False, action="store_true", help="read message content from stdin, one line per message") +op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue") +op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)") +op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)") +op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover") +op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics") +op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages") +op.add_option("--report-header", type="str", default="yes", help="Headers on report") +op.add_option("--send-rate", default=0, type="int", help="Send at rate of N messages/second. 0 means send as fast as possible") +op.add_option("--flow-control", default=0, type="int", help="Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.") +op.add_option("--sequence", type="str", default="yes", help="Add a sequence number messages property (required for duplicate/lost message detection)") +op.add_option("--timestamp", type="str", default="yes", help="Add a time stamp messages property (required for latency measurement)") +op.add_option("--group-key", type="str", help="Generate groups of messages using message header 'KEY' to hold the group identifier") +op.add_option("--group-prefix", default="GROUP-", type="str", help="Generate group identifers with 'STRING' prefix (if group-key specified)") +op.add_option("--group-size", default=10, type="int", help="Number of messages per a group (if group-key specified)") +op.add_option("--group-randomize-size", default=False, action="store_true", help="Randomize the number of messages per group to [1...group-size] (if group-key specified)") +op.add_option("--group-interleave", default=1, type="int", help="Simultaineously interleave messages from N different groups (if group-key specified)") + + +class ContentGenerator: + def setContent(self, msg): + return + +class GetlineContentGenerator(ContentGenerator): + def setContent(self, msg): + content = sys.stdin.readline() + got = (not line) + if (got): + msg.content = content + return got + +class FixedContentGenerator(ContentGenerator): + def __init__(self, content=None): + self.content = content + + def setContent(self, msg): + msg.content = self.content + return True + +class MapContentGenerator(ContentGenerator): + def __init__(self, opts=None): + self.opts = opts + + def setContent(self, msg): + self.content = {} + for e in self.opts.content_map: + name, val = nameval(p) + content[name] = val + msg.content = self.content + return True + + +# tag each generated message with a group identifer +class GroupGenerator: + def __init__(self, key, prefix, size, randomize, interleave): + groupKey = key + groupPrefix = prefix + groupSize = size + randomizeSize = randomize + groupSuffix = 0 + if (randomize > 0): + random.seed(os.getpid()) + + for i in range(0, interleave): + self.newGroup() + current = 0 + + def setGroupInfo(self, msg): + if (current == len(groups)): + current = 0 + my_group = groups[current] + msg.properties[groupKey] = my_group[id]; + # print "SENDING GROUPID=[%s]\n" % my_group[id] + my_group[count]=my_group[count]+1 + if (my_group[count] == my_group[size]): + self.newGroup() + del groups[current] + else: + current+=1 + + def newGroup(self): + groupId = "%s%s" % (groupPrefix, groupSuffix) + groupSuffix+=1 + size = groupSize + if (randomizeSize == True): + size = random.randint(1,groupSize) + # print "New group: GROUPID=["%s] size=%s" % (groupId, size) + groups.append({'id':groupId, 'size':size, 'count':0}) + + + +def main(): + opts, args = op.parse_args() + if not opts.address: + raise Exception("Address must be specified!") + + broker = opts.broker + address = opts.address + connection = Connection(opts.broker, **opts.connection_options) + + try: + connection.open() + if (opts.failover_updates): + auto_fetch_reconnect_urls(connection) + session = connection.session(transactional=(opts.tx)) + sender = session.sender(opts.address) + if (opts.capacity>0): + sender.capacity = opts.capacity + sent = 0 + txCount = 0 + stats = statistics.Throughput() + reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats) + + contentGen = ContentGenerator() + content = "" # auxiliary variable for determining content type of message - needs to be changed to {} for Map message + if opts.content_stdin: + opts.messages = 0 # Don't limit number of messages sent. + contentGen = GetlineContentGenerator() + elif opts.content_map is not None: + contentGen = MapContentGenerator(opts) + content = {} + elif opts.content_size is not None: + contentGen = FixedContentGenerator('X' * opts.content_size) + else: + contentGen = FixedContentGenerator(opts.content_string) + if opts.group_key is not None: + groupGen = GroupGenerator(opts.group_key, opts.group_prefix, opts.group_size, opts.group_random_size, opts.group_interleave) + + msg = Message(content=content) + msg.durable = opts.durable + if opts.ttl: + msg.ttl = opts.ttl/1000.0 + if opts.priority: + msg.priority = opts.priority + if opts.reply_to is not None: + if opts.flow_control > 0: + raise Exception("Can't use reply-to and flow-control together") + msg.reply_to = opts.reply_to + if opts.user_id is not None: + msg.user_id = opts.user_id + if opts.correlation_id is not None: + msg.correlation_id = opts.correlation_id + for p in opts.property: + name, val = nameval(p) + msg.properties[name] = val + + start = time.time()*TIME_SEC + interval = 0 + if opts.send_rate > 0: + interval = TIME_SEC/opts.send_rate + + flowControlAddress = "flow-" + str(uuid.uuid1()) + ";{create:always,delete:always}" + flowSent = 0 + if opts.flow_control > 0: + flowControlReceiver = session.receiver(flowControlAddress) + flowControlReceiver.capacity = 2 + + while (contentGen.setContent(msg) == True): + sent+=1 + if opts.sequence == "yes": + msg.properties[SN] = sent + + if opts.flow_control > 0: + if (sent % opts.flow_control == 0): + msg.reply_to = flowControlAddress + flowSent+=1 + else: + msg.reply_to = "" # Clear the reply address. + + if 'groupGen' in vars(): + groupGen.setGroupInfo(msg) + + if (opts.timestamp == "yes"): + msg.properties[TS] = int(time.time()*TIME_SEC) + sender.send(msg) + reporter.message(msg) + + if ((opts.tx > 0) and (sent % opts.tx == 0)): + txCount+=1 + if ((opts.rollbackFrequency > 0) and (txCount % opts.rollbackFrequency == 0)): + session.rollback() + else: + session.commit() + if ((opts.messages > 0) and (sent >= opts.messages)): + break + + if (opts.flow_control > 0) and (flowSent == 2): + flowControlReceiver.fetch(timeout=SECOND) + flowSent -= 1 + + if (opts.send_rate > 0): + delay = start + sent*interval - time.time()*TIME_SEC + if (delay > 0): + time.sleep(delay) + #end of while + + while flowSent > 0: + flowControlReceiver.fetch(timeout=SECOND) + flowSent -= 1 + + if (opts.report_total): + reporter.report() + for i in reversed(range(1,opts.send_eos+1)): + if (opts.sequence == "yes"): + sent+=1 + msg.properties[SN] = sent + msg.properties[EOS] = True #TODO (also in C++ client): add in ability to send digest or similar + sender.send(msg) + if ((opts.tx > 0) and (sent % opts.tx == 0)): + txCount+=1 + if ((opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0)): + session.rollback() + else: + session.commit() + session.sync() + session.close() + connection.close() + except Exception,e: + print e + connection.close() + +if __name__ == "__main__": main() -- cgit v1.2.1