summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java48
1 files changed, 30 insertions, 18 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index 8bde913149..f865837350 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -717,15 +717,18 @@ class Subscription_1_0 implements Subscription
getEndpoint().detach();
}
- public synchronized boolean wouldSuspend(final QueueEntry msg)
+ public boolean wouldSuspend(final QueueEntry msg)
{
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
- if(!hasCredit && getState() == State.ACTIVE)
+ synchronized (_link.getLock())
{
- suspend();
- }
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+ if(!hasCredit && getState() == State.ACTIVE)
+ {
+ suspend();
+ }
- return !hasCredit;
+ return !hasCredit;
+ }
}
public boolean trySendLock()
@@ -733,11 +736,14 @@ class Subscription_1_0 implements Subscription
return _stateChangeLock.tryLock();
}
- public synchronized void suspend()
+ public void suspend()
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ synchronized(_link.getLock())
{
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
}
}
@@ -808,26 +814,32 @@ class Subscription_1_0 implements Subscription
return false; //TODO
}
- public synchronized void queueEmpty()
+ public void queueEmpty()
{
- if(_link.drained())
+ synchronized(_link.getLock())
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ if(_link.drained())
{
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
}
}
}
- public synchronized void flowStateChanged()
+ public void flowStateChanged()
{
- if(isSuspended() && getEndpoint() != null)
+ synchronized(_link.getLock())
{
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ if(isSuspended() && getEndpoint() != null)
{
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ }
+ _transactionId = _link.getTransactionId();
}
- _transactionId = _link.getTransactionId();
}
}