From fce2eb5dbe35cdbf44e634a4dcc329540461b96b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 14 Jun 2010 13:44:00 +0000 Subject: Rename tests qpid_* to qpid-* for consistency. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@954471 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 237 ++++++++++++++++++++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100644 qpid/cpp/src/tests/qpid-receive.cpp (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp new file mode 100644 index 0000000000..a0394ccd21 --- /dev/null +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -0,0 +1,237 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "TestOptions.h" +#include "Statistics.h" + +#include +#include + +using namespace qpid::messaging; +using namespace qpid::types; +using namespace std; + +namespace qpid { +namespace tests { + +struct Options : public qpid::Options +{ + bool help; + std::string url; + std::string address; + std::string connectionOptions; + int64_t timeout; + bool forever; + uint messages; + bool ignoreDuplicates; + uint capacity; + uint ackFrequency; + uint tx; + uint rollbackFrequency; + bool printContent; + bool printHeaders; + bool failoverUpdates; + qpid::log::Options log; + bool reportTotal; + uint reportEvery; + bool reportHeader; + string readyAddress; + + Options(const std::string& argv0=std::string()) + : qpid::Options("Options"), + help(false), + url("amqp:tcp:127.0.0.1"), + timeout(0), + forever(false), + messages(0), + ignoreDuplicates(false), + capacity(1000), + ackFrequency(100), + tx(0), + rollbackFrequency(0), + printContent(true), + printHeaders(false), + failoverUpdates(false), + log(argv0), + reportTotal(false), + reportEvery(0), + reportHeader(true) + { + addOptions() + ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") + ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from") + ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") + ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting") + ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") + ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") + ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") + ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") + ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") + ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") + ("print-content", qpid::optValue(printContent, "yes|no"), "print out message content") + ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message headers") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics") + ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") + ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), + "send a message to this address when ready to receive") + ("help", qpid::optValue(help), "print this usage statement"); + add(log); + } + + Duration getTimeout() + { + if (forever) return Duration::FOREVER; + else return Duration::SECOND*timeout; + + } + bool parse(int argc, char** argv) + { + try { + qpid::Options::parse(argc, argv); + if (address.empty()) throw qpid::Exception("Address must be specified!"); + qpid::log::Logger::instance().configure(log); + if (help) { + std::ostringstream msg; + std::cout << msg << *this << std::endl << std::endl + << "Drains messages from the specified address" << std::endl; + return false; + } else { + return true; + } + } catch (const std::exception& e) { + std::cerr << *this << std::endl << std::endl << e.what() << std::endl; + return false; + } + } +}; + +const string EOS("eos"); +const string SN("sn"); + +class SequenceTracker +{ + uint lastSn; + public: + SequenceTracker() : lastSn(0) {} + + bool isDuplicate(Message& message) + { + uint sn = message.getProperties()[SN]; + if (lastSn < sn) { + lastSn = sn; + return false; + } else { + return true; + } + } +}; + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char ** argv) +{ + Connection connection; + try { + Options opts; + if (opts.parse(argc, argv)) { + connection = Connection(opts.url, opts.connectionOptions); + connection.open(); + std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); + Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); + Receiver receiver = session.createReceiver(opts.address); + receiver.setCapacity(opts.capacity); + Message msg; + uint count = 0; + uint txCount = 0; + SequenceTracker sequenceTracker; + Duration timeout = opts.getTimeout(); + bool done = false; + Reporter reporter(std::cout, opts.reportEvery, opts.reportHeader); + if (!opts.readyAddress.empty()) + session.createSender(opts.readyAddress).send(msg); + while (!done && receiver.fetch(msg, timeout)) { + reporter.message(msg); + if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { + if (msg.getContent() == EOS) { + done = true; + } else { + ++count; + if (opts.printHeaders) { + if (msg.getSubject().size()) std::cout << "Subject: " << msg.getSubject() << std::endl; + if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl; + if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; + if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; + if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; + if (msg.getDurable()) std::cout << "Durable: true" << std::endl; + if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl; + std::cout << "Properties: " << msg.getProperties() << std::endl; + std::cout << std::endl; + } + if (opts.printContent) + std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages + if (opts.messages && count >= opts.messages) done = true; + } + } + if (opts.tx && (count % opts.tx == 0)) { + if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + session.rollback(); + } else { + session.commit(); + } + } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { + session.acknowledge(); + } + //opts.rejectFrequency?? + } + if (opts.reportTotal) reporter.report(); + if (opts.tx) { + if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + session.rollback(); + } else { + session.commit(); + } + } else { + session.acknowledge(); + } + session.close(); + connection.close(); + return 0; + } + } catch(const std::exception& error) { + std::cerr << "Failure: " << error.what() << std::endl; + connection.close(); + return 1; + } +} -- cgit v1.2.1 From b7323d923cfddd71e8efd5c16055239263fbef18 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 30 Sep 2010 20:06:03 +0000 Subject: Extending qpid-cpp-benchmark for cluster testing - multiple --broker args have senders/receivers connect to different cluster nodes. - multiple --client-host args start clients on different hosts via ssh. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1003228 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index a0394ccd21..c8bb58ac54 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -102,8 +102,8 @@ struct Options : public qpid::Options ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics") ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") - ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), - "send a message to this address when ready to receive") + ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") + ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") ("help", qpid::optValue(help), "print this usage statement"); add(log); } -- cgit v1.2.1 From f1e85512a58830985d6364c032f398a61608dc62 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 9 Nov 2010 22:14:45 +0000 Subject: Added --receive-rate to qpid-recieve to allow simulation of a slow receiver. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1033264 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index c8bb58ac54..823756268c 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -29,6 +29,7 @@ #include #include #include +#include "qpid/sys/Time.h" #include "TestOptions.h" #include "Statistics.h" @@ -64,6 +65,7 @@ struct Options : public qpid::Options uint reportEvery; bool reportHeader; string readyAddress; + uint receiveRate; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -83,7 +85,8 @@ struct Options : public qpid::Options log(argv0), reportTotal(false), reportEvery(0), - reportHeader(true) + reportHeader(true), + receiveRate(0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -104,6 +107,7 @@ struct Options : public qpid::Options ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") + ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -181,7 +185,14 @@ int main(int argc, char ** argv) Reporter reporter(std::cout, opts.reportEvery, opts.reportHeader); if (!opts.readyAddress.empty()) session.createSender(opts.readyAddress).send(msg); + + uint received = 0; + qpid::sys::AbsTime start = qpid::sys::now(); + int64_t interval = 0; + if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; + while (!done && receiver.fetch(msg, timeout)) { + ++received; reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { @@ -213,6 +224,11 @@ int main(int argc, char ** argv) } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } + if (opts.receiveRate) { + qpid::sys::AbsTime waitTill(start, received*interval); + int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); + if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + } //opts.rejectFrequency?? } if (opts.reportTotal) reporter.report(); -- cgit v1.2.1 From 68c6150ada444683074e45d0738784d60aa03bce Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 10 Nov 2010 21:50:20 +0000 Subject: qpid-recieve --receive-rate: fixed calculation for duplicate messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1033739 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 823756268c..a85d882a0f 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -185,14 +185,13 @@ int main(int argc, char ** argv) Reporter reporter(std::cout, opts.reportEvery, opts.reportHeader); if (!opts.readyAddress.empty()) session.createSender(opts.readyAddress).send(msg); - - uint received = 0; + + // For receive rate calculation qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; while (!done && receiver.fetch(msg, timeout)) { - ++received; reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { @@ -225,7 +224,7 @@ int main(int argc, char ** argv) session.acknowledge(); } if (opts.receiveRate) { - qpid::sys::AbsTime waitTill(start, received*interval); + qpid::sys::AbsTime waitTill(start, count*interval); int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } -- cgit v1.2.1 From 79df0d0ea3a9dee79c05de0f61aa8699d2a4fa0f Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 14 Dec 2010 21:30:55 +0000 Subject: Add end-to-end flow control to qpid-send, qpid-receive and qpid-cpp-benchmark. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1049286 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index a85d882a0f..9b84306605 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -191,6 +191,9 @@ int main(int argc, char ** argv) int64_t interval = 0; if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; + Address replyToAddress; + Sender replyToSender; + while (!done && receiver.fetch(msg, timeout)) { reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { @@ -223,12 +226,21 @@ int main(int argc, char ** argv) } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } + if (msg.getReplyTo()) { // Echo message back to reply-to address. + if (msg.getReplyTo() != replyToAddress) { + replyToSender = session.createSender(msg.getReplyTo()); + replyToSender.setCapacity(opts.capacity); + replyToAddress = msg.getReplyTo(); + } + replyToSender.send(msg); + } if (opts.receiveRate) { qpid::sys::AbsTime waitTill(start, count*interval); int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } - //opts.rejectFrequency?? + // Clear out message properties & content for next iteration. + msg = Message(); // TODO aconway 2010-12-01: should be done by fetch } if (opts.reportTotal) reporter.report(); if (opts.tx) { -- cgit v1.2.1 From 98fb574050919fb49291538867351f0644be3ae1 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 15 Dec 2010 18:10:12 +0000 Subject: Fix flow control for qpid-cpp-benchmark with multiple senders. Ensure senders & receivers agree on number of messages sent/received. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1049656 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 9b84306605..28e229ca27 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -191,8 +191,7 @@ int main(int argc, char ** argv) int64_t interval = 0; if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; - Address replyToAddress; - Sender replyToSender; + std::map replyTo; while (!done && receiver.fetch(msg, timeout)) { reporter.message(msg); @@ -227,12 +226,12 @@ int main(int argc, char ** argv) session.acknowledge(); } if (msg.getReplyTo()) { // Echo message back to reply-to address. - if (msg.getReplyTo() != replyToAddress) { - replyToSender = session.createSender(msg.getReplyTo()); - replyToSender.setCapacity(opts.capacity); - replyToAddress = msg.getReplyTo(); + Sender& s = replyTo[msg.getReplyTo().str()]; + if (s.isNull()) { + s = session.createSender(msg.getReplyTo()); + s.setCapacity(opts.capacity); } - replyToSender.send(msg); + s.send(msg); } if (opts.receiveRate) { qpid::sys::AbsTime waitTill(start, count*interval); -- cgit v1.2.1 From 8b8d70e010a2999ad5dd1590d41eb35d8091296a Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 10 Feb 2011 10:12:41 +0000 Subject: QPID-529: Priority queue implementation QPID-2104: LVQ enhancement These both required some refactoring of the Queue class to allow cleaner implementation of different types of behaviour. The in-memory storage of messages is now abstracted out behind an interface specified by qpid::broker::Messages which qpid::broker::Queue uses. Different implementations of that are available for the standard FIFO queue, priority queues and LVQ (I have also separated out the 'legacy' implementation of LVQ from the new version driven by QPID-2104). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1069322 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 28e229ca27..012d544a2e 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -206,6 +206,7 @@ int main(int argc, char ** argv) if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; + if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl; if (msg.getDurable()) std::cout << "Durable: true" << std::endl; if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl; std::cout << "Properties: " << msg.getProperties() << std::endl; -- cgit v1.2.1 From 47551f3aa2dd46b8daeeb9683a668464203ffa06 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Mon, 21 Feb 2011 21:11:23 +0000 Subject: QPID-3078: remote ambiguous -t short option, fix typo git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1073139 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 012d544a2e..3189a13c6e 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -92,7 +92,7 @@ struct Options : public qpid::Options ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from") ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") - ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting") + ("timeout", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting") ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") -- cgit v1.2.1 From 667818f1417c742bc4c49465ef83562a693f0e49 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 25 Feb 2011 16:14:32 +0000 Subject: QPID-2999: set redelivered on replay git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1074611 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 3189a13c6e..5a85da4fd2 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -53,6 +53,7 @@ struct Options : public qpid::Options bool forever; uint messages; bool ignoreDuplicates; + bool checkRedelivered; uint capacity; uint ackFrequency; uint tx; @@ -75,6 +76,7 @@ struct Options : public qpid::Options forever(false), messages(0), ignoreDuplicates(false), + checkRedelivered(false), capacity(1000), ackFrequency(100), tx(0), @@ -96,6 +98,7 @@ struct Options : public qpid::Options ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") + ("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") @@ -216,6 +219,8 @@ int main(int argc, char ** argv) std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages if (opts.messages && count >= opts.messages) done = true; } + } else if (opts.checkRedelivered && !msg.getRedelivered()) { + throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!"); } if (opts.tx && (count % opts.tx == 0)) { if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { -- cgit v1.2.1 From b0aace2bd3499ce31b9140629f69da5336c7dbda Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 14 Mar 2011 14:33:44 +0000 Subject: NO-JIRA: Minor improvements to test scripts qpid-cluster-benchmark and qpid-cpp-benchmark - qpid-cluster-benchmark: added command line options. - qpid-cpp-benchmark: clean up error handling, fixed a race condition. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1081396 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/cpp/src/tests/qpid-receive.cpp') diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 5a85da4fd2..9c713e872a 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -262,7 +262,7 @@ int main(int argc, char ** argv) return 0; } } catch(const std::exception& error) { - std::cerr << "Failure: " << error.what() << std::endl; + std::cerr << "qpid-receive: " << error.what() << std::endl; connection.close(); return 1; } -- cgit v1.2.1