diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-10-05 10:39:54 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-10-05 10:39:54 +0000 |
| commit | e2f7ab3c93a74e4a9e9ad29d24a3811e41396fa4 (patch) | |
| tree | b2e580b48d42676a5883356624ec9402ce9ed4d7 /java/client/src | |
| parent | 5c2922a259b7980b54059ac2f94de58ab724d064 (diff) | |
| download | qpid-python-e2f7ab3c93a74e4a9e9ad29d24a3811e41396fa4.tar.gz | |
QPID-624: Update to ensure all errors are correctly processed in BlockingMethodFrameListener.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@582201 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java | 84 |
1 files changed, 61 insertions, 23 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index c64f46ba23..0c59188de1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -78,14 +78,22 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener /** This flag is used to indicate that the blocked for method has been received. */ private volatile boolean _ready = false; + /** This flag is used to indicate that the received error has been processed. */ + private volatile boolean _errorAck = false; + /** Used to protect the shared event and ready flag between the producer and consumer. */ private final ReentrantLock _lock = new ReentrantLock(); - + /** * Used to signal that a method has been received */ private final Condition _receivedCondition = _lock.newCondition(); + /** + * Used to signal that a error has been processed + */ + private final Condition _errorConditionAck = _lock.newCondition(); + /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; @@ -142,7 +150,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener _ready = ready; _receivedCondition.signal(); } - finally + finally { _lock.unlock(); } @@ -174,13 +182,15 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); - + _lock.lock(); + try { while (!_ready) { - try { + try + { if (timeout == -1) { _receivedCondition.await(); @@ -195,7 +205,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener _ready = true; } } - } + } catch (InterruptedException e) { // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess @@ -206,29 +216,34 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener // } } } + + + if (_error != null) + { + if (_error instanceof AMQException) + { + throw (AMQException) _error; + } + else if (_error instanceof FailoverException) + { + // This should ensure that FailoverException is not wrapped and can be caught. + throw (FailoverException) _error; // needed to expose FailoverException. + } + else + { + throw new AMQException("Woken up due to " + _error.getClass(), _error); + } + } + } finally { + _errorAck = true; + _errorConditionAck.signal(); + _error = null; _lock.unlock(); } - if (_error != null) - { - if (_error instanceof AMQException) - { - throw (AMQException) _error; - } - else if (_error instanceof FailoverException) - { - // This should ensure that FailoverException is not wrapped and can be caught. - throw (FailoverException) _error; // needed to expose FailoverException. - } - else - { - throw new AMQException("Woken up due to " + _error.getClass(), _error); - } - } - return _doneEvt; } @@ -242,13 +257,36 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener { // set the error so that the thread that is blocking (against blockForFrame()) // can pick up the exception and rethrow to the caller - _error = e; + _lock.lock(); + + if (_error == null) + { + _error = e; + } + else + { + System.err.println("WARNING: new error arrived while old one not yet processed"); + } + try { _ready = true; _receivedCondition.signal(); + + while (!_errorAck) + { + try + { + _errorConditionAck.await(); + } + catch (InterruptedException e1) + { + // + } + } + _errorAck = false; } finally { |
