summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-10-05 10:39:54 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-10-05 10:39:54 +0000
commite2f7ab3c93a74e4a9e9ad29d24a3811e41396fa4 (patch)
treeb2e580b48d42676a5883356624ec9402ce9ed4d7 /java/client/src
parent5c2922a259b7980b54059ac2f94de58ab724d064 (diff)
downloadqpid-python-e2f7ab3c93a74e4a9e9ad29d24a3811e41396fa4.tar.gz
QPID-624: Update to ensure all errors are correctly processed in BlockingMethodFrameListener.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@582201 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java84
1 files changed, 61 insertions, 23 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index c64f46ba23..0c59188de1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -78,14 +78,22 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
/** This flag is used to indicate that the blocked for method has been received. */
private volatile boolean _ready = false;
+ /** This flag is used to indicate that the received error has been processed. */
+ private volatile boolean _errorAck = false;
+
/** Used to protect the shared event and ready flag between the producer and consumer. */
private final ReentrantLock _lock = new ReentrantLock();
-
+
/**
* Used to signal that a method has been received
*/
private final Condition _receivedCondition = _lock.newCondition();
+ /**
+ * Used to signal that a error has been processed
+ */
+ private final Condition _errorConditionAck = _lock.newCondition();
+
/** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
private volatile Exception _error;
@@ -142,7 +150,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
_ready = ready;
_receivedCondition.signal();
}
- finally
+ finally
{
_lock.unlock();
}
@@ -174,13 +182,15 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
{
long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
-
+
_lock.lock();
+
try
{
while (!_ready)
{
- try {
+ try
+ {
if (timeout == -1)
{
_receivedCondition.await();
@@ -195,7 +205,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
_ready = true;
}
}
- }
+ }
catch (InterruptedException e)
{
// IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
@@ -206,29 +216,34 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
// }
}
}
+
+
+ if (_error != null)
+ {
+ if (_error instanceof AMQException)
+ {
+ throw (AMQException) _error;
+ }
+ else if (_error instanceof FailoverException)
+ {
+ // This should ensure that FailoverException is not wrapped and can be caught.
+ throw (FailoverException) _error; // needed to expose FailoverException.
+ }
+ else
+ {
+ throw new AMQException("Woken up due to " + _error.getClass(), _error);
+ }
+ }
+
}
finally
{
+ _errorAck = true;
+ _errorConditionAck.signal();
+ _error = null;
_lock.unlock();
}
- if (_error != null)
- {
- if (_error instanceof AMQException)
- {
- throw (AMQException) _error;
- }
- else if (_error instanceof FailoverException)
- {
- // This should ensure that FailoverException is not wrapped and can be caught.
- throw (FailoverException) _error; // needed to expose FailoverException.
- }
- else
- {
- throw new AMQException("Woken up due to " + _error.getClass(), _error);
- }
- }
-
return _doneEvt;
}
@@ -242,13 +257,36 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
// set the error so that the thread that is blocking (against blockForFrame())
// can pick up the exception and rethrow to the caller
- _error = e;
+
_lock.lock();
+
+ if (_error == null)
+ {
+ _error = e;
+ }
+ else
+ {
+ System.err.println("WARNING: new error arrived while old one not yet processed");
+ }
+
try
{
_ready = true;
_receivedCondition.signal();
+
+ while (!_errorAck)
+ {
+ try
+ {
+ _errorConditionAck.await();
+ }
+ catch (InterruptedException e1)
+ {
+ //
+ }
+ }
+ _errorAck = false;
}
finally
{