summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/MessagingSessionTests.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/MessagingSessionTests.cpp')
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp89
1 files changed, 89 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index b388f2c13a..5cc595c56f 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1289,6 +1289,95 @@ QPID_AUTO_TEST_CASE(testSimpleRequestResponse)
BOOST_CHECK_EQUAL(m.getSubject(), original.getSubject());
}
+QPID_AUTO_TEST_CASE(testSelfDestructQueue)
+{
+ MessagingFixture fix;
+ Session other = fix.connection.createSession();
+ Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}");
+ Receiver r2 = fix.session.createReceiver("amq.fanout");
+ //send request
+ Sender s = fix.session.createSender("amq.fanout");
+ for (uint i = 0; i < 20; ++i) {
+ s.send(Message((boost::format("MSG_%1%") % (i+1)).str()));
+ }
+ try {
+ ScopedSuppressLogging sl;
+ for (uint i = 0; i < 20; ++i) {
+ r1.fetch(Duration::SECOND);
+ }
+ BOOST_FAIL("Expected exception.");
+ } catch (const qpid::messaging::MessagingException&) {
+ }
+
+ for (uint i = 0; i < 20; ++i) {
+ BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testReroutingRingQueue)
+{
+ MessagingFixture fix;
+ Receiver r1 = fix.session.createReceiver("my-queue; {create:always, node:{x-declare:{alternate-exchange:amq.fanout, auto-delete:True, arguments:{qpid.max_count:10,qpid.policy_type:ring}}}}");
+ Receiver r2 = fix.session.createReceiver("amq.fanout");
+
+ Sender s = fix.session.createSender("my-queue");
+ for (uint i = 0; i < 20; ++i) {
+ s.send(Message((boost::format("MSG_%1%") % (i+1)).str()));
+ }
+ for (uint i = 10; i < 20; ++i) {
+ BOOST_CHECK_EQUAL(r1.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());
+ }
+ for (uint i = 0; i < 10; ++i) {
+ BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testReleaseOnPriorityQueue)
+{
+ MessagingFixture fix;
+ std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10}}}}");
+ std::string text("my message");
+ Sender sender = fix.session.createSender(queue);
+ sender.send(Message(text));
+ Receiver receiver = fix.session.createReceiver(queue);
+ Message msg;
+ for (uint i = 0; i < 10; ++i) {
+ if (receiver.fetch(msg, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(msg.getContent(), text);
+ fix.session.release(msg);
+ } else {
+ BOOST_FAIL("Released message not redelivered as expected.");
+ }
+ }
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testRollbackWithFullPrefetch)
+{
+ QueueFixture fix;
+ std::string first("first");
+ std::string second("second");
+ Sender sender = fix.session.createSender(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ sender.send(Message((boost::format("MSG_%1%") % (i+1)).str()));
+ }
+ Session txsession = fix.connection.createTransactionalSession();
+ Receiver receiver = txsession.createReceiver(fix.queue);
+ receiver.setCapacity(9);
+ Message msg;
+ for (uint i = 0; i < 10; ++i) {
+ if (receiver.fetch(msg, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(msg.getContent(), std::string("MSG_1"));
+ txsession.rollback();
+ } else {
+ BOOST_FAIL("Released message not redelivered as expected.");
+ break;
+ }
+ }
+ txsession.acknowledge();
+ txsession.commit();
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests