diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-29 11:13:23 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-29 11:13:23 +0000 |
commit | 1825fa6a307f253e12c3c5632d026dd13942051e (patch) | |
tree | 664d4c99c2ec2f0facc7852153d13d941665c722 | |
parent | b92653bb73b575b0cff7eacce5cf8d71bd1bf2ea (diff) | |
download | qpid-python-1825fa6a307f253e12c3c5632d026dd13942051e.tar.gz |
QPID-313 : Patch supplied by Rob Godfrey - Call to attainState in makeBrokerConnection can miss the notification of state change.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@501011 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java | 61 |
1 files changed, 28 insertions, 33 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 1d871f7bc8..7f42faccb8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -55,6 +55,8 @@ public class AMQStateManager implements AMQMethodListener private final Map _state2HandlersMap = new HashMap(); private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); + private final Object _stateLock = new Object(); + private static final long MAXIMUM_STATE_WAIT_TIME = 30000l; public AMQStateManager() { @@ -127,17 +129,11 @@ public class AMQStateManager implements AMQMethodListener public void changeState(AMQState newState) throws AMQException { _logger.debug("State changing to " + newState + " from old state " + _currentState); - final AMQState oldState = _currentState; - _currentState = newState; - synchronized (_stateListeners) + synchronized (_stateLock) { - final Iterator it = _stateListeners.iterator(); - while (it.hasNext()) - { - final StateListener l = (StateListener) it.next(); - l.stateChanged(oldState, newState); - } + _currentState = newState; + _stateLock.notifyAll(); } } @@ -204,36 +200,35 @@ public class AMQStateManager implements AMQMethodListener } } - public void addStateListener(StateListener listener) - { - _logger.debug("Adding state listener"); - _stateListeners.add(listener); - } - - public void removeStateListener(StateListener listener) - { - _stateListeners.remove(listener); - } - public void attainState(AMQState s) throws AMQException + public void attainState(final AMQState s) throws AMQException { - boolean needToWait = false; - StateWaiter sw = null; - synchronized (_stateListeners) + synchronized(_stateLock) { - if (_currentState != s) + final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; + long waitTime = MAXIMUM_STATE_WAIT_TIME; + + while(_currentState != s && waitTime > 0) { - _logger.debug("Adding state wait to reach state " + s); - sw = new StateWaiter(s); - addStateListener(sw); - // we use a boolean since we must release the lock before starting to wait - needToWait = true; + try + { + _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); + } + catch (InterruptedException e) + { + _logger.warn("Thread interrupted"); + } + if(_currentState != s) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + if(_currentState != s) + { + throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); } } - if (needToWait) - { - sw.waituntilStateHasChanged(); - } + // at this point the state will have changed. } |