diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-07 16:57:49 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-07 16:57:49 +0000 |
| commit | 65b1a1ddfe95b9e273d4cdaf23067a0aaff9b1d1 (patch) | |
| tree | 427d5ce851b9336fea70eb8fc6f87135ad239065 /qpid/java/broker-plugins/amqp-0-10-protocol | |
| parent | 3ab4f9bdc9bbc8375534f45022a02257eb6e030d (diff) | |
| download | qpid-python-65b1a1ddfe95b9e273d4cdaf23067a0aaff9b1d1.tar.gz | |
QPID-5504 : Refactoring to allow for nodes other than queues to be subscribed from, and nodes other than exchanges to be sent to (merged from separate branch)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1565726 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol')
9 files changed, 713 insertions, 1087 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java new file mode 100644 index 0000000000..6ad9de22cb --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -0,0 +1,580 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.*; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener +{ + + private static final Option[] BATCHED = new Option[] { Option.BATCH }; + + private final AtomicBoolean _deleted = new AtomicBoolean(false); + private final String _name; + + + private FlowCreditManager_0_10 _creditManager; + + private final MessageAcceptMode _acceptMode; + private final MessageAcquireMode _acquireMode; + private MessageFlowMode _flowMode; + private final ServerSession _session; + private final AtomicBoolean _stopped = new AtomicBoolean(true); + + private final AtomicLong _unacknowledgedCount = new AtomicLong(0); + private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); + + private final Map<String, Object> _arguments; + private int _deferredMessageCredit; + private long _deferredSizeCredit; + private Consumer _consumer; + + + public ConsumerTarget_0_10(ServerSession session, + String name, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + MessageFlowMode flowMode, + FlowCreditManager_0_10 creditManager, + Map<String, Object> arguments) + { + super(State.SUSPENDED); + _session = session; + _postIdSettingAction = new AddMessageDispositionListenerAction(session); + _acceptMode = acceptMode; + _acquireMode = acquireMode; + _creditManager = creditManager; + _flowMode = flowMode; + _creditManager.addStateListener(this); + _arguments = arguments == null ? Collections.<String, Object> emptyMap() : + Collections.<String, Object> unmodifiableMap(arguments); + _name = name; + } + + public Consumer getConsumer() + { + return _consumer; + } + + public boolean isSuspended() + { + return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension + } + + public boolean close() + { + boolean closed = false; + State state = getState(); + + getConsumer().getSendLock(); + try + { + while(!closed && state != State.CLOSED) + { + closed = updateState(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + } + _creditManager.removeListener(this); + } + finally + { + getConsumer().releaseSendLock(); + } + + return closed; + + } + + public void creditStateChanged(boolean hasCredit) + { + + if(hasCredit) + { + if(!updateState(State.SUSPENDED, State.ACTIVE)) + { + // this is a hack to get round the issue of increasing bytes credit + getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + } + } + else + { + updateState(State.ACTIVE, State.SUSPENDED); + } + } + + public String getName() + { + return _name; + } + + + public static class AddMessageDispositionListenerAction implements Runnable + { + private MessageTransfer _xfr; + private ServerSession.MessageDispositionChangeListener _action; + private ServerSession _session; + + public AddMessageDispositionListenerAction(ServerSession session) + { + _session = session; + } + + public void setXfr(MessageTransfer xfr) + { + _xfr = xfr; + } + + public void setAction(ServerSession.MessageDispositionChangeListener action) + { + _action = action; + } + + public void run() + { + if(_action != null) + { + _session.onMessageDispositionChange(_xfr, _action); + } + } + } + + private final AddMessageDispositionListenerAction _postIdSettingAction; + + public void send(final MessageInstance entry, boolean batch) throws AMQException + { + ServerMessage serverMsg = entry.getMessage(); + + + MessageTransfer xfr; + + DeliveryProperties deliveryProps; + MessageProperties messageProps = null; + + MessageTransferMessage msg; + + if(serverMsg instanceof MessageTransferMessage) + { + + msg = (MessageTransferMessage) serverMsg; + + } + else + { + MessageConverter converter = + MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); + + + msg = (MessageTransferMessage) converter.convert(serverMsg, _session.getVirtualHost()); + } + DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); + messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); + + deliveryProps = new DeliveryProperties(); + if(origDeliveryProps != null) + { + if(origDeliveryProps.hasDeliveryMode()) + { + deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); + } + if(origDeliveryProps.hasExchange()) + { + deliveryProps.setExchange(origDeliveryProps.getExchange()); + } + if(origDeliveryProps.hasExpiration()) + { + deliveryProps.setExpiration(origDeliveryProps.getExpiration()); + } + if(origDeliveryProps.hasPriority()) + { + deliveryProps.setPriority(origDeliveryProps.getPriority()); + } + if(origDeliveryProps.hasRoutingKey()) + { + deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); + } + if(origDeliveryProps.hasTimestamp()) + { + deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); + } + if(origDeliveryProps.hasTtl()) + { + deliveryProps.setTtl(origDeliveryProps.getTtl()); + } + + + } + + deliveryProps.setRedelivered(entry.isRedelivered()); + + Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + + + xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) + : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody()); + + if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) + { + xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); + } + else if(_flowMode == MessageFlowMode.WINDOW) + { + xfr.setCompletionListener(new Method.CompletionListener() + { + public void onComplete(Method method) + { + deferredAddCredit(1, entry.getMessage().getSize()); + } + }); + } + + + _postIdSettingAction.setXfr(xfr); + if(_acceptMode == MessageAcceptMode.EXPLICIT) + { + _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); + } + else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) + { + _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); + } + else + { + _postIdSettingAction.setAction(null); + } + + + _session.sendMessage(xfr, _postIdSettingAction); + entry.incrementDeliveryCount(); + if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) + { + forceDequeue(entry, false); + } + else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED) + { + recordUnacknowledged(entry); + } + + } + + void recordUnacknowledged(MessageInstance entry) + { + _unacknowledgedCount.incrementAndGet(); + _unacknowledgedBytes.addAndGet(entry.getMessage().getSize()); + } + + private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) + { + _deferredMessageCredit += deferredMessageCredit; + _deferredSizeCredit += deferredSizeCredit; + + } + + public void flushCreditState(boolean strict) + { + if(strict || !isSuspended() || _deferredMessageCredit >= 200 + || !(_creditManager instanceof WindowCreditManager) + || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) + { + _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); + _deferredMessageCredit = 0; + _deferredSizeCredit = 0l; + } + } + + private void forceDequeue(final MessageInstance entry, final boolean restoreCredit) + { + AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore()); + dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(), + new ServerTransaction.Action() + { + public void postCommit() + { + if (restoreCredit) + { + restoreCredit(entry.getMessage()); + } + entry.delete(); + } + + public void onRollback() + { + + } + }); + } + + void reject(final MessageInstance entry) + { + entry.setRedelivered(); + entry.routeToAlternate(null, null); + if(entry.isAcquiredBy(getConsumer())) + { + entry.delete(); + } + } + + void release(final MessageInstance entry, final boolean setRedelivered) + { + if (setRedelivered) + { + entry.setRedelivered(); + } + + if (getSessionModel().isClosing() || !setRedelivered) + { + entry.decrementDeliveryCount(); + } + + if (isMaxDeliveryLimitReached(entry)) + { + sendToDLQOrDiscard(entry); + } + else + { + entry.release(); + } + } + + protected void sendToDLQOrDiscard(MessageInstance entry) + { + final LogActor logActor = CurrentActor.get(); + final ServerMessage msg = entry.getMessage(); + + int requeues = entry.routeToAlternate(new Action<MessageInstance>() + { + @Override + public void performAction(final MessageInstance requeueEntry) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getOwningResource().getName())); + } + }, null); + + if (requeues == 0) + { + TransactionLogResource owningResource = entry.getOwningResource(); + if(owningResource instanceof AMQQueue) + { + final AMQQueue queue = (AMQQueue)owningResource; + final Exchange alternateExchange = queue.getAlternateExchange(); + + if(alternateExchange != null) + { + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + alternateExchange.getName())); + } + else + { + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + queue.getName(), + msg.getRoutingKey())); + } + } + } + } + + private boolean isMaxDeliveryLimitReached(MessageInstance entry) + { + final int maxDeliveryLimit = entry.getMaximumDeliveryCount(); + return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); + } + + public void queueDeleted() + { + _deleted.set(true); + } + + public boolean allocateCredit(ServerMessage message) + { + return _creditManager.useCreditForMessage(message.getSize()); + } + + public void restoreCredit(ServerMessage message) + { + _creditManager.restoreCredit(1, message.getSize()); + } + + public FlowCreditManager_0_10 getCreditManager() + { + return _creditManager; + } + + + public void stop() + { + try + { + getConsumer().getSendLock(); + + updateState(State.ACTIVE, State.SUSPENDED); + _stopped.set(true); + FlowCreditManager_0_10 creditManager = getCreditManager(); + creditManager.clearCredit(); + } + finally + { + getConsumer().releaseSendLock(); + } + } + + public void addCredit(MessageCreditUnit unit, long value) + { + FlowCreditManager_0_10 creditManager = getCreditManager(); + + switch (unit) + { + case MESSAGE: + + creditManager.addCredit(value, 0L); + break; + case BYTE: + creditManager.addCredit(0l, value); + break; + } + + _stopped.set(false); + + if(creditManager.hasCredit()) + { + updateState(State.SUSPENDED, State.ACTIVE); + } + + } + + public void setFlowMode(MessageFlowMode flowMode) + { + + + _creditManager.removeListener(this); + + switch(flowMode) + { + case CREDIT: + _creditManager = new CreditCreditManager(0l,0l); + break; + case WINDOW: + _creditManager = new WindowCreditManager(0l,0l); + break; + default: + throw new RuntimeException("Unknown message flow mode: " + flowMode); + } + _flowMode = flowMode; + updateState(State.ACTIVE, State.SUSPENDED); + + _creditManager.addStateListener(this); + + } + + public boolean isStopped() + { + return _stopped.get(); + } + + public void acknowledge(MessageInstance entry) + { + // TODO Fix Store Context / cleanup + if(entry.isAcquiredBy(getConsumer())) + { + _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); + _unacknowledgedCount.decrementAndGet(); + entry.delete(); + } + } + + public void flush() throws AMQException + { + flushCreditState(true); + getConsumer().flush(); + stop(); + } + + public ServerSession getSessionModel() + { + return _session; + } + + public boolean isDurable() + { + return false; + } + + public Map<String, Object> getArguments() + { + return _arguments; + } + + public void queueEmpty() + { + } + + public void flushBatched() + { + _session.getConnection().flush(); + } + + + @Override + public void consumerAdded(final Consumer sub) + { + _consumer = sub; + } + + @Override + public void consumerRemoved(final Consumer sub) + { + } + + public long getUnacknowledgedBytes() + { + return _unacknowledgedBytes.longValue(); + } + + public long getUnacknowledgedMessages() + { + return _unacknowledgedCount.longValue(); + } +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 4b38b8a1a3..4420709a91 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageInstance; class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener @@ -30,21 +30,20 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class); - private final QueueEntry _entry; - private final Subscription_0_10 _sub; + private final MessageInstance _entry; + private final ConsumerTarget_0_10 _target; - public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10) + public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) { _entry = entry; - _sub = subscription_0_10; + _target = target; } public void onAccept() { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { - subscription.getSessionModel().acknowledge(subscription, _entry); + _target.getSessionModel().acknowledge(_target, _entry); } else { @@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { - subscription.release(_entry, setRedelivered); + _target.release(_entry, setRedelivered); } else { @@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { - subscription.reject(_entry); + _target.reject(_entry); } else { @@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - return _entry.acquire(getSubscription()); + return _entry.acquire(_target.getConsumer()); } - private Subscription_0_10 getSubscription() - { - return _sub; - } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java index ce0155b789..c459364dbb 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java @@ -22,20 +22,20 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageInstance; class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener { private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class); - private final QueueEntry _entry; - private Subscription_0_10 _sub; + private final MessageInstance _entry; + private ConsumerTarget_0_10 _target; - public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10) + public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) { _entry = entry; - _sub = subscription_0_10; + _target = target; } public void onAccept() @@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_target.getConsumer())) { - getSubscription().release(_entry, setRedelivered); + _target.release(_entry, setRedelivered); } else { @@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_target.getConsumer())) { - getSubscription().reject(_entry); + _target.reject(_entry); } else { @@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - boolean acquired = _entry.acquire(getSubscription()); + boolean acquired = _entry.acquire(_target.getConsumer()); if(acquired) { - getSubscription().recordUnacknowledged(_entry); + _target.recordUnacknowledged(_entry); } return acquired; } - public Subscription_0_10 getSubscription() - { - return _sub; - } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index f5f2a8d43f..cd1146ac0b 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -21,17 +21,17 @@ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.transport.Method; public class MessageAcceptCompletionListener implements Method.CompletionListener { - private final Subscription_0_10 _sub; - private final QueueEntry _entry; + private final ConsumerTarget_0_10 _sub; + private final MessageInstance _entry; private final ServerSession _session; private boolean _restoreCredit; - public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit) + public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) { super(); _sub = sub; @@ -44,9 +44,9 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene { if(_restoreCredit) { - _sub.restoreCredit(_entry); + _sub.restoreCredit(_entry.getMessage()); } - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_sub.getConsumer())) { _session.acknowledge(_sub, _entry); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java index 2e74621814..687331e51d 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java @@ -141,7 +141,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData return buf; } - public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) + public int writeToBuffer(ByteBuffer dest) { ByteBuffer buf = _encoded; @@ -153,7 +153,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData buf = buf.duplicate(); - buf.position(offsetInMetaData); + buf.position(0); if(dest.remaining() < buf.limit()) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index a15fea1200..c85a415ce5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -282,8 +282,8 @@ public class ServerConnectionDelegate extends ServerDelegate private void stopAllSubscriptions(Connection conn, SessionDetach dtc) { final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); - final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subs) + final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subs) { subscription_0_10.stop(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 93d886687c..53022c333e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,7 +46,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -55,15 +55,16 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.DistributedTransaction; @@ -77,6 +78,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; @@ -104,14 +106,7 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction() - { - @Override - public void onEnqueue(final QueueEntry entry) - { - entry.getQueue().checkCapacity(ServerSession.this); - } - }; + private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); public static interface MessageDispositionChangeListener { @@ -126,12 +121,6 @@ public class ServerSession extends Session } - public static interface Task - { - public void doTask(ServerSession session); - } - - private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); @@ -142,9 +131,9 @@ public class ServerSession extends Session private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); + private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>(); - private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>(); private final TransactionTimeoutHelper _transactionTimeoutHelper; @@ -194,7 +183,7 @@ public class ServerSession extends Session public int enqueue(final MessageTransferMessage message, final InstanceProperties instanceProperties, - final Exchange exchange) + final MessageDestination exchange) { if(_outstandingCredit.get() != UNLIMITED_CREDIT && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) @@ -386,9 +375,9 @@ public class ServerSession extends Session } _messageDispositionListenerMap.clear(); - for (Task task : _taskList) + for (Action<ServerSession> task : _taskList) { - task.doTask(this); + task.performAction(this); } LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get(); @@ -405,9 +394,9 @@ public class ServerSession extends Session // Broker shouldn't block awaiting close - thus do override this method to do nothing } - public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) + public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry) { - _transaction.dequeue(entry.getQueue(), entry.getMessage(), + _transaction.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() { @@ -426,42 +415,26 @@ public class ServerSession extends Session }); } - public Collection<Subscription_0_10> getSubscriptions() + public Collection<ConsumerTarget_0_10> getSubscriptions() { return _subscriptions.values(); } - public void register(String destination, Subscription_0_10 sub) + public void register(String destination, ConsumerTarget_0_10 sub) { _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } - public Subscription_0_10 getSubscription(String destination) + public ConsumerTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); } - public void unregister(Subscription_0_10 sub) + public void unregister(ConsumerTarget_0_10 sub) { _subscriptions.remove(sub.getName()); - try - { - sub.getSendLock(); - AMQQueue queue = sub.getQueue(); - if(queue != null) - { - queue.unregisterSubscription(sub); - } - } - catch (AMQException e) - { - // TODO - _logger.error("Failed to unregister subscription :" + e.getMessage(), e); - } - finally - { - sub.releaseSendLock(); - } + sub.close(); + } public boolean isTransactional() @@ -638,12 +611,12 @@ public class ServerSession extends Session return getConnection().getAuthorizedSubject(); } - public void addSessionCloseTask(Task task) + public void addSessionCloseTask(Action<ServerSession> task) { _taskList.add(task); } - public void removeSessionCloseTask(Task task) + public void removeSessionCloseTask(Action<ServerSession> task) { _taskList.remove(task); } @@ -829,8 +802,8 @@ public class ServerSession extends Session void unregisterSubscriptions() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } @@ -838,8 +811,8 @@ public class ServerSession extends Session void stopSubscriptions() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.stop(); } @@ -848,8 +821,8 @@ public class ServerSession extends Session public void receivedComplete() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.flushCreditState(false); } @@ -955,4 +928,16 @@ public class ServerSession extends Session return getId().compareTo(o.getId()); } + private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>> + { + @Override + public void performAction(final MessageInstance<C> entry) + { + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(ServerSession.this); + } + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index dcca696529..9a90b74656 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; @@ -34,7 +35,9 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; @@ -45,6 +48,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -55,6 +59,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; @@ -193,7 +198,7 @@ public class ServerSessionDelegate extends SessionDelegate String queueName = method.getQueue(); VirtualHost vhost = getVirtualHost(session); - final AMQQueue queue = vhost.getQueue(queueName); + final MessageSource queue = vhost.getMessageSource(queueName); if(queue == null) { @@ -214,9 +219,9 @@ public class ServerSessionDelegate extends SessionDelegate ServerSession s = (ServerSession) session; queue.setExclusiveOwningSession(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { if(queue.getExclusiveOwningSession() == session) { @@ -228,9 +233,9 @@ public class ServerSessionDelegate extends SessionDelegate if(queue.getAuthorizationHolder() == null) { queue.setAuthorizationHolder(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { if(queue.getAuthorizationHolder() == session) { @@ -254,25 +259,42 @@ public class ServerSessionDelegate extends SessionDelegate return; } - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, - destination, - method.getAcceptMode(), - method.getAcquireMode(), - MessageFlowMode.WINDOW, - creditManager, - filterManager, - method.getArguments()); + ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination, + method.getAcceptMode(), + method.getAcquireMode(), + MessageFlowMode.WINDOW, + creditManager, + method.getArguments() + ); - ((ServerSession)session).register(destination, sub); + ((ServerSession)session).register(destination, target); try { - queue.registerSubscription(sub, method.getExclusive()); + EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) + { + options.add(Consumer.Option.ACQUIRES); + } + if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + options.add(Consumer.Option.SEES_REQUEUES); + } + if(method.getExclusive()) + { + options.add(Consumer.Option.EXCLUSIVE); + } + Consumer sub = + queue.addConsumer(target, + filterManager, + MessageTransferMessage.class, + destination, + options); } - catch (AMQQueue.ExistingExclusiveSubscription existing) + catch (AMQQueue.ExistingExclusiveConsumer existing) { exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer"); } - catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive) + catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive) { exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively"); } @@ -288,7 +310,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { - final Exchange exchange = getExchangeForMessage(ssn, xfr); + final MessageDestination exchange = getDestinationForMessage(ssn, xfr); final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) @@ -307,7 +329,6 @@ public class ServerSessionDelegate extends SessionDelegate return; } - final Exchange exchangeInUse; final MessageStore store = getVirtualHost(ssn).getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); final ServerSession serverSession = (ServerSession) ssn; @@ -385,7 +406,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -393,12 +414,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - AMQQueue queue = sub.getQueue(); ((ServerSession)session).unregister(sub); - if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) - { - queue.setAuthorizationHolder(null); - } } } @@ -407,7 +423,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -814,24 +830,24 @@ public class ServerSessionDelegate extends SessionDelegate return getVirtualHost(session).getExchange(exchangeName); } - private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr) + private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr) { VirtualHost virtualHost = getVirtualHost(ssn); - Exchange exchange; + MessageDestination destination; if(xfr.hasDestination()) { - exchange = virtualHost.getExchange(xfr.getDestination()); - if(exchange == null) + destination = virtualHost.getMessageDestination(xfr.getDestination()); + if(destination == null) { - exchange = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultExchange(); } } else { - exchange = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultExchange(); } - return exchange; + return destination; } private VirtualHost getVirtualHost(Session session) @@ -1249,9 +1265,9 @@ public class ServerSessionDelegate extends SessionDelegate if (autoDelete && exclusive) { final AMQQueue q = queue; - final ServerSession.Task deleteQueueTask = new ServerSession.Task() + final Action<ServerSession> deleteQueueTask = new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { try { @@ -1265,9 +1281,9 @@ public class ServerSessionDelegate extends SessionDelegate }; final ServerSession s = (ServerSession) session; s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) throws AMQException + public void performAction(AMQQueue queue) { s.removeSessionCloseTask(deleteQueueTask); } @@ -1276,9 +1292,9 @@ public class ServerSessionDelegate extends SessionDelegate if (exclusive) { final AMQQueue q = queue; - final ServerSession.Task removeExclusive = new ServerSession.Task() + final Action<ServerSession> removeExclusive = new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { q.setAuthorizationHolder(null); q.setExclusiveOwningSession(null); @@ -1287,9 +1303,9 @@ public class ServerSessionDelegate extends SessionDelegate final ServerSession s = (ServerSession) session; q.setExclusiveOwningSession(s); s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) throws AMQException + public void performAction(AMQQueue queue) { s.removeSessionCloseTask(removeExclusive); } @@ -1461,7 +1477,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = sfm.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1478,7 +1494,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = stop.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1496,7 +1512,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = flow.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java deleted file mode 100644 index 357b565365..0000000000 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ /dev/null @@ -1,944 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.messages.ChannelMessages; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Struct; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; - -import java.text.MessageFormat; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject -{ - private final long _subscriptionID; - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - - private static final Option[] BATCHED = new Option[] { Option.BATCH }; - - private final Lock _stateChangeLock = new ReentrantLock(); - - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private volatile AMQQueue.Context _queueContext; - private final AtomicBoolean _deleted = new AtomicBoolean(false); - - - private FlowCreditManager_0_10 _creditManager; - - private StateListener _stateListener = new StateListener() - { - - public void stateChange(Subscription sub, State oldState, State newState) - { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); - } - }; - private AMQQueue _queue; - private final String _destination; - private boolean _noLocal; - private final FilterManager _filters; - private final MessageAcceptMode _acceptMode; - private final MessageAcquireMode _acquireMode; - private MessageFlowMode _flowMode; - private final ServerSession _session; - private final AtomicBoolean _stopped = new AtomicBoolean(true); - private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; - - private LogActor _logActor; - private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private String _traceExclude; - private String _trace; - private final long _createTime = System.currentTimeMillis(); - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - private final AtomicLong _unacknowledgedCount = new AtomicLong(0); - private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - - private final Map<String, Object> _arguments; - private int _deferredMessageCredit; - private long _deferredSizeCredit; - - - public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, - MessageAcquireMode acquireMode, - MessageFlowMode flowMode, - FlowCreditManager_0_10 creditManager, - FilterManager filters,Map<String, Object> arguments) - { - _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); - _session = session; - _postIdSettingAction = new AddMessageDispositionListenerAction(session); - _destination = destination; - _acceptMode = acceptMode; - _acquireMode = acquireMode; - _creditManager = creditManager; - _flowMode = flowMode; - _filters = filters; - _creditManager.addStateListener(this); - _arguments = arguments == null ? Collections.<String, Object> emptyMap() : - Collections.<String, Object> unmodifiableMap(arguments); - _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED); - - } - - public void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void setQueue(AMQQueue queue, boolean exclusive) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - - _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES); - _trace = (String) queue.getAttribute(Queue.FEDERATION_ID); - String filterLogString = null; - - _logActor = GenericActor.getInstance(this); - if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - filterLogString = getFilterLogString(); - CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, - filterLogString.length() > 0)); - } - } - - public String getConsumerName() - { - return _destination; - } - - public boolean isSuspended() - { - return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension - } - - public boolean hasInterest(QueueEntry entry) - { - - - - //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) - { - - return false; - } - - if (entry.getMessage() instanceof MessageTransferMessage) - { - if(_noLocal) - { - Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference(); - if (connectionRef != null && connectionRef == _session.getReference()) - { - return false; - } - } - } - else - { - // no interest in messages we can't convert - if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null) - { - return false; - } - } - - - return checkFilters(entry); - - - } - - private boolean checkFilters(QueueEntry entry) - { - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - public boolean isClosed() - { - return getState() == State.CLOSED; - } - - public boolean isBrowser() - { - return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; - } - - public boolean seesRequeues() - { - return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT; - } - - public void close() - { - boolean closed = false; - State state = getState(); - - _stateChangeLock.lock(); - try - { - while(!closed && state != State.CLOSED) - { - closed = _state.compareAndSet(state, State.CLOSED); - if(!closed) - { - state = getState(); - } - else - { - _stateListener.stateChange(this,state, State.CLOSED); - } - } - _creditManager.removeListener(this); - CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); - } - finally - { - _stateChangeLock.unlock(); - } - - - - } - - public Long getDelivered() - { - return _deliveredCount.get(); - } - - public void creditStateChanged(boolean hasCredit) - { - - if(hasCredit) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - else - { - // this is a hack to get round the issue of increasing bytes credit - _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); - } - } - else - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - } - - - public static class AddMessageDispositionListenerAction implements Runnable - { - private MessageTransfer _xfr; - private ServerSession.MessageDispositionChangeListener _action; - private ServerSession _session; - - public AddMessageDispositionListenerAction(ServerSession session) - { - _session = session; - } - - public void setXfr(MessageTransfer xfr) - { - _xfr = xfr; - } - - public void setAction(ServerSession.MessageDispositionChangeListener action) - { - _action = action; - } - - public void run() - { - if(_action != null) - { - _session.onMessageDispositionChange(_xfr, _action); - } - } - } - - private final AddMessageDispositionListenerAction _postIdSettingAction; - - public void send(final QueueEntry entry, boolean batch) throws AMQException - { - ServerMessage serverMsg = entry.getMessage(); - - - MessageTransfer xfr; - - DeliveryProperties deliveryProps; - MessageProperties messageProps = null; - - MessageTransferMessage msg; - - if(serverMsg instanceof MessageTransferMessage) - { - - msg = (MessageTransferMessage) serverMsg; - - } - else - { - MessageConverter converter = - MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); - - - msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost()); - } - DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); - messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); - - deliveryProps = new DeliveryProperties(); - if(origDeliveryProps != null) - { - if(origDeliveryProps.hasDeliveryMode()) - { - deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); - } - if(origDeliveryProps.hasExchange()) - { - deliveryProps.setExchange(origDeliveryProps.getExchange()); - } - if(origDeliveryProps.hasExpiration()) - { - deliveryProps.setExpiration(origDeliveryProps.getExpiration()); - } - if(origDeliveryProps.hasPriority()) - { - deliveryProps.setPriority(origDeliveryProps.getPriority()); - } - if(origDeliveryProps.hasRoutingKey()) - { - deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); - } - if(origDeliveryProps.hasTimestamp()) - { - deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); - } - if(origDeliveryProps.hasTtl()) - { - deliveryProps.setTtl(origDeliveryProps.getTtl()); - } - - - } - - deliveryProps.setRedelivered(entry.isRedelivered()); - - if(_trace != null && messageProps == null) - { - messageProps = new MessageProperties(); - } - - Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - - - xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); - - boolean excludeDueToFederation = false; - - if(_trace != null) - { - if(!messageProps.hasApplicationHeaders()) - { - messageProps.setApplicationHeaders(new HashMap<String,Object>()); - } - Map<String,Object> appHeaders = messageProps.getApplicationHeaders(); - String trace = (String) appHeaders.get("x-qpid.trace"); - if(trace == null) - { - trace = _trace; - } - else - { - if(_traceExclude != null) - { - excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude); - } - trace+=","+_trace; - } - appHeaders.put("x-qpid.trace",trace); - } - - if(!excludeDueToFederation) - { - if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) - { - xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); - } - else if(_flowMode == MessageFlowMode.WINDOW) - { - xfr.setCompletionListener(new Method.CompletionListener() - { - public void onComplete(Method method) - { - deferredAddCredit(1, entry.getSize()); - } - }); - } - - - _postIdSettingAction.setXfr(xfr); - if(_acceptMode == MessageAcceptMode.EXPLICIT) - { - _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); - } - else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) - { - _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); - } - else - { - _postIdSettingAction.setAction(null); - } - - - _session.sendMessage(xfr, _postIdSettingAction); - entry.incrementDeliveryCount(); - _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(entry.getSize()); - if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) - { - forceDequeue(entry, false); - } - else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED) - { - recordUnacknowledged(entry); - } - } - else - { - forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW); - - } - } - - void recordUnacknowledged(QueueEntry entry) - { - _unacknowledgedCount.incrementAndGet(); - _unacknowledgedBytes.addAndGet(entry.getSize()); - } - - private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) - { - _deferredMessageCredit += deferredMessageCredit; - _deferredSizeCredit += deferredSizeCredit; - - } - - public void flushCreditState(boolean strict) - { - if(strict || !isSuspended() || _deferredMessageCredit >= 200 - || !(_creditManager instanceof WindowCreditManager) - || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) - { - _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); - _deferredMessageCredit = 0; - _deferredSizeCredit = 0l; - } - } - - private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) - { - AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); - dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(), - new ServerTransaction.Action() - { - public void postCommit() - { - if (restoreCredit) - { - restoreCredit(entry); - } - entry.delete(); - } - - public void onRollback() - { - - } - }); - } - - void reject(final QueueEntry entry) - { - entry.setRedelivered(); - entry.routeToAlternate(null, null); - if(entry.isAcquiredBy(this)) - { - entry.delete(); - } - } - - void release(final QueueEntry entry, final boolean setRedelivered) - { - if (setRedelivered) - { - entry.setRedelivered(); - } - - if (getSessionModel().isClosing() || !setRedelivered) - { - entry.decrementDeliveryCount(); - } - - if (isMaxDeliveryLimitReached(entry)) - { - sendToDLQOrDiscard(entry); - } - else - { - entry.release(); - } - } - - protected void sendToDLQOrDiscard(QueueEntry entry) - { - final LogActor logActor = CurrentActor.get(); - final ServerMessage msg = entry.getMessage(); - - int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction() - { - @Override - public void onEnqueue(final QueueEntry requeueEntry) - { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); - } - }, null); - - if (requeues == 0) - { - final AMQQueue queue = entry.getQueue(); - final Exchange alternateExchange = queue.getAlternateExchange(); - - if(alternateExchange != null) - { - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), - alternateExchange.getName())); - } - else - { - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), - queue.getName(), - msg.getRoutingKey())); - } - } - } - - private boolean isMaxDeliveryLimitReached(QueueEntry entry) - { - final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); - return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); - } - - public void queueDeleted(AMQQueue queue) - { - _deleted.set(true); - } - - public boolean wouldSuspend(QueueEntry entry) - { - return !_creditManager.useCreditForMessage(entry.getMessage().getSize()); - } - - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void restoreCredit(QueueEntry queueEntry) - { - _creditManager.restoreCredit(1, queueEntry.getSize()); - } - - public void onDequeue(QueueEntry queueEntry) - { - // no-op for 0-10, credit restored by completing command. - } - - public void releaseQueueEntry(QueueEntry queueEntry) - { - // no-op for 0-10, credit restored by completing command. - } - - public void setStateListener(StateListener listener) - { - _stateListener = listener; - } - - public State getState() - { - return _state.get(); - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - - public FlowCreditManager_0_10 getCreditManager() - { - return _creditManager; - } - - - public void stop() - { - try - { - getSendLock(); - - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - _stopped.set(true); - FlowCreditManager_0_10 creditManager = getCreditManager(); - creditManager.clearCredit(); - } - finally - { - releaseSendLock(); - } - } - - public void addCredit(MessageCreditUnit unit, long value) - { - FlowCreditManager_0_10 creditManager = getCreditManager(); - - switch (unit) - { - case MESSAGE: - - creditManager.addCredit(value, 0L); - break; - case BYTE: - creditManager.addCredit(0l, value); - break; - } - - _stopped.set(false); - - if(creditManager.hasCredit()) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - } - - } - - public void setFlowMode(MessageFlowMode flowMode) - { - - - _creditManager.removeListener(this); - - switch(flowMode) - { - case CREDIT: - _creditManager = new CreditCreditManager(0l,0l); - break; - case WINDOW: - _creditManager = new WindowCreditManager(0l,0l); - break; - default: - throw new RuntimeException("Unknown message flow mode: " + flowMode); - } - _flowMode = flowMode; - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - - _creditManager.addStateListener(this); - - } - - public boolean isStopped() - { - return _stopped.get(); - } - - public boolean acquires() - { - return _acquireMode == MessageAcquireMode.PRE_ACQUIRED; - } - - public void acknowledge(QueueEntry entry) - { - // TODO Fix Store Context / cleanup - if(entry.isAcquiredBy(this)) - { - _unacknowledgedBytes.addAndGet(-entry.getSize()); - _unacknowledgedCount.decrementAndGet(); - entry.delete(); - } - } - - public void flush() throws AMQException - { - flushCreditState(true); - _queue.flushSubscription(this); - stop(); - } - - public long getSubscriptionID() - { - return _subscriptionID; - } - - public LogActor getLogActor() - { - return _logActor; - } - - public boolean isTransient() - { - return false; - } - - public ServerSession getSessionModel() - { - return _session; - } - - public boolean isBrowsing() - { - return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; - } - - public boolean isExclusive() - { - return getQueue().hasExclusiveSubscriber(); - } - - public boolean isDurable() - { - return false; - } - - - public boolean isExplicitAcknowledge() - { - return _acceptMode == MessageAcceptMode.EXPLICIT; - } - - public String getCreditMode() - { - return _flowMode.toString(); - } - - public String getName() - { - return _destination; - } - - public Map<String, Object> getArguments() - { - return _arguments; - } - - public boolean isSessionTransactional() - { - return _session.isTransactional(); - } - - public void queueEmpty() - { - } - - public long getCreateTime() - { - return _createTime; - } - - public String toLogString() - { - String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), - _queue.getName()); - String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "(" - // queueString is "vh(/{0})/qu({1}) " so need to trim - + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] "; - return result; - } - - private String getFilterLogString() - { - StringBuilder filterLogString = new StringBuilder(); - String delimiter = ", "; - boolean hasEntries = false; - if (_filters != null && _filters.hasFilters()) - { - filterLogString.append(_filters.toString()); - hasEntries = true; - } - - if (isBrowser()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Browser"); - hasEntries = true; - } - - if (isDurable()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Durable"); - hasEntries = true; - } - - return filterLogString.toString(); - } - - public LogSubject getLogSubject() - { - return (LogSubject) this; - } - - - public void flushBatched() - { - _session.getConnection().flush(); - } - - public long getBytesOut() - { - return _deliveredBytes.longValue(); - } - - public long getMessagesOut() - { - return _deliveredCount.longValue(); - } - - public long getUnacknowledgedBytes() - { - return _unacknowledgedBytes.longValue(); - } - - public long getUnacknowledgedMessages() - { - return _unacknowledgedCount.longValue(); - } -} |
