summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-10-10 15:47:38 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-10-10 15:47:38 +0000
commit7faf99208ff1c2578db48d2770b1611a90bedf68 (patch)
tree0c86a35df560024315f67821e69b1c0319c26ec3 /java
parentd87738d772276138fb20427558701febf455f510 (diff)
downloadqpid-python-7faf99208ff1c2578db48d2770b1611a90bedf68.tar.gz
Merged revisions 583518 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r583518 | rupertlssmith | 2007-10-10 16:45:56 +0100 (Wed, 10 Oct 2007) | 1 line Changed maxPending to be by message correlation id. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@583520 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java37
1 files changed, 21 insertions, 16 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 70e548d613..5e1f35053a 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -419,15 +419,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
protected int _maxPendingSize;
- /**
- * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
- * to wait until the number of unreceived message is reduced before continuing to send.
- */
- protected static final Object _sendPauseMonitor = new Object();
-
- /** Keeps a count of the number of message currently sent but not received. */
- protected static AtomicInteger _unreceived = new AtomicInteger(0);
-
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -898,7 +889,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
if ((_maxPendingSize > 0))
{
// Decrement the count of sent but not yet received messages.
- int unreceived = _unreceived.decrementAndGet();
+ int unreceived = perCorrelationId._unreceived.decrementAndGet();
int unreceivedSize =
(unreceived * ((_messageSize == 0) ? 1 : _messageSize))
/ (_isPubSub ? getConsumersPerDestination() : 1);
@@ -906,11 +897,11 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// log.debug("unreceived = " + unreceived);
// log.debug("unreceivedSize = " + unreceivedSize);
- synchronized (_sendPauseMonitor)
+ synchronized (perCorrelationId._sendPauseMonitor)
{
if (unreceivedSize < _maxPendingSize)
{
- _sendPauseMonitor.notifyAll();
+ perCorrelationId._sendPauseMonitor.notify();
}
}
}
@@ -1169,10 +1160,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// Prompt the user to kill the broker when doing failover testing.
_failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
+ // Get the test setup for the correlation id.
+ String correlationID = message.getJMSCorrelationID();
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
+
// If necessary, wait until the max pending message size comes within its limit.
if (_maxPendingSize > 0)
{
- synchronized (_sendPauseMonitor)
+ synchronized (perCorrelationId._sendPauseMonitor)
{
// Used to keep track of the number of times that send has to wait.
int numWaits = 0;
@@ -1184,7 +1179,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
while (true)
{
// Get the size estimate of sent but not yet received messages.
- int unreceived = _unreceived.get();
+ int unreceived = perCorrelationId._unreceived.get();
int unreceivedSize =
(unreceived * ((_messageSize == 0) ? 1 : _messageSize))
/ (_isPubSub ? getConsumersPerDestination() : 1);
@@ -1212,7 +1207,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
try
{
long start = System.nanoTime();
- _sendPauseMonitor.wait(10000);
+ perCorrelationId._sendPauseMonitor.wait(10000);
long end = System.nanoTime();
// Count the wait only if it was for > 99% of the requested wait time.
@@ -1255,7 +1250,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// in pub/sub mode.
if (_maxPendingSize > 0)
{
- int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
+ int newUnreceivedCount =
+ perCorrelationId._unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
// log.debug("newUnreceivedCount = " + newUnreceivedCount);
}
@@ -1676,5 +1672,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Holds the last timestamp that the timeout was reset to. */
Long timeOutStart;
+
+ /**
+ * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
+ * to wait until the number of unreceived message is reduced before continuing to send.
+ */
+ final Object _sendPauseMonitor = new Object();
+
+ /** Keeps a count of the number of message currently sent but not received. */
+ AtomicInteger _unreceived = new AtomicInteger(0);
}
}