diff options
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 1bfc2205f2..8a5f9ffef5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -109,15 +109,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final MessageAcquireMode _acquireMode; private MessageFlowMode _flowMode; private final ServerSession _session; - private AtomicBoolean _stopped = new AtomicBoolean(true); + private final AtomicBoolean _stopped = new AtomicBoolean(true); private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; private LogActor _logActor; - private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); + private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); private UUID _id; private String _traceExclude; private String _trace; - private long _createTime = System.currentTimeMillis(); + private final long _createTime = System.currentTimeMillis(); private final AtomicLong _deliveredCount = new AtomicLong(0); private final Map<String, Object> _arguments; @@ -727,13 +727,22 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void stop() { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + try { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + getSendLock(); + + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } + _stopped.set(true); + FlowCreditManager_0_10 creditManager = getCreditManager(); + creditManager.clearCredit(); + } + finally + { + releaseSendLock(); } - _stopped.set(true); - FlowCreditManager_0_10 creditManager = getCreditManager(); - creditManager.clearCredit(); } public void addCredit(MessageCreditUnit unit, long value) |
