summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-09-14 20:39:05 +0000
committerRafael H. Schloming <rhs@apache.org>2007-09-14 20:39:05 +0000
commit97a264479653731473dc6ae3389458357c876ff5 (patch)
treee7070833ef33d54c879d8899f903b580d8320624 /java/client
parent942379b3196bd37817c52e0fd08cb3c0efff234e (diff)
downloadqpid-python-97a264479653731473dc6ae3389458357c876ff5.tar.gz
Merged revision 572751 from the trunk, this fixes QPID-573.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@575788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java41
2 files changed, 37 insertions, 28 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index bdebc8e50a..140eaa7c1c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -108,6 +108,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
*
@@ -220,6 +221,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
+ /**
+ * Holds the highest received delivery tag.
+ */
+ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+
/** Holds the dispatcher thread for this session. */
private Dispatcher _dispatcher;
@@ -1278,6 +1284,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
+ _highestDeliveryTag.set(message.getDeliverBody().deliveryTag);
_queue.add(message);
}
}
@@ -2558,6 +2565,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
+ private final AtomicLong _rollbackMark = new AtomicLong(-1);
public Dispatcher()
{
@@ -2614,7 +2622,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
setConnectionStopped(true);
}
- rejectAllMessages(true);
+ _rollbackMark.set(_highestDeliveryTag.get());
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -2650,7 +2658,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Allow disptacher to start stopped
synchronized (_lock)
{
- while (connectionStopped())
+ while (!_closed.get() && connectionStopped())
{
try
{
@@ -2675,14 +2683,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait();
}
- synchronized (_messageDeliveryLock)
+ if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
{
- dispatchMessage(message);
+ rejectMessage(message, true);
}
-
- while (connectionStopped())
+ else
{
- _lock.wait();
+ synchronized (_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 929621c496..678474a18b 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -171,34 +171,33 @@ public class TransactedTest extends TestCase
public void testRollback() throws Exception
{
// add some messages
- _logger.info("Send prep A");
- prepProducer1.send(prepSession.createTextMessage("A"));
- _logger.info("Send prep B");
- prepProducer1.send(prepSession.createTextMessage("B"));
- _logger.info("Send prep C");
- prepProducer1.send(prepSession.createTextMessage("C"));
-
- // Quick sleep to ensure all three get pre-fetched
+ _logger.info("Send prep RB_A");
+ prepProducer1.send(prepSession.createTextMessage("RB_A"));
+ _logger.info("Send prep RB_B");
+ prepProducer1.send(prepSession.createTextMessage("RB_B"));
+ _logger.info("Send prep RB_C");
+ prepProducer1.send(prepSession.createTextMessage("RB_C"));
+
+ _logger.info("Sending RB_X RB_Y RB_Z");
+ producer2.send(session.createTextMessage("RB_X"));
+ producer2.send(session.createTextMessage("RB_Y"));
+ producer2.send(session.createTextMessage("RB_Z"));
+ _logger.info("Receiving RB_A RB_B");
+ expect("RB_A", consumer1.receive(1000));
+ expect("RB_B", consumer1.receive(1000));
+ // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
+ // Quick sleep to ensure 'RB_C' gets pre-fetched
Thread.sleep(500);
- _logger.info("Sending X Y Z");
- producer2.send(session.createTextMessage("X"));
- producer2.send(session.createTextMessage("Y"));
- producer2.send(session.createTextMessage("Z"));
- _logger.info("Receiving A B");
- expect("A", consumer1.receive(1000));
- expect("B", consumer1.receive(1000));
- // Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it.
-
// rollback
_logger.info("rollback");
session.rollback();
- _logger.info("Receiving A B C");
+ _logger.info("Receiving RB_A RB_B RB_C");
// ensure sent messages are not visible and received messages are requeued
- expect("A", consumer1.receive(1000), true);
- expect("B", consumer1.receive(1000), true);
- expect("C", consumer1.receive(1000), true);
+ expect("RB_A", consumer1.receive(1000), true);
+ expect("RB_B", consumer1.receive(1000), true);
+ expect("RB_C", consumer1.receive(1000), true);
_logger.info("Starting new connection");
testCon.start();