summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-29 11:13:23 +0000
committerRobert Greig <rgreig@apache.org>2007-01-29 11:13:23 +0000
commit1825fa6a307f253e12c3c5632d026dd13942051e (patch)
tree664d4c99c2ec2f0facc7852153d13d941665c722
parentb92653bb73b575b0cff7eacce5cf8d71bd1bf2ea (diff)
downloadqpid-python-1825fa6a307f253e12c3c5632d026dd13942051e.tar.gz
QPID-313 : Patch supplied by Rob Godfrey - Call to attainState in makeBrokerConnection can miss the notification of state change.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@501011 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java61
1 files changed, 28 insertions, 33 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 1d871f7bc8..7f42faccb8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -55,6 +55,8 @@ public class AMQStateManager implements AMQMethodListener
private final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+ private final Object _stateLock = new Object();
+ private static final long MAXIMUM_STATE_WAIT_TIME = 30000l;
public AMQStateManager()
{
@@ -127,17 +129,11 @@ public class AMQStateManager implements AMQMethodListener
public void changeState(AMQState newState) throws AMQException
{
_logger.debug("State changing to " + newState + " from old state " + _currentState);
- final AMQState oldState = _currentState;
- _currentState = newState;
- synchronized (_stateListeners)
+ synchronized (_stateLock)
{
- final Iterator it = _stateListeners.iterator();
- while (it.hasNext())
- {
- final StateListener l = (StateListener) it.next();
- l.stateChanged(oldState, newState);
- }
+ _currentState = newState;
+ _stateLock.notifyAll();
}
}
@@ -204,36 +200,35 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public void addStateListener(StateListener listener)
- {
- _logger.debug("Adding state listener");
- _stateListeners.add(listener);
- }
-
- public void removeStateListener(StateListener listener)
- {
- _stateListeners.remove(listener);
- }
- public void attainState(AMQState s) throws AMQException
+ public void attainState(final AMQState s) throws AMQException
{
- boolean needToWait = false;
- StateWaiter sw = null;
- synchronized (_stateListeners)
+ synchronized(_stateLock)
{
- if (_currentState != s)
+ final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+ long waitTime = MAXIMUM_STATE_WAIT_TIME;
+
+ while(_currentState != s && waitTime > 0)
{
- _logger.debug("Adding state wait to reach state " + s);
- sw = new StateWaiter(s);
- addStateListener(sw);
- // we use a boolean since we must release the lock before starting to wait
- needToWait = true;
+ try
+ {
+ _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Thread interrupted");
+ }
+ if(_currentState != s)
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+ if(_currentState != s)
+ {
+ throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
}
}
- if (needToWait)
- {
- sw.waituntilStateHasChanged();
- }
+
// at this point the state will have changed.
}