summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-07 16:57:49 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-07 16:57:49 +0000
commit65b1a1ddfe95b9e273d4cdaf23067a0aaff9b1d1 (patch)
tree427d5ce851b9336fea70eb8fc6f87135ad239065 /qpid/java/broker-plugins/amqp-0-10-protocol
parent3ab4f9bdc9bbc8375534f45022a02257eb6e030d (diff)
downloadqpid-python-65b1a1ddfe95b9e273d4cdaf23067a0aaff9b1d1.tar.gz
QPID-5504 : Refactoring to allow for nodes other than queues to be subscribed from, and nodes other than exchanges to be sent to (merged from separate branch)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1565726 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java580
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java31
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java26
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java12
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java95
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java104
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java944
9 files changed, 713 insertions, 1087 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
new file mode 100644
index 0000000000..6ad9de22cb
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -0,0 +1,580 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.*;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
+{
+
+ private static final Option[] BATCHED = new Option[] { Option.BATCH };
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+ private final String _name;
+
+
+ private FlowCreditManager_0_10 _creditManager;
+
+ private final MessageAcceptMode _acceptMode;
+ private final MessageAcquireMode _acquireMode;
+ private MessageFlowMode _flowMode;
+ private final ServerSession _session;
+ private final AtomicBoolean _stopped = new AtomicBoolean(true);
+
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+
+ private final Map<String, Object> _arguments;
+ private int _deferredMessageCredit;
+ private long _deferredSizeCredit;
+ private Consumer _consumer;
+
+
+ public ConsumerTarget_0_10(ServerSession session,
+ String name,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ MessageFlowMode flowMode,
+ FlowCreditManager_0_10 creditManager,
+ Map<String, Object> arguments)
+ {
+ super(State.SUSPENDED);
+ _session = session;
+ _postIdSettingAction = new AddMessageDispositionListenerAction(session);
+ _acceptMode = acceptMode;
+ _acquireMode = acquireMode;
+ _creditManager = creditManager;
+ _flowMode = flowMode;
+ _creditManager.addStateListener(this);
+ _arguments = arguments == null ? Collections.<String, Object> emptyMap() :
+ Collections.<String, Object> unmodifiableMap(arguments);
+ _name = name;
+ }
+
+ public Consumer getConsumer()
+ {
+ return _consumer;
+ }
+
+ public boolean isSuspended()
+ {
+ return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
+ }
+
+ public boolean close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ getConsumer().getSendLock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ }
+ _creditManager.removeListener(this);
+ }
+ finally
+ {
+ getConsumer().releaseSendLock();
+ }
+
+ return closed;
+
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(!updateState(State.SUSPENDED, State.ACTIVE))
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+
+ public static class AddMessageDispositionListenerAction implements Runnable
+ {
+ private MessageTransfer _xfr;
+ private ServerSession.MessageDispositionChangeListener _action;
+ private ServerSession _session;
+
+ public AddMessageDispositionListenerAction(ServerSession session)
+ {
+ _session = session;
+ }
+
+ public void setXfr(MessageTransfer xfr)
+ {
+ _xfr = xfr;
+ }
+
+ public void setAction(ServerSession.MessageDispositionChangeListener action)
+ {
+ _action = action;
+ }
+
+ public void run()
+ {
+ if(_action != null)
+ {
+ _session.onMessageDispositionChange(_xfr, _action);
+ }
+ }
+ }
+
+ private final AddMessageDispositionListenerAction _postIdSettingAction;
+
+ public void send(final MessageInstance entry, boolean batch) throws AMQException
+ {
+ ServerMessage serverMsg = entry.getMessage();
+
+
+ MessageTransfer xfr;
+
+ DeliveryProperties deliveryProps;
+ MessageProperties messageProps = null;
+
+ MessageTransferMessage msg;
+
+ if(serverMsg instanceof MessageTransferMessage)
+ {
+
+ msg = (MessageTransferMessage) serverMsg;
+
+ }
+ else
+ {
+ MessageConverter converter =
+ MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
+
+
+ msg = (MessageTransferMessage) converter.convert(serverMsg, _session.getVirtualHost());
+ }
+ DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+ messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
+
+ deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
+ {
+ if(origDeliveryProps.hasDeliveryMode())
+ {
+ deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+ }
+ if(origDeliveryProps.hasExchange())
+ {
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
+ }
+ if(origDeliveryProps.hasExpiration())
+ {
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+ }
+ if(origDeliveryProps.hasPriority())
+ {
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
+ }
+ if(origDeliveryProps.hasRoutingKey())
+ {
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+ }
+ if(origDeliveryProps.hasTimestamp())
+ {
+ deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
+ }
+ if(origDeliveryProps.hasTtl())
+ {
+ deliveryProps.setTtl(origDeliveryProps.getTtl());
+ }
+
+
+ }
+
+ deliveryProps.setRedelivered(entry.isRedelivered());
+
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+
+
+ xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody());
+
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
+ {
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ }
+ else if(_flowMode == MessageFlowMode.WINDOW)
+ {
+ xfr.setCompletionListener(new Method.CompletionListener()
+ {
+ public void onComplete(Method method)
+ {
+ deferredAddCredit(1, entry.getMessage().getSize());
+ }
+ });
+ }
+
+
+ _postIdSettingAction.setXfr(xfr);
+ if(_acceptMode == MessageAcceptMode.EXPLICIT)
+ {
+ _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
+ }
+ else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
+ {
+ _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
+ }
+ else
+ {
+ _postIdSettingAction.setAction(null);
+ }
+
+
+ _session.sendMessage(xfr, _postIdSettingAction);
+ entry.incrementDeliveryCount();
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ forceDequeue(entry, false);
+ }
+ else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ recordUnacknowledged(entry);
+ }
+
+ }
+
+ void recordUnacknowledged(MessageInstance entry)
+ {
+ _unacknowledgedCount.incrementAndGet();
+ _unacknowledgedBytes.addAndGet(entry.getMessage().getSize());
+ }
+
+ private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
+ {
+ _deferredMessageCredit += deferredMessageCredit;
+ _deferredSizeCredit += deferredSizeCredit;
+
+ }
+
+ public void flushCreditState(boolean strict)
+ {
+ if(strict || !isSuspended() || _deferredMessageCredit >= 200
+ || !(_creditManager instanceof WindowCreditManager)
+ || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+ {
+ _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+ _deferredMessageCredit = 0;
+ _deferredSizeCredit = 0l;
+ }
+ }
+
+ private void forceDequeue(final MessageInstance entry, final boolean restoreCredit)
+ {
+ AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore());
+ dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ if (restoreCredit)
+ {
+ restoreCredit(entry.getMessage());
+ }
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
+ void reject(final MessageInstance entry)
+ {
+ entry.setRedelivered();
+ entry.routeToAlternate(null, null);
+ if(entry.isAcquiredBy(getConsumer()))
+ {
+ entry.delete();
+ }
+ }
+
+ void release(final MessageInstance entry, final boolean setRedelivered)
+ {
+ if (setRedelivered)
+ {
+ entry.setRedelivered();
+ }
+
+ if (getSessionModel().isClosing() || !setRedelivered)
+ {
+ entry.decrementDeliveryCount();
+ }
+
+ if (isMaxDeliveryLimitReached(entry))
+ {
+ sendToDLQOrDiscard(entry);
+ }
+ else
+ {
+ entry.release();
+ }
+ }
+
+ protected void sendToDLQOrDiscard(MessageInstance entry)
+ {
+ final LogActor logActor = CurrentActor.get();
+ final ServerMessage msg = entry.getMessage();
+
+ int requeues = entry.routeToAlternate(new Action<MessageInstance>()
+ {
+ @Override
+ public void performAction(final MessageInstance requeueEntry)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getOwningResource().getName()));
+ }
+ }, null);
+
+ if (requeues == 0)
+ {
+ TransactionLogResource owningResource = entry.getOwningResource();
+ if(owningResource instanceof AMQQueue)
+ {
+ final AMQQueue queue = (AMQQueue)owningResource;
+ final Exchange alternateExchange = queue.getAlternateExchange();
+
+ if(alternateExchange != null)
+ {
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ alternateExchange.getName()));
+ }
+ else
+ {
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ queue.getName(),
+ msg.getRoutingKey()));
+ }
+ }
+ }
+ }
+
+ private boolean isMaxDeliveryLimitReached(MessageInstance entry)
+ {
+ final int maxDeliveryLimit = entry.getMaximumDeliveryCount();
+ return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
+ }
+
+ public void queueDeleted()
+ {
+ _deleted.set(true);
+ }
+
+ public boolean allocateCredit(ServerMessage message)
+ {
+ return _creditManager.useCreditForMessage(message.getSize());
+ }
+
+ public void restoreCredit(ServerMessage message)
+ {
+ _creditManager.restoreCredit(1, message.getSize());
+ }
+
+ public FlowCreditManager_0_10 getCreditManager()
+ {
+ return _creditManager;
+ }
+
+
+ public void stop()
+ {
+ try
+ {
+ getConsumer().getSendLock();
+
+ updateState(State.ACTIVE, State.SUSPENDED);
+ _stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
+ }
+ finally
+ {
+ getConsumer().releaseSendLock();
+ }
+ }
+
+ public void addCredit(MessageCreditUnit unit, long value)
+ {
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+
+ switch (unit)
+ {
+ case MESSAGE:
+
+ creditManager.addCredit(value, 0L);
+ break;
+ case BYTE:
+ creditManager.addCredit(0l, value);
+ break;
+ }
+
+ _stopped.set(false);
+
+ if(creditManager.hasCredit())
+ {
+ updateState(State.SUSPENDED, State.ACTIVE);
+ }
+
+ }
+
+ public void setFlowMode(MessageFlowMode flowMode)
+ {
+
+
+ _creditManager.removeListener(this);
+
+ switch(flowMode)
+ {
+ case CREDIT:
+ _creditManager = new CreditCreditManager(0l,0l);
+ break;
+ case WINDOW:
+ _creditManager = new WindowCreditManager(0l,0l);
+ break;
+ default:
+ throw new RuntimeException("Unknown message flow mode: " + flowMode);
+ }
+ _flowMode = flowMode;
+ updateState(State.ACTIVE, State.SUSPENDED);
+
+ _creditManager.addStateListener(this);
+
+ }
+
+ public boolean isStopped()
+ {
+ return _stopped.get();
+ }
+
+ public void acknowledge(MessageInstance entry)
+ {
+ // TODO Fix Store Context / cleanup
+ if(entry.isAcquiredBy(getConsumer()))
+ {
+ _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
+ _unacknowledgedCount.decrementAndGet();
+ entry.delete();
+ }
+ }
+
+ public void flush() throws AMQException
+ {
+ flushCreditState(true);
+ getConsumer().flush();
+ stop();
+ }
+
+ public ServerSession getSessionModel()
+ {
+ return _session;
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return _arguments;
+ }
+
+ public void queueEmpty()
+ {
+ }
+
+ public void flushBatched()
+ {
+ _session.getConnection().flush();
+ }
+
+
+ @Override
+ public void consumerAdded(final Consumer sub)
+ {
+ _consumer = sub;
+ }
+
+ @Override
+ public void consumerRemoved(final Consumer sub)
+ {
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 4b38b8a1a3..4420709a91 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -30,21 +30,20 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
- private final QueueEntry _entry;
- private final Subscription_0_10 _sub;
+ private final MessageInstance _entry;
+ private final ConsumerTarget_0_10 _target;
- public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.getSessionModel().acknowledge(subscription, _entry);
+ _target.getSessionModel().acknowledge(_target, _entry);
}
else
{
@@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- return _entry.acquire(getSubscription());
+ return _entry.acquire(_target.getConsumer());
}
- private Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
index ce0155b789..c459364dbb 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
@@ -22,20 +22,20 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
- private final QueueEntry _entry;
- private Subscription_0_10 _sub;
+ private final MessageInstance _entry;
+ private ConsumerTarget_0_10 _target;
- public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
@@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
- getSubscription().release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
- getSubscription().reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- boolean acquired = _entry.acquire(getSubscription());
+ boolean acquired = _entry.acquire(_target.getConsumer());
if(acquired)
{
- getSubscription().recordUnacknowledged(_entry);
+ _target.recordUnacknowledged(_entry);
}
return acquired;
}
- public Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index f5f2a8d43f..cd1146ac0b 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -21,17 +21,17 @@
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
{
- private final Subscription_0_10 _sub;
- private final QueueEntry _entry;
+ private final ConsumerTarget_0_10 _sub;
+ private final MessageInstance _entry;
private final ServerSession _session;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
{
super();
_sub = sub;
@@ -44,9 +44,9 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry);
+ _sub.restoreCredit(_entry.getMessage());
}
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_sub.getConsumer()))
{
_session.acknowledge(_sub, _entry);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 2e74621814..687331e51d 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -141,7 +141,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
return buf;
}
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
ByteBuffer buf = _encoded;
@@ -153,7 +153,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
buf = buf.duplicate();
- buf.position(offsetInMetaData);
+ buf.position(0);
if(dest.remaining() < buf.limit())
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index a15fea1200..c85a415ce5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -282,8 +282,8 @@ public class ServerConnectionDelegate extends ServerDelegate
private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
{
final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
- final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subs)
+ final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subs)
{
subscription_0_10.stop();
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 93d886687c..53022c333e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,7 +46,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -55,15 +55,16 @@ import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -77,6 +78,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -104,14 +106,7 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
- {
- @Override
- public void onEnqueue(final QueueEntry entry)
- {
- entry.getQueue().checkCapacity(ServerSession.this);
- }
- };
+ private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
public static interface MessageDispositionChangeListener
{
@@ -126,12 +121,6 @@ public class ServerSession extends Session
}
- public static interface Task
- {
- public void doTask(ServerSession session);
- }
-
-
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
@@ -142,9 +131,9 @@ public class ServerSession extends Session
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
+ private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
private final TransactionTimeoutHelper _transactionTimeoutHelper;
@@ -194,7 +183,7 @@ public class ServerSession extends Session
public int enqueue(final MessageTransferMessage message,
final InstanceProperties instanceProperties,
- final Exchange exchange)
+ final MessageDestination exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -386,9 +375,9 @@ public class ServerSession extends Session
}
_messageDispositionListenerMap.clear();
- for (Task task : _taskList)
+ for (Action<ServerSession> task : _taskList)
{
- task.doTask(this);
+ task.performAction(this);
}
LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
@@ -405,9 +394,9 @@ public class ServerSession extends Session
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
+ public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
{
- _transaction.dequeue(entry.getQueue(), entry.getMessage(),
+ _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
new ServerTransaction.Action()
{
@@ -426,42 +415,26 @@ public class ServerSession extends Session
});
}
- public Collection<Subscription_0_10> getSubscriptions()
+ public Collection<ConsumerTarget_0_10> getSubscriptions()
{
return _subscriptions.values();
}
- public void register(String destination, Subscription_0_10 sub)
+ public void register(String destination, ConsumerTarget_0_10 sub)
{
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
- public Subscription_0_10 getSubscription(String destination)
+ public ConsumerTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
}
- public void unregister(Subscription_0_10 sub)
+ public void unregister(ConsumerTarget_0_10 sub)
{
_subscriptions.remove(sub.getName());
- try
- {
- sub.getSendLock();
- AMQQueue queue = sub.getQueue();
- if(queue != null)
- {
- queue.unregisterSubscription(sub);
- }
- }
- catch (AMQException e)
- {
- // TODO
- _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
- }
- finally
- {
- sub.releaseSendLock();
- }
+ sub.close();
+
}
public boolean isTransactional()
@@ -638,12 +611,12 @@ public class ServerSession extends Session
return getConnection().getAuthorizedSubject();
}
- public void addSessionCloseTask(Task task)
+ public void addSessionCloseTask(Action<ServerSession> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Task task)
+ public void removeSessionCloseTask(Action<ServerSession> task)
{
_taskList.remove(task);
}
@@ -829,8 +802,8 @@ public class ServerSession extends Session
void unregisterSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
@@ -838,8 +811,8 @@ public class ServerSession extends Session
void stopSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.stop();
}
@@ -848,8 +821,8 @@ public class ServerSession extends Session
public void receivedComplete()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.flushCreditState(false);
}
@@ -955,4 +928,16 @@ public class ServerSession extends Session
return getId().compareTo(o.getId());
}
+ private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+ {
+ @Override
+ public void performAction(final MessageInstance<C> entry)
+ {
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+ }
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index dcca696529..9a90b74656 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
@@ -34,7 +35,9 @@ import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -45,6 +48,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -55,6 +59,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -193,7 +198,7 @@ public class ServerSessionDelegate extends SessionDelegate
String queueName = method.getQueue();
VirtualHost vhost = getVirtualHost(session);
- final AMQQueue queue = vhost.getQueue(queueName);
+ final MessageSource queue = vhost.getMessageSource(queueName);
if(queue == null)
{
@@ -214,9 +219,9 @@ public class ServerSessionDelegate extends SessionDelegate
ServerSession s = (ServerSession) session;
queue.setExclusiveOwningSession(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getExclusiveOwningSession() == session)
{
@@ -228,9 +233,9 @@ public class ServerSessionDelegate extends SessionDelegate
if(queue.getAuthorizationHolder() == null)
{
queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getAuthorizationHolder() == session)
{
@@ -254,25 +259,42 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
- destination,
- method.getAcceptMode(),
- method.getAcquireMode(),
- MessageFlowMode.WINDOW,
- creditManager,
- filterManager,
- method.getArguments());
+ ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager,
+ method.getArguments()
+ );
- ((ServerSession)session).register(destination, sub);
+ ((ServerSession)session).register(destination, target);
try
{
- queue.registerSubscription(sub, method.getExclusive());
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ options.add(Consumer.Option.ACQUIRES);
+ }
+ if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+ if(method.getExclusive())
+ {
+ options.add(Consumer.Option.EXCLUSIVE);
+ }
+ Consumer sub =
+ queue.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options);
}
- catch (AMQQueue.ExistingExclusiveSubscription existing)
+ catch (AMQQueue.ExistingExclusiveConsumer existing)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
}
@@ -288,7 +310,7 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- final Exchange exchange = getExchangeForMessage(ssn, xfr);
+ final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -307,7 +329,6 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- final Exchange exchangeInUse;
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
final ServerSession serverSession = (ServerSession) ssn;
@@ -385,7 +406,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -393,12 +414,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- AMQQueue queue = sub.getQueue();
((ServerSession)session).unregister(sub);
- if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
- {
- queue.setAuthorizationHolder(null);
- }
}
}
@@ -407,7 +423,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -814,24 +830,24 @@ public class ServerSessionDelegate extends SessionDelegate
return getVirtualHost(session).getExchange(exchangeName);
}
- private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+ private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
{
VirtualHost virtualHost = getVirtualHost(ssn);
- Exchange exchange;
+ MessageDestination destination;
if(xfr.hasDestination())
{
- exchange = virtualHost.getExchange(xfr.getDestination());
- if(exchange == null)
+ destination = virtualHost.getMessageDestination(xfr.getDestination());
+ if(destination == null)
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
}
else
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
- return exchange;
+ return destination;
}
private VirtualHost getVirtualHost(Session session)
@@ -1249,9 +1265,9 @@ public class ServerSessionDelegate extends SessionDelegate
if (autoDelete && exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+ final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
try
{
@@ -1265,9 +1281,9 @@ public class ServerSessionDelegate extends SessionDelegate
};
final ServerSession s = (ServerSession) session;
s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(deleteQueueTask);
}
@@ -1276,9 +1292,9 @@ public class ServerSessionDelegate extends SessionDelegate
if (exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task removeExclusive = new ServerSession.Task()
+ final Action<ServerSession> removeExclusive = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
q.setAuthorizationHolder(null);
q.setExclusiveOwningSession(null);
@@ -1287,9 +1303,9 @@ public class ServerSessionDelegate extends SessionDelegate
final ServerSession s = (ServerSession) session;
q.setExclusiveOwningSession(s);
s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(removeExclusive);
}
@@ -1461,7 +1477,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = sfm.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1478,7 +1494,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = stop.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1496,7 +1512,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = flow.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
deleted file mode 100644
index 357b565365..0000000000
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ /dev/null
@@ -1,944 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Struct;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-
-import java.text.MessageFormat;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject
-{
- private final long _subscriptionID;
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
-
- private static final Option[] BATCHED = new Option[] { Option.BATCH };
-
- private final Lock _stateChangeLock = new ReentrantLock();
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private volatile AMQQueue.Context _queueContext;
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
- private FlowCreditManager_0_10 _creditManager;
-
- private StateListener _stateListener = new StateListener()
- {
-
- public void stateChange(Subscription sub, State oldState, State newState)
- {
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
- }
- };
- private AMQQueue _queue;
- private final String _destination;
- private boolean _noLocal;
- private final FilterManager _filters;
- private final MessageAcceptMode _acceptMode;
- private final MessageAcquireMode _acquireMode;
- private MessageFlowMode _flowMode;
- private final ServerSession _session;
- private final AtomicBoolean _stopped = new AtomicBoolean(true);
- private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
-
- private LogActor _logActor;
- private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private String _traceExclude;
- private String _trace;
- private final long _createTime = System.currentTimeMillis();
- private final AtomicLong _deliveredCount = new AtomicLong(0);
- private final AtomicLong _deliveredBytes = new AtomicLong(0);
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-
- private final Map<String, Object> _arguments;
- private int _deferredMessageCredit;
- private long _deferredSizeCredit;
-
-
- public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode,
- MessageFlowMode flowMode,
- FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments)
- {
- _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
- _session = session;
- _postIdSettingAction = new AddMessageDispositionListenerAction(session);
- _destination = destination;
- _acceptMode = acceptMode;
- _acquireMode = acquireMode;
- _creditManager = creditManager;
- _flowMode = flowMode;
- _filters = filters;
- _creditManager.addStateListener(this);
- _arguments = arguments == null ? Collections.<String, Object> emptyMap() :
- Collections.<String, Object> unmodifiableMap(arguments);
- _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
-
- }
-
- public void setNoLocal(boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void setQueue(AMQQueue queue, boolean exclusive)
- {
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
-
- _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
- _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
- String filterLogString = null;
-
- _logActor = GenericActor.getInstance(this);
- if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY))
- {
- filterLogString = getFilterLogString();
- CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
- filterLogString.length() > 0));
- }
- }
-
- public String getConsumerName()
- {
- return _destination;
- }
-
- public boolean isSuspended()
- {
- return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
-
-
-
- //check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
- {
-
- return false;
- }
-
- if (entry.getMessage() instanceof MessageTransferMessage)
- {
- if(_noLocal)
- {
- Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
- if (connectionRef != null && connectionRef == _session.getReference())
- {
- return false;
- }
- }
- }
- else
- {
- // no interest in messages we can't convert
- if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null)
- {
- return false;
- }
- }
-
-
- return checkFilters(entry);
-
-
- }
-
- private boolean checkFilters(QueueEntry entry)
- {
- return (_filters == null) || _filters.allAllow(entry.asFilterable());
- }
-
- public boolean isClosed()
- {
- return getState() == State.CLOSED;
- }
-
- public boolean isBrowser()
- {
- return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
- }
-
- public boolean seesRequeues()
- {
- return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT;
- }
-
- public void close()
- {
- boolean closed = false;
- State state = getState();
-
- _stateChangeLock.lock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = _state.compareAndSet(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- else
- {
- _stateListener.stateChange(this,state, State.CLOSED);
- }
- }
- _creditManager.removeListener(this);
- CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
- }
- finally
- {
- _stateChangeLock.unlock();
- }
-
-
-
- }
-
- public Long getDelivered()
- {
- return _deliveredCount.get();
- }
-
- public void creditStateChanged(boolean hasCredit)
- {
-
- if(hasCredit)
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- else
- {
- // this is a hack to get round the issue of increasing bytes credit
- _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
- }
- }
- else
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- }
-
-
- public static class AddMessageDispositionListenerAction implements Runnable
- {
- private MessageTransfer _xfr;
- private ServerSession.MessageDispositionChangeListener _action;
- private ServerSession _session;
-
- public AddMessageDispositionListenerAction(ServerSession session)
- {
- _session = session;
- }
-
- public void setXfr(MessageTransfer xfr)
- {
- _xfr = xfr;
- }
-
- public void setAction(ServerSession.MessageDispositionChangeListener action)
- {
- _action = action;
- }
-
- public void run()
- {
- if(_action != null)
- {
- _session.onMessageDispositionChange(_xfr, _action);
- }
- }
- }
-
- private final AddMessageDispositionListenerAction _postIdSettingAction;
-
- public void send(final QueueEntry entry, boolean batch) throws AMQException
- {
- ServerMessage serverMsg = entry.getMessage();
-
-
- MessageTransfer xfr;
-
- DeliveryProperties deliveryProps;
- MessageProperties messageProps = null;
-
- MessageTransferMessage msg;
-
- if(serverMsg instanceof MessageTransferMessage)
- {
-
- msg = (MessageTransferMessage) serverMsg;
-
- }
- else
- {
- MessageConverter converter =
- MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
-
-
- msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost());
- }
- DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
- messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
-
- deliveryProps = new DeliveryProperties();
- if(origDeliveryProps != null)
- {
- if(origDeliveryProps.hasDeliveryMode())
- {
- deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
- }
- if(origDeliveryProps.hasExchange())
- {
- deliveryProps.setExchange(origDeliveryProps.getExchange());
- }
- if(origDeliveryProps.hasExpiration())
- {
- deliveryProps.setExpiration(origDeliveryProps.getExpiration());
- }
- if(origDeliveryProps.hasPriority())
- {
- deliveryProps.setPriority(origDeliveryProps.getPriority());
- }
- if(origDeliveryProps.hasRoutingKey())
- {
- deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
- }
- if(origDeliveryProps.hasTimestamp())
- {
- deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
- }
- if(origDeliveryProps.hasTtl())
- {
- deliveryProps.setTtl(origDeliveryProps.getTtl());
- }
-
-
- }
-
- deliveryProps.setRedelivered(entry.isRedelivered());
-
- if(_trace != null && messageProps == null)
- {
- messageProps = new MessageProperties();
- }
-
- Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
-
-
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
-
- boolean excludeDueToFederation = false;
-
- if(_trace != null)
- {
- if(!messageProps.hasApplicationHeaders())
- {
- messageProps.setApplicationHeaders(new HashMap<String,Object>());
- }
- Map<String,Object> appHeaders = messageProps.getApplicationHeaders();
- String trace = (String) appHeaders.get("x-qpid.trace");
- if(trace == null)
- {
- trace = _trace;
- }
- else
- {
- if(_traceExclude != null)
- {
- excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude);
- }
- trace+=","+_trace;
- }
- appHeaders.put("x-qpid.trace",trace);
- }
-
- if(!excludeDueToFederation)
- {
- if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
- {
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
- }
- else if(_flowMode == MessageFlowMode.WINDOW)
- {
- xfr.setCompletionListener(new Method.CompletionListener()
- {
- public void onComplete(Method method)
- {
- deferredAddCredit(1, entry.getSize());
- }
- });
- }
-
-
- _postIdSettingAction.setXfr(xfr);
- if(_acceptMode == MessageAcceptMode.EXPLICIT)
- {
- _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
- }
- else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
- {
- _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
- }
- else
- {
- _postIdSettingAction.setAction(null);
- }
-
-
- _session.sendMessage(xfr, _postIdSettingAction);
- entry.incrementDeliveryCount();
- _deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(entry.getSize());
- if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
- {
- forceDequeue(entry, false);
- }
- else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
- {
- recordUnacknowledged(entry);
- }
- }
- else
- {
- forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW);
-
- }
- }
-
- void recordUnacknowledged(QueueEntry entry)
- {
- _unacknowledgedCount.incrementAndGet();
- _unacknowledgedBytes.addAndGet(entry.getSize());
- }
-
- private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
- {
- _deferredMessageCredit += deferredMessageCredit;
- _deferredSizeCredit += deferredSizeCredit;
-
- }
-
- public void flushCreditState(boolean strict)
- {
- if(strict || !isSuspended() || _deferredMessageCredit >= 200
- || !(_creditManager instanceof WindowCreditManager)
- || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
- {
- _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
- _deferredMessageCredit = 0;
- _deferredSizeCredit = 0l;
- }
- }
-
- private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
- {
- AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
- dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- if (restoreCredit)
- {
- restoreCredit(entry);
- }
- entry.delete();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
-
- void reject(final QueueEntry entry)
- {
- entry.setRedelivered();
- entry.routeToAlternate(null, null);
- if(entry.isAcquiredBy(this))
- {
- entry.delete();
- }
- }
-
- void release(final QueueEntry entry, final boolean setRedelivered)
- {
- if (setRedelivered)
- {
- entry.setRedelivered();
- }
-
- if (getSessionModel().isClosing() || !setRedelivered)
- {
- entry.decrementDeliveryCount();
- }
-
- if (isMaxDeliveryLimitReached(entry))
- {
- sendToDLQOrDiscard(entry);
- }
- else
- {
- entry.release();
- }
- }
-
- protected void sendToDLQOrDiscard(QueueEntry entry)
- {
- final LogActor logActor = CurrentActor.get();
- final ServerMessage msg = entry.getMessage();
-
- int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction()
- {
- @Override
- public void onEnqueue(final QueueEntry requeueEntry)
- {
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
- }
- }, null);
-
- if (requeues == 0)
- {
- final AMQQueue queue = entry.getQueue();
- final Exchange alternateExchange = queue.getAlternateExchange();
-
- if(alternateExchange != null)
- {
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
- alternateExchange.getName()));
- }
- else
- {
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
- queue.getName(),
- msg.getRoutingKey()));
- }
- }
- }
-
- private boolean isMaxDeliveryLimitReached(QueueEntry entry)
- {
- final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
- return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- _deleted.set(true);
- }
-
- public boolean wouldSuspend(QueueEntry entry)
- {
- return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
- }
-
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void restoreCredit(QueueEntry queueEntry)
- {
- _creditManager.restoreCredit(1, queueEntry.getSize());
- }
-
- public void onDequeue(QueueEntry queueEntry)
- {
- // no-op for 0-10, credit restored by completing command.
- }
-
- public void releaseQueueEntry(QueueEntry queueEntry)
- {
- // no-op for 0-10, credit restored by completing command.
- }
-
- public void setStateListener(StateListener listener)
- {
- _stateListener = listener;
- }
-
- public State getState()
- {
- return _state.get();
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
-
- public FlowCreditManager_0_10 getCreditManager()
- {
- return _creditManager;
- }
-
-
- public void stop()
- {
- try
- {
- getSendLock();
-
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
- }
- finally
- {
- releaseSendLock();
- }
- }
-
- public void addCredit(MessageCreditUnit unit, long value)
- {
- FlowCreditManager_0_10 creditManager = getCreditManager();
-
- switch (unit)
- {
- case MESSAGE:
-
- creditManager.addCredit(value, 0L);
- break;
- case BYTE:
- creditManager.addCredit(0l, value);
- break;
- }
-
- _stopped.set(false);
-
- if(creditManager.hasCredit())
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- }
-
- }
-
- public void setFlowMode(MessageFlowMode flowMode)
- {
-
-
- _creditManager.removeListener(this);
-
- switch(flowMode)
- {
- case CREDIT:
- _creditManager = new CreditCreditManager(0l,0l);
- break;
- case WINDOW:
- _creditManager = new WindowCreditManager(0l,0l);
- break;
- default:
- throw new RuntimeException("Unknown message flow mode: " + flowMode);
- }
- _flowMode = flowMode;
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
-
- _creditManager.addStateListener(this);
-
- }
-
- public boolean isStopped()
- {
- return _stopped.get();
- }
-
- public boolean acquires()
- {
- return _acquireMode == MessageAcquireMode.PRE_ACQUIRED;
- }
-
- public void acknowledge(QueueEntry entry)
- {
- // TODO Fix Store Context / cleanup
- if(entry.isAcquiredBy(this))
- {
- _unacknowledgedBytes.addAndGet(-entry.getSize());
- _unacknowledgedCount.decrementAndGet();
- entry.delete();
- }
- }
-
- public void flush() throws AMQException
- {
- flushCreditState(true);
- _queue.flushSubscription(this);
- stop();
- }
-
- public long getSubscriptionID()
- {
- return _subscriptionID;
- }
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public ServerSession getSessionModel()
- {
- return _session;
- }
-
- public boolean isBrowsing()
- {
- return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
- }
-
- public boolean isExclusive()
- {
- return getQueue().hasExclusiveSubscriber();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
-
- public boolean isExplicitAcknowledge()
- {
- return _acceptMode == MessageAcceptMode.EXPLICIT;
- }
-
- public String getCreditMode()
- {
- return _flowMode.toString();
- }
-
- public String getName()
- {
- return _destination;
- }
-
- public Map<String, Object> getArguments()
- {
- return _arguments;
- }
-
- public boolean isSessionTransactional()
- {
- return _session.isTransactional();
- }
-
- public void queueEmpty()
- {
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public String toLogString()
- {
- String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
- _queue.getName());
- String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
- // queueString is "vh(/{0})/qu({1}) " so need to trim
- + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] ";
- return result;
- }
-
- private String getFilterLogString()
- {
- StringBuilder filterLogString = new StringBuilder();
- String delimiter = ", ";
- boolean hasEntries = false;
- if (_filters != null && _filters.hasFilters())
- {
- filterLogString.append(_filters.toString());
- hasEntries = true;
- }
-
- if (isBrowser())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Browser");
- hasEntries = true;
- }
-
- if (isDurable())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Durable");
- hasEntries = true;
- }
-
- return filterLogString.toString();
- }
-
- public LogSubject getLogSubject()
- {
- return (LogSubject) this;
- }
-
-
- public void flushBatched()
- {
- _session.getConnection().flush();
- }
-
- public long getBytesOut()
- {
- return _deliveredBytes.longValue();
- }
-
- public long getMessagesOut()
- {
- return _deliveredCount.longValue();
- }
-
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.longValue();
- }
-
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.longValue();
- }
-}