diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-12-28 13:02:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-12-28 13:02:41 +0000 |
| commit | 55ccbf149980b06c7b7effa36871ffbdf50550fa (patch) | |
| tree | f5fc6181438968f82af0528c751af32ea8fef64e /qpid/java/broker | |
| parent | f085f3b0ce89af428e75bf2ae3b8c65ecdd16ad6 (diff) | |
| download | qpid-python-55ccbf149980b06c7b7effa36871ffbdf50550fa.tar.gz | |
QPID-3714 : [Java] Performance Improvements
Persistence:
Store message in same transaction as enqueue if possible
Memory:
Remove unnecessary (un)boxing
Reduce unnecessary copying of message data
Cache short strings
Cache queues for a given routing key on an Exchange
(0-9) Use a fixed size buffer for preparing frames to write out
Other:
Reduce calls to System.currentTimeMillis
(0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point
(0-10) Special case delivery properties and message properties in headers
(0-9) send commit-ok as soon as data committed to store
Cache publishing access control queries
(0-9) Optimised long and int typed values for FieldTables
(0-9) Retain FieldTable encoded form
(0-9) Cache queue and topic destinations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1225178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
87 files changed, 2660 insertions, 2288 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java index 593c1616fb..b898e85aa2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java @@ -111,6 +111,11 @@ public class ManagementExchange implements Exchange, QMFService.Listener } + public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException + { + enqueue(message); + } + public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { enqueue(message); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java index b98daf7cb1..9ee8b923fc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java @@ -32,6 +32,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.management.common.mbeans.ManagedConnection; import java.util.ArrayList; +import java.util.List; public class QMFBrokerRequestCommand extends QMFCommand { @@ -57,7 +58,7 @@ public class QMFBrokerRequestCommand extends QMFCommand QMFMessage responseMessage = new QMFMessage(queueName, cmd); - ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + List<? extends BaseQueue> queues = exchange.route(responseMessage); for(BaseQueue q : queues) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java index 26a27cfa19..5d2717a9fb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java @@ -31,6 +31,7 @@ import org.apache.qpid.AMQException; import java.util.ArrayList; import java.util.Collection; +import java.util.List; public class QMFClassQueryCommand extends QMFCommand { @@ -71,7 +72,7 @@ public class QMFClassQueryCommand extends QMFCommand Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); - ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + List<? extends BaseQueue> queues = exchange.route(responseMessage); for(BaseQueue q : queues) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java index 8e8cb55a0d..ff927a1de9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java @@ -163,7 +163,7 @@ public class QMFGetQueryCommand extends QMFCommand Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); - ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + List<? extends BaseQueue> queues = exchange.route(responseMessage); for(BaseQueue q : queues) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java index 895ff643a2..8208525b08 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java @@ -21,8 +21,11 @@ package org.apache.qpid.qmf; +import org.apache.commons.lang.NotImplementedException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.message.*; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.transport.codec.BBEncoder; import java.nio.ByteBuffer; @@ -59,11 +62,21 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead return _routingKey; } + public AMQShortString getRoutingKeyShortString() + { + return AMQShortString.valueOf(_routingKey); + } + public AMQMessageHeader getMessageHeader() { return this; } + public StoredMessage getStoredMessage() + { + throw new NotImplementedException(); + } + public boolean isPersistent() { return false; @@ -159,9 +172,9 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead return new QMFMessageReference(this); } - public Long getMessageNumber() + public long getMessageNumber() { - return null; + return 0l; } public long getArrivalTime() @@ -172,9 +185,9 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead public int getContent(ByteBuffer buf, int offset) { ByteBuffer src = _content.duplicate(); - _content.position(offset); - _content = _content.slice(); - int len = _content.remaining(); + src.position(offset); + src = src.slice(); + int len = src.remaining(); if(len > buf.remaining()) { len = buf.remaining(); @@ -185,6 +198,16 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead return len; } + + public ByteBuffer getContent(int offset, int size) + { + ByteBuffer src = _content.duplicate(); + src.position(offset); + src = src.slice(); + src.limit(size); + return src; + } + private static class QMFMessageReference extends MessageReference<QMFMessage> { public QMFMessageReference(QMFMessage message) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java index cf27e4b970..37c16efec5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.AMQException; +import java.util.List; import java.util.UUID; import java.util.ArrayList; @@ -68,7 +69,7 @@ public class QMFMethodRequestCommand extends QMFCommand QMFMessage responseMessage = new QMFMessage(queueName, cmd); - ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + List<? extends BaseQueue> queues = exchange.route(responseMessage); for(BaseQueue q : queues) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java index 6defd088de..ed07457a23 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java @@ -31,6 +31,7 @@ import org.apache.qpid.AMQException; import java.util.ArrayList; import java.util.Collection; +import java.util.List; public class QMFPackageQueryCommand extends QMFCommand { @@ -67,7 +68,7 @@ public class QMFPackageQueryCommand extends QMFCommand Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); - ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + List<? extends BaseQueue> queues = exchange.route(responseMessage); for(BaseQueue q : queues) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java index 3141676f10..850ffa8610 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java @@ -31,6 +31,7 @@ import org.apache.qpid.AMQException; import java.util.Collection; import java.util.ArrayList; +import java.util.List; public class QMFSchemaRequestCommand extends QMFCommand { @@ -70,7 +71,7 @@ public class QMFSchemaRequestCommand extends QMFCommand QMFMessage responseMessage = new QMFMessage(routingKey, cmd); - ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + List<? extends BaseQueue> queues = exchange.route(responseMessage); for(BaseQueue q : queues) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a4fd997568..82ac01cea8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -95,7 +95,7 @@ import java.util.concurrent.atomic.AtomicLong; public class AMQChannel implements SessionConfig, AMQSessionModel { - public static final int DEFAULT_PREFETCH = 5000; + public static final int DEFAULT_PREFETCH = 4096; private static final Logger _logger = Logger.getLogger(AMQChannel.class); @@ -166,6 +166,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final UUID _id; private long _createTime = System.currentTimeMillis(); + private final ClientDeliveryMethod _clientDeliveryMethod; + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { @@ -183,6 +185,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel // by default the session is non-transactional _transaction = new AutoCommitTransaction(_messageStore); + + _clientDeliveryMethod = session.createDeliveryMethod(_channelId); } public ConfigStore getConfigStore() @@ -205,6 +209,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel return !(_transaction instanceof AutoCommitTransaction); } + public void receivedComplete() + { + } + + public boolean inTransaction() { return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; @@ -284,7 +293,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _currentMessage.setExpiration(); - MessageMetaData mmd = _currentMessage.headersReceived(); + MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime()); final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd); _currentMessage.setStoredMessage(handle); @@ -316,8 +325,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { try { - _currentMessage.getStoredMessage().flushToStore(); - final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); + final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); if(!checkMessageUserId(_currentMessage.getContentHeader())) { @@ -339,11 +347,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } else { - _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional())); + _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime()); incrementOutstandingTxnsIfNecessary(); updateTransactionalActivity(); } } + _currentMessage.getStoredMessage().flushToStore(); + } finally { @@ -857,10 +867,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) { - Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>(); - _unacknowledgedMessageMap.collect(deliveryTag, multiple, ackedMessageMap); - _unacknowledgedMessageMap.remove(ackedMessageMap); - return ackedMessageMap.values(); + return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple); + } /** @@ -949,12 +957,17 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public void commit() throws AMQException { + commit(null); + } + public void commit(Runnable immediateAction) throws AMQException + { + if (!isTransactional()) { throw new AMQException("Fatal error: commit called on non-transactional channel"); } - _transaction.commit(); + _transaction.commit(immediateAction); _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); @@ -1033,7 +1046,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { if (isTransactional()) { - _txnUpdateTime.set(System.currentTimeMillis()); + _txnUpdateTime.set(getProtocolSession().getLastReceivedTime()); } } @@ -1079,20 +1092,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel return _messageStore; } - private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod() - { - - public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) - throws AMQException - { - _session.registerMessageDelivered(entry.getMessage().getSize()); - getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), - deliveryTag, sub.getConsumerTag()); - entry.incrementDeliveryCount(); - } - - }; - public ClientDeliveryMethod getClientDeliveryMethod() { return _clientDeliveryMethod; @@ -1158,11 +1157,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private class MessageDeliveryAction implements ServerTransaction.Action { private IncomingMessage _incommingMessage; - private ArrayList<? extends BaseQueue> _destinationQueues; + private List<? extends BaseQueue> _destinationQueues; public MessageDeliveryAction(IncomingMessage currentMessage, - ArrayList<? extends BaseQueue> destinationQueues, - boolean transactional) + List<? extends BaseQueue> destinationQueues) { _incommingMessage = currentMessage; _destinationQueues = destinationQueues; @@ -1177,8 +1175,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel final AMQMessage amqMessage = createAMQMessage(_incommingMessage); MessageReference ref = amqMessage.newReference(); - for(final BaseQueue queue : _destinationQueues) + for(int i = 0; i < _destinationQueues.size(); i++) { + BaseQueue queue = _destinationQueues.get(i); + BaseQueue.PostEnqueueAction action; if(immediate) @@ -1190,7 +1190,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel action = null; } - queue.enqueue(amqMessage, action); + queue.enqueue(amqMessage, isTransactional(), action); if(queue instanceof AMQQueue) { @@ -1198,6 +1198,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } } + + _incommingMessage.getStoredMessage().flushToStore(); ref.release(); } catch (AMQException e) @@ -1539,7 +1541,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry); - final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m); + final List<? extends BaseQueue> destinationQueues = altExchange.route(m); if (destinationQueues == null || destinationQueues.isEmpty()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 9da02e0600..9765636c25 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -22,8 +22,8 @@ package org.apache.qpid.server; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.AMQException; @@ -39,13 +39,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor private final Map<Long, QueueEntry> _msgToResend; private final boolean _requeueIfUnabletoResend; private final UnacknowledgedMessageMap _unacknowledgedMessageMap; - private final TransactionLog _transactionLog; + private final MessageStore _transactionLog; public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap, Map<Long, QueueEntry> msgToRequeue, Map<Long, QueueEntry> msgToResend, boolean requeueIfUnabletoResend, - TransactionLog txnLog) + MessageStore txnLog) { _unacknowledgedMessageMap = unacknowledgedMessageMap; _msgToRequeue = msgToRequeue; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index 3bad73d86d..f4b4932744 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -46,10 +46,6 @@ public interface UnacknowledgedMessageMap void add(long deliveryTag, QueueEntry message); - void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs); - - void remove(Map<Long,QueueEntry> msgs); - QueueEntry remove(long deliveryTag); Collection<QueueEntry> cancelAllMessages(); @@ -67,6 +63,8 @@ public interface UnacknowledgedMessageMap */ Set<Long> getDeliveryTags(); + Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index d920d97c1a..6a5d863526 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -157,6 +157,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } + public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple) + { + Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>(); + collect(deliveryTag, multiple, ackedMessageMap); + remove(ackedMessageMap); + return ackedMessageMap.values(); + } + private void collect(long key, Map<Long, QueueEntry> msgs) { synchronized (_lock) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java index 60c9a86b76..48f85d9bc9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -115,4 +115,9 @@ public class Binding return result; } + public String toString() + { + return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+"}"; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index d693c6962b..5ff90b3499 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -351,11 +351,11 @@ public abstract class AbstractExchange implements Exchange, Managable - public final ArrayList<? extends BaseQueue> route(final InboundMessage message) + public final List<? extends BaseQueue> route(final InboundMessage message) { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - final ArrayList<? extends BaseQueue> queues = doRoute(message); + final List<? extends BaseQueue> queues = doRoute(message); if(!queues.isEmpty()) { _routedMessageCount.incrementAndGet(); @@ -364,7 +364,7 @@ public abstract class AbstractExchange implements Exchange, Managable return queues; } - protected abstract ArrayList<? extends BaseQueue> doRoute(final InboundMessage message); + protected abstract List<? extends BaseQueue> doRoute(final InboundMessage message); public long getMsgReceives() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index cb0d8ecf8f..8c0a5001db 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -34,6 +34,8 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -41,8 +43,52 @@ public class DirectExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(DirectExchange.class); - private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey = - new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>(); + private static final class BindingSet + { + private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>(); + private List<BaseQueue> _queues = new ArrayList<BaseQueue>(); + + public synchronized void addBinding(Binding binding) + { + _bindings.add(binding); + recalculateQueues(); + } + + + public synchronized void removeBinding(Binding binding) + { + _bindings.remove(binding); + recalculateQueues(); + } + + private void recalculateQueues() + { + List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size()); + + for(Binding b : _bindings) + { + if(!queues.contains(b.getQueue())) + { + queues.add(b.getQueue()); + } + } + _queues = queues; + } + + + public List<BaseQueue> getQueues() + { + return _queues; + } + + public CopyOnWriteArraySet<Binding> getBindings() + { + return _bindings; + } + } + + private final ConcurrentHashMap<String, BindingSet> _bindingsByKey = + new ConcurrentHashMap<String, BindingSet>(); public static final ExchangeType<DirectExchange> TYPE = new ExchangeType<DirectExchange>() { @@ -91,33 +137,20 @@ public class DirectExchange extends AbstractExchange } - public ArrayList<? extends BaseQueue> doRoute(InboundMessage payload) + public List<? extends BaseQueue> doRoute(InboundMessage payload) { final String routingKey = payload.getRoutingKey(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey); + BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey); if(bindings != null) { - final ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(bindings.size()); - - for(Binding binding : bindings) - { - queues.add(binding.getQueue()); - binding.incrementMatches(); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("Publishing message to queue " + queues); - } - - return queues; + return bindings.getQueues(); } else { - return new ArrayList<BaseQueue>(0); + return Collections.emptyList(); } @@ -132,16 +165,10 @@ public class DirectExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); + BindingSet bindings = _bindingsByKey.get(bindingKey); if(bindings != null) { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } + return bindings.getQueues().contains(queue); } return false; @@ -150,22 +177,20 @@ public class DirectExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey) { String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.isEmpty(); + BindingSet bindings = _bindingsByKey.get(bindingKey); + return bindings != null && !bindings.getQueues().isEmpty(); } public boolean isBound(AMQQueue queue) { - for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values()) + for (BindingSet bindings : _bindingsByKey.values()) { - for(Binding binding : bindings) + if(bindings.getQueues().contains(queue)) { - if(binding.getQueue().equals(queue)) - { - return true; - } + return true; } + } return false; } @@ -184,19 +209,19 @@ public class DirectExchange extends AbstractExchange assert queue != null; assert routingKey != null; - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); + BindingSet bindings = _bindingsByKey.get(bindingKey); if(bindings == null) { - bindings = new CopyOnWriteArraySet<Binding>(); - CopyOnWriteArraySet<Binding> newBindings; + bindings = new BindingSet(); + BindingSet newBindings; if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null) { bindings = newBindings; } } - bindings.add(binding); + bindings.addBinding(binding); } @@ -204,10 +229,10 @@ public class DirectExchange extends AbstractExchange { assert binding != null; - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey()); + BindingSet bindings = _bindingsByKey.get(binding.getBindingKey()); if(bindings != null) { - bindings.remove(binding); + bindings.removeBinding(binding); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 29a3611709..29c354feae 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.configuration.ExchangeConfig; import javax.management.JMException; import java.util.ArrayList; import java.util.Collection; +import java.util.List; public interface Exchange extends ExchangeReferrer, ExchangeConfig { @@ -70,7 +71,7 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig * * @return list of queues to which to route the message. */ - ArrayList<? extends BaseQueue> route(InboundMessage message); + List<? extends BaseQueue> route(InboundMessage message); /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index e523eb24fb..3a8a86e654 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -37,8 +37,11 @@ import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.message.InboundMessage; import javax.management.JMException; +import java.sql.Array; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + import java.lang.ref.WeakReference; public class TopicExchange extends AbstractExchange @@ -77,8 +80,6 @@ public class TopicExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(TopicExchange.class); - - private final TopicParser _parser = new TopicParser(); private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults = @@ -175,7 +176,6 @@ public class TopicExchange extends AbstractExchange _bindings.put(binding, args); } - } private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQInvalidArgumentException @@ -201,14 +201,23 @@ public class TopicExchange extends AbstractExchange public ArrayList<BaseQueue> doRoute(InboundMessage payload) { - final AMQShortString routingKey = payload.getRoutingKey() == null + final AMQShortString routingKey = payload.getRoutingKeyShortString() == null ? AMQShortString.EMPTY_STRING - : new AMQShortString(payload.getRoutingKey()); + : payload.getRoutingKeyShortString(); + + final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey); - // The copy here is unfortunate, but not too bad relevant to the amount of - // things created and copied in getMatchedQueues - ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(); - queues.addAll(getMatchedQueues(payload, routingKey)); + ArrayList<BaseQueue> queues; + + if(matchedQueues.getClass() == ArrayList.class) + { + queues = (ArrayList) matchedQueues; + } + else + { + queues = new ArrayList<BaseQueue>(); + queues.addAll(matchedQueues); + } if(queues == null || queues.isEmpty()) { @@ -325,25 +334,28 @@ public class TopicExchange extends AbstractExchange { Collection<TopicMatcherResult> results = _parser.parse(routingKey); - if(results.isEmpty()) + switch(results.size()) { - return Collections.EMPTY_SET; - } - else - { - Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>(); - for(TopicMatcherResult result : results) - { - TopicExchangeResult res = (TopicExchangeResult)result; - - for(Binding b : res.getBindings()) + case 0: + return Collections.EMPTY_SET; + case 1: + TopicMatcherResult[] resultQueues = new TopicMatcherResult[1]; + results.toArray(resultQueues); + return ((TopicExchangeResult)resultQueues[0]).processMessage(message, null); + default: + Collection<AMQQueue> queues = new HashSet<AMQQueue>(); + for(TopicMatcherResult result : results) { - b.incrementMatches(); + TopicExchangeResult res = (TopicExchangeResult)result; + + for(Binding b : res.getBindings()) + { + b.incrementMatches(); + } + + queues = res.processMessage(message, queues); } - - queues = res.processMessage(message, queues); - } - return queues; + return queues; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java index 41dc0d749a..d8b09a7841 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java @@ -39,6 +39,7 @@ public final class TopicExchangeResult implements TopicMatcherResult private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>(); + private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0); public void addUnfilteredQueue(AMQQueue queue) { @@ -46,6 +47,9 @@ public final class TopicExchangeResult implements TopicMatcherResult if(instances == null) { _unfilteredQueues.put(queue, 1); + ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList); + newList.add(queue); + _unfilteredQueueList = newList; } else { @@ -59,6 +63,10 @@ public final class TopicExchangeResult implements TopicMatcherResult if(instances == 1) { _unfilteredQueues.remove(queue); + ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList); + newList.remove(queue); + _unfilteredQueueList = newList; + } else { @@ -166,7 +174,7 @@ public final class TopicExchangeResult implements TopicMatcherResult { if(_filteredQueues.isEmpty()) { - return new ArrayList<AMQQueue>(_unfilteredQueues.keySet()); + return _unfilteredQueueList; } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java index 36076cf75b..4446536d4c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java @@ -77,7 +77,7 @@ public class TopicMatcherDFAState } if(nextState == null) { - return Collections.EMPTY_SET; + return Collections.EMPTY_LIST; } // Shortcut if we are at a looping terminal state if((nextState == this) && (_nextStateMap.size() == 1) && _nextStateMap.containsKey(TopicWord.ANY_WORD)) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java index 2dff45c326..1c1527aa31 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java @@ -44,27 +44,9 @@ import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageReject; -import org.apache.qpid.transport.MessageRejectCode; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import org.apache.qpid.transport.*; + +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -365,7 +347,8 @@ public class Bridge implements BridgeConfig // TODO - deal with exchange not existing DeliveryProperties delvProps = null; - if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && + !delvProps.hasExpiration()) { delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); } @@ -377,7 +360,7 @@ public class Bridge implements BridgeConfig storeMessage.flushToStore(); MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)_session).getReference()); - ArrayList<? extends BaseQueue> queues = exchange.route(message); + List<? extends BaseQueue> queues = exchange.route(message); @@ -391,7 +374,7 @@ public class Bridge implements BridgeConfig { if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) { - RangeSet rejects = new RangeSet(); + RangeSet rejects = RangeSetFactory.createRangeSet(); rejects.add(xfr.getId()); MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); ssn.invoke(reject); @@ -428,7 +411,7 @@ public class Bridge implements BridgeConfig } - private void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) + private void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) { _transaction.enqueue(queues,message, new ServerTransaction.Action() { @@ -456,8 +439,7 @@ public class Bridge implements BridgeConfig { // NO-OP } - }); - + }, 0L); } public void exception(final Session session, final SessionException exception) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java index cfe5aedd61..a77ed5700a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.flow; +import java.util.ArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.Set; import java.util.HashSet; @@ -27,13 +28,16 @@ import java.util.HashSet; public abstract class AbstractFlowCreditManager implements FlowCreditManager { protected final AtomicBoolean _suspended = new AtomicBoolean(false); - private final Set<FlowCreditManagerListener> _listeners = new HashSet<FlowCreditManagerListener>(); + private final ArrayList<FlowCreditManagerListener> _listeners = new ArrayList<FlowCreditManagerListener>(); public final void addStateListener(FlowCreditManagerListener listener) { synchronized(_listeners) { - _listeners.add(listener); + if(!_listeners.contains(listener)) + { + _listeners.add(listener); + } } } @@ -49,9 +53,10 @@ public abstract class AbstractFlowCreditManager implements FlowCreditManager { synchronized(_listeners) { - for(FlowCreditManagerListener listener : _listeners) + final int size = _listeners.size(); + for(int i = 0; i<size; i++) { - listener.creditStateChanged(!suspended); + _listeners.get(i).creditStateChanged(!suspended); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 765dee2878..8875f21d0b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -52,7 +52,6 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); AMQChannel channel = protocolConnection.getChannel(channelId); - VirtualHost vHost = protocolConnection.getVirtualHost(); if (channel == null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 9133cce6b7..32aa99534b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -65,7 +65,6 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel"); } - session.closeChannel(channelId); // Client requested closure so we don't wait for ok we send it stateManager.getProtocolSession().closeChannelOk(channelId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index 696ca8a63b..5ccaa49de8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -55,7 +55,6 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB { throw body.getChannelNotFoundException(channelId); } - channel.setSuspended(!body.getActive()); _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index f8e4eab0b6..6eaba87b79 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -49,7 +49,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - if (_logger.isInfoEnabled()) { _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index ccd42204d9..21aea1510b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -65,7 +65,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 0cfed77f2e..693b316607 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -85,6 +85,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar //TODO: do we need to check that the queue already exists with exactly the same "configuration"? + AMQChannel channel = protocolConnection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + synchronized (queueRegistry) { queue = queueRegistry.getQueue(queueName); @@ -183,12 +190,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } - AMQChannel channel = protocolConnection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } //set this as the default queue on the channel: channel.setDefaultQueue(queue); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index da52268e52..902e3ade85 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -64,15 +64,17 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + + AMQChannel channel = protocolConnection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + AMQQueue queue; if (body.getQueue() == null) { - AMQChannel channel = protocolConnection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } //get the default queue on the channel: queue = channel.getDefaultQueue(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 759eec0129..6c3e11be5b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -63,17 +63,14 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); AMQChannel channel = protocolConnection.getChannel(channelId); - - + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } AMQQueue queue; if(body.getQueue() == null) { - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } - //get the default queue on the channel: queue = channel.getDefaultQueue(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java index f2119f7faa..3849c5af19 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java @@ -66,14 +66,15 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB final AMQQueue queue; final AMQShortString routingKey; - if (body.getQueue() == null) + + AMQChannel channel = session.getChannel(channelId); + if (channel == null) { - AMQChannel channel = session.getChannel(channelId); + throw body.getChannelNotFoundException(channelId); + } - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } + if (body.getQueue() == null) + { queue = channel.getDefaultQueue(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index 32bf8aa17d..b284514186 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -37,7 +37,7 @@ import java.nio.ByteBuffer; /** * A deliverable message. */ -public class AMQMessage extends AbstractServerMessageImpl +public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -62,10 +62,6 @@ public class AMQMessage extends AbstractServerMessageImpl private Object _sessionIdentifier; private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); - private final StoredMessage<MessageMetaData> _handle; - - WeakReference<AMQChannel> _channelRef; - public AMQMessage(StoredMessage<MessageMetaData> handle) { this(handle, null); @@ -75,7 +71,7 @@ public class AMQMessage extends AbstractServerMessageImpl { super(handle); - _handle = handle; + final MessageMetaData metaData = handle.getMetaData(); _size = metaData.getContentSize(); final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo(); @@ -84,8 +80,6 @@ public class AMQMessage extends AbstractServerMessageImpl { _flags |= IMMEDIATE; } - - _channelRef = channelRef; } public void setExpiration(final long expiration) @@ -97,7 +91,7 @@ public class AMQMessage extends AbstractServerMessageImpl public MessageMetaData getMessageMetaData() { - return _handle.getMetaData(); + return getStoredMessage().getMetaData(); } public ContentHeaderBody getContentHeaderBody() throws AMQException @@ -107,7 +101,7 @@ public class AMQMessage extends AbstractServerMessageImpl public Long getMessageId() { - return _handle.getMessageNumber(); + return getStoredMessage().getMessageNumber(); } /** @@ -219,9 +213,9 @@ public class AMQMessage extends AbstractServerMessageImpl return new AMQMessageReference(this); } - public Long getMessageNumber() + public long getMessageNumber() { - return getMessageId(); + return getStoredMessage().getMessageNumber(); } @@ -248,16 +242,13 @@ public class AMQMessage extends AbstractServerMessageImpl public int getContent(ByteBuffer buf, int offset) { - return _handle.getContent(offset, buf); + return getStoredMessage().getContent(offset, buf); } - public StoredMessage<MessageMetaData> getStoredMessage() + + public ByteBuffer getContent(int offset, int size) { - return _handle; + return getStoredMessage().getContent(offset, size); } - public SessionConfig getSessionConfig() - { - return _channelRef == null ? null : ((SessionConfig) _channelRef.get()); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index 80c28332c0..b1d43f0b50 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -21,19 +21,30 @@ package org.apache.qpid.server.message; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; -public abstract class AbstractServerMessageImpl implements ServerMessage +public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T> { - private final AtomicInteger _referenceCount = new AtomicInteger(0); - private final StoredMessage<?> _handle; - public AbstractServerMessageImpl(StoredMessage<?> handle) + private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater = + AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount"); + + private volatile int _referenceCount = 0; + private final StoredMessage<T> _handle; + + public AbstractServerMessageImpl(StoredMessage<T> handle) { _handle = handle; } + public StoredMessage<T> getStoredMessage() + { + return _handle; + } + public boolean incrementReference() { return incrementReference(1); @@ -41,9 +52,9 @@ public abstract class AbstractServerMessageImpl implements ServerMessage public boolean incrementReference(int count) { - if(_referenceCount.addAndGet(count) <= 0) + if(_refCountUpdater.addAndGet(this, count) <= 0) { - _referenceCount.addAndGet(-count); + _refCountUpdater.addAndGet(this, -count); return false; } else @@ -62,7 +73,7 @@ public abstract class AbstractServerMessageImpl implements ServerMessage */ public void decrementReference() { - int count = _referenceCount.decrementAndGet(); + int count = _refCountUpdater.decrementAndGet(this); // note that the operation of decrementing the reference count and then removing the message does not // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after @@ -73,7 +84,7 @@ public abstract class AbstractServerMessageImpl implements ServerMessage // set the reference count way below 0 so that we can detect that the message has been deleted // this is to guard against the message being spontaneously recreated (from the mgmt console) // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE/2); + _refCountUpdater.set(this,Integer.MIN_VALUE/2); // must check if the handle is null since there may be cases where we decide to throw away a message // and the handle has not yet been constructed @@ -99,6 +110,6 @@ public abstract class AbstractServerMessageImpl implements ServerMessage protected int getReferenceCount() { - return _referenceCount.get(); + return _referenceCount; } }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java index c32f80fc5b..7be91ad0ca 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java @@ -20,8 +20,11 @@ */ package org.apache.qpid.server.message; +import org.apache.qpid.server.store.StoredMessage; + public interface EnqueableMessage { - Long getMessageNumber(); + long getMessageNumber(); boolean isPersistent(); + StoredMessage getStoredMessage(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java index 1b3fdb1870..79d5574a91 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java @@ -22,10 +22,12 @@ package org.apache.qpid.server.message; import org.apache.qpid.server.queue.Filterable; +import org.apache.qpid.framing.AMQShortString; public interface InboundMessage extends Filterable { String getRoutingKey(); + AMQShortString getRoutingKeyShortString(); AMQMessageHeader getMessageHeader(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java index 08a09c4a85..44741f57bd 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; public interface MessageContentSource { public int getContent(ByteBuffer buf, int offset); + public ByteBuffer getContent(int offset, int size); long getSize(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java index f9863f4945..17ebb6ee07 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java @@ -30,9 +30,12 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.framing.AMQShortString; import java.nio.ByteBuffer; import java.lang.ref.SoftReference; +import java.util.ArrayList; +import java.util.List; public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage { @@ -42,7 +45,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes private MessageTransferHeader _messageHeader; private long _arrivalTime; private int _bodySize; - private volatile SoftReference<ByteBuffer> _body; private static final int ENCODER_SIZE = 1 << 10; @@ -53,21 +55,16 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes public MessageMetaData_0_10(MessageTransfer xfr) { - this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis()); + this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis()); } private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime) { - this(header, bodySize, null, arrivalTime); - } - - private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime) - { _header = header; if(_header != null) { - _deliveryProps = _header.get(DeliveryProperties.class); - _messageProps = _header.get(MessageProperties.class); + _deliveryProps = _header.getDeliveryProperties(); + _messageProps = _header.getMessageProperties(); } else { @@ -78,21 +75,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes _arrivalTime = arrivalTime; _bodySize = bodySize; - - - if(xfrBody == null) - { - _body = null; - } - else - { - ByteBuffer body = ByteBuffer.allocate(_bodySize); - body.put(xfrBody); - body.flip(); - _body = new SoftReference<ByteBuffer>(body); - } - - } @@ -122,16 +104,39 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes encoder.writeInt64(_arrivalTime); encoder.writeInt32(_bodySize); - Struct[] headers = _header == null ? new Struct[0] : _header.getStructs(); - encoder.writeInt32(headers.length); + int headersLength = 0; + if(_header.getDeliveryProperties() != null) + { + headersLength++; + } + if(_header.getMessageProperties() != null) + { + headersLength++; + } + if(_header.getNonStandardProperties() != null) + { + headersLength += _header.getNonStandardProperties().size(); + } + encoder.writeInt32(headersLength); - for(Struct header : headers) + if(_header.getDeliveryProperties() != null) { - encoder.writeStruct32(header); - + encoder.writeStruct32(_header.getDeliveryProperties()); + } + if(_header.getMessageProperties() != null) + { + encoder.writeStruct32(_header.getMessageProperties()); } + if(_header.getNonStandardProperties() != null) + { + for(Struct header : _header.getNonStandardProperties()) + { + encoder.writeStruct32(header); + } + + } ByteBuffer buf = encoder.buffer(); return buf; } @@ -173,6 +178,11 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes return _deliveryProps == null ? null : _deliveryProps.getRoutingKey(); } + public AMQShortString getRoutingKeyShortString() + { + return AMQShortString.valueOf(getRoutingKey()); + } + public AMQMessageHeader getMessageHeader() { return _messageHeader; @@ -210,17 +220,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes return _header; } - public ByteBuffer getBody() - { - ByteBuffer body = _body == null ? null : _body.get(); - return body; - } - - public void setBody(ByteBuffer body) - { - _body = new SoftReference<ByteBuffer>(body); - } - private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10> { public MessageMetaData_0_10 createMetaData(ByteBuffer buf) @@ -232,14 +231,32 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes int bodySize = decoder.readInt32(); int headerCount = decoder.readInt32(); - Struct[] headers = new Struct[headerCount]; + DeliveryProperties deliveryProperties = null; + MessageProperties messageProperties = null; + List<Struct> otherProps = null; for(int i = 0 ; i < headerCount; i++) { - headers[i] = decoder.readStruct32(); + Struct struct = decoder.readStruct32(); + if(struct instanceof DeliveryProperties && deliveryProperties == null) + { + deliveryProperties = (DeliveryProperties) struct; + } + else if(struct instanceof MessageProperties && messageProperties == null) + { + messageProperties = (MessageProperties) struct; + } + else + { + if(otherProps == null) + { + otherProps = new ArrayList<Struct>(); + + } + otherProps.add(struct); + } } - - Header header = new Header(headers); + Header header = new Header(deliveryProperties,messageProperties,otherProps); return new MessageMetaData_0_10(header, bodySize, arrivalTime); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java index 51841e6dd0..30934ae014 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -24,32 +24,35 @@ import org.apache.qpid.transport.*; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.transport.ServerSession; +import org.apache.qpid.framing.AMQShortString; import java.nio.ByteBuffer; -import java.lang.ref.WeakReference; -public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage +public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage { - private StoredMessage<MessageMetaData_0_10> _storeMessage; - private WeakReference<Session> _sessionRef; - public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef) + private Object _connectionRef; + + public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef) { super(storeMessage); - _storeMessage = storeMessage; - _sessionRef = sessionRef; + _connectionRef = connectionRef; } private MessageMetaData_0_10 getMetaData() { - return _storeMessage.getMetaData(); + return getStoredMessage().getMetaData(); } public String getRoutingKey() { return getMetaData().getRoutingKey(); + } + public AMQShortString getRoutingKeyShortString() + { + return AMQShortString.valueOf(getRoutingKey()); } public AMQMessageHeader getMessageHeader() @@ -91,9 +94,9 @@ public class MessageTransferMessage extends AbstractServerMessageImpl implements return new TransferMessageReference(this); } - public Long getMessageNumber() + public long getMessageNumber() { - return _storeMessage.getMessageNumber(); + return getStoredMessage().getMessageNumber(); } public long getArrivalTime() @@ -103,7 +106,13 @@ public class MessageTransferMessage extends AbstractServerMessageImpl implements public int getContent(ByteBuffer buf, int offset) { - return _storeMessage.getContent(offset, buf); + return getStoredMessage().getContent(offset, buf); + } + + + public ByteBuffer getContent(int offset, int size) + { + return getStoredMessage().getContent(offset,size); } public Header getHeader() @@ -113,32 +122,13 @@ public class MessageTransferMessage extends AbstractServerMessageImpl implements public ByteBuffer getBody() { - ByteBuffer body = getMetaData().getBody(); - if(body == null && getSize() != 0l) - { - final int size = (int) getSize(); - int pos = 0; - body = ByteBuffer.allocate(size); - - while(pos < size) - { - pos += getContent(body, pos); - } - - body.flip(); - getMetaData().setBody(body.duplicate()); - } - return body; + return getContent(0, (int)getSize()); } - public Session getSession() + public Object getConnectionReference() { - return _sessionRef == null ? null : _sessionRef.get(); + return _connectionRef; } - public SessionConfig getSessionConfig() - { - return _sessionRef == null ? null : (ServerSession) _sessionRef.get(); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 2f2d39115f..2d25135326 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -23,13 +23,17 @@ package org.apache.qpid.server.message; import java.nio.ByteBuffer; import org.apache.qpid.server.configuration.SessionConfig; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; -public interface ServerMessage extends EnqueableMessage, MessageContentSource +public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueableMessage, MessageContentSource { String getRoutingKey(); AMQMessageHeader getMessageHeader(); + public StoredMessage<T> getStoredMessage(); + boolean isPersistent(); long getSize(); @@ -40,11 +44,12 @@ public interface ServerMessage extends EnqueableMessage, MessageContentSource MessageReference newReference(); - Long getMessageNumber(); + long getMessageNumber(); long getArrivalTime(); public int getContent(ByteBuffer buf, int offset); - SessionConfig getSessionConfig(); + public ByteBuffer getContent(int offset, int size); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java index aded3f3d2a..483bca894e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java @@ -40,8 +40,8 @@ public class HeaderPropertiesConverter BasicContentHeaderProperties props = new BasicContentHeaderProperties(); Header header = messageTransferMessage.getHeader(); - DeliveryProperties deliveryProps = header.get(DeliveryProperties.class); - MessageProperties messageProps = header.get(MessageProperties.class); + DeliveryProperties deliveryProps = header.getDeliveryProperties(); + MessageProperties messageProps = header.getMessageProperties(); if(deliveryProps != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 3970e5a2d4..efd904f6aa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -1,420 +1,420 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- * 8-0
- */
-package org.apache.qpid.server.output.amqp0_8;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP versions: + * 8-0 + */ +package org.apache.qpid.server.output.amqp0_8; + +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.HeaderPropertiesConverter; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; +import org.apache.qpid.transport.DeliveryProperties; + +import java.io.DataOutput; +import java.io.IOException; + +public class ProtocolOutputConverterImpl implements ProtocolOutputConverter +{ + + private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + + public static Factory getInstanceFactory() + { + return new Factory() + { + + public ProtocolOutputConverter newInstance(AMQProtocolSession session) + { + return new ProtocolOutputConverterImpl(session); + } + }; + } + + + private final AMQProtocolSession _protocolSession; + + private ProtocolOutputConverterImpl(AMQProtocolSession session) + { + _protocolSession = session; + } + + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag); + writeMessageDelivery(entry, channelId, deliverBody); + } + + + private ContentHeaderBody getContentHeaderBody(QueueEntry entry) + throws AMQException + { + if(entry.getMessage() instanceof AMQMessage) + { + return ((AMQMessage)entry.getMessage()).getContentHeaderBody(); + } + else + { + final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID); + chb.bodySize = message.getSize(); + return chb; + } + } + + + private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody) + throws AMQException + { + writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody); + } + + private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) + throws AMQException + { + + + int bodySize = (int) message.getSize(); + + if(bodySize == 0) + { + SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, + contentHeaderBody); + + writeFrame(compositeBlock); + } + else + { + int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); + + + int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; + + int writtenSize = capacity; + + AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); + + CompositeAMQBodyBlock + compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + writeFrame(compositeBlock); + + while(writtenSize < bodySize) + { + capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; + MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); + writtenSize += capacity; + + writeFrame(new AMQFrame(channelId, body)); + } + } + } + + private class MessageContentSourceBody implements AMQBody + { + public static final byte TYPE = 3; + private int _length; + private MessageContentSource _message; + private int _offset; + + public MessageContentSourceBody(MessageContentSource message, int offset, int length) + { + _message = message; + _offset = offset; + _length = length; + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return _length; + } + + public void writePayload(DataOutput buffer) throws IOException + { + byte[] data = new byte[_length]; + + _message.getContent(java.nio.ByteBuffer.wrap(data), _offset); + + buffer.write(data); + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new UnsupportedOperationException(); + } + } + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) + { + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + contentHeaderBody); + return contentHeader; + } + + + public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException + { + AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); + writeMessageDelivery(entry, channelId, deliver); + } + + + private AMQBody createEncodedDeliverBody(QueueEntry entry, + final long deliveryTag, + final AMQShortString consumerTag) + throws AMQException + { + + final AMQShortString exchangeName; + final AMQShortString routingKey; + + if(entry.getMessage() instanceof AMQMessage) + { + final AMQMessage message = (AMQMessage) entry.getMessage(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); + } + else + { + MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); + exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); + routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); + } + + final boolean isRedelivered = entry.isRedelivered(); + + final AMQBody returnBlock = new AMQBody() + { + + public AMQBody _underlyingBody; + + public AMQBody createAMQBody() + { + return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, + deliveryTag, + isRedelivered, + exchangeName, + routingKey); + + + + + + } + + public byte getFrameType() + { + return AMQMethodBody.TYPE; + } + + public int getSize() + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.getSize(); + } + + public void writePayload(DataOutput buffer) throws IOException + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + _underlyingBody.writePayload(buffer); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + throw new AMQException("This block should never be dispatched!"); + } + }; + return returnBlock; + } + + private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) + throws AMQException + { + final AMQShortString exchangeName; + final AMQShortString routingKey; + + if(entry.getMessage() instanceof AMQMessage) + { + final AMQMessage message = (AMQMessage) entry.getMessage(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); + } + else + { + MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); + exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); + routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); + } + + final boolean isRedelivered = entry.isRedelivered(); + + BasicGetOkBody getOkBody = + METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, + isRedelivered, + exchangeName, + routingKey, + queueSize); + + return getOkBody; + } + + public byte getProtocolMinorVersion() + { + return getProtocolSession().getProtocolMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return getProtocolSession().getProtocolMajorVersion(); + } + + private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, + int replyCode, + AMQShortString replyText) throws AMQException + { + + BasicReturnBody basicReturnBody = + METHOD_REGISTRY.createBasicReturnBody(replyCode, + replyText, + messagePublishInfo.getExchange(), + messagePublishInfo.getRoutingKey()); + + + return basicReturnBody; + } + + public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException + { + + AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); + + writeMessageDelivery(message, header, channelId, returnFrame); + } + + + public void writeFrame(AMQDataBlock block) + { + getProtocolSession().writeFrame(block); + } + + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + + BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + + } + + + public static final class CompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); + } + + public void writePayload(DataOutput buffer) throws IOException + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; + } + + public void writePayload(DataOutput buffer) throws IOException + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index aef3483282..010afcb1a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -1,419 +1,418 @@ -package org.apache.qpid.server.output.amqp0_9;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-
-}
+package org.apache.qpid.server.output.amqp0_9; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.HeaderPropertiesConverter; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class ProtocolOutputConverterImpl implements ProtocolOutputConverter +{ + private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + + + public static Factory getInstanceFactory() + { + return new Factory() + { + + public ProtocolOutputConverter newInstance(AMQProtocolSession session) + { + return new ProtocolOutputConverterImpl(session); + } + }; + } + + private final AMQProtocolSession _protocolSession; + + private ProtocolOutputConverterImpl(AMQProtocolSession session) + { + _protocolSession = session; + } + + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag); + writeMessageDelivery(entry, channelId, deliverBody); + } + + + private ContentHeaderBody getContentHeaderBody(QueueEntry entry) + throws AMQException + { + if(entry.getMessage() instanceof AMQMessage) + { + return ((AMQMessage)entry.getMessage()).getContentHeaderBody(); + } + else + { + final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); + chb.bodySize = message.getSize(); + return chb; + } + } + + + private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody) + throws AMQException + { + writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody); + } + + private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) + throws AMQException + { + + + int bodySize = (int) message.getSize(); + + if(bodySize == 0) + { + SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, + contentHeaderBody); + + writeFrame(compositeBlock); + } + else + { + int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); + + + int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; + + int writtenSize = capacity; + + AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); + + + CompositeAMQBodyBlock + compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + writeFrame(compositeBlock); + + while(writtenSize < bodySize) + { + capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; + MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); + writtenSize += capacity; + + writeFrame(new AMQFrame(channelId, body)); + } + } + } + + private class MessageContentSourceBody implements AMQBody + { + public static final byte TYPE = 3; + private int _length; + private MessageContentSource _message; + private int _offset; + + public MessageContentSourceBody(MessageContentSource message, int offset, int length) + { + _message = message; + _offset = offset; + _length = length; + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return _length; + } + + public void writePayload(DataOutput buffer) throws IOException + { + byte[] data = new byte[_length]; + + _message.getContent(ByteBuffer.wrap(data), _offset); + + buffer.write(data); + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new UnsupportedOperationException(); + } + } + + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) + { + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + contentHeaderBody); + return contentHeader; + } + + + public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException + { + AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); + writeMessageDelivery(entry, channelId, deliver); + } + + + private AMQBody createEncodedDeliverBody(QueueEntry entry, + final long deliveryTag, + final AMQShortString consumerTag) + throws AMQException + { + + final AMQShortString exchangeName; + final AMQShortString routingKey; + + if(entry.getMessage() instanceof AMQMessage) + { + final AMQMessage message = (AMQMessage) entry.getMessage(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); + } + else + { + MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); + exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); + routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); + } + + final boolean isRedelivered = entry.isRedelivered(); + + final AMQBody returnBlock = new AMQBody() + { + + public AMQBody _underlyingBody; + + public AMQBody createAMQBody() + { + return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, + deliveryTag, + isRedelivered, + exchangeName, + routingKey); + + + + + + } + + public byte getFrameType() + { + return AMQMethodBody.TYPE; + } + + public int getSize() + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.getSize(); + } + + public void writePayload(DataOutput buffer) throws IOException + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + _underlyingBody.writePayload(buffer); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + throw new AMQException("This block should never be dispatched!"); + } + }; + return returnBlock; + } + + private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) + throws AMQException + { + final AMQShortString exchangeName; + final AMQShortString routingKey; + + if(entry.getMessage() instanceof AMQMessage) + { + final AMQMessage message = (AMQMessage) entry.getMessage(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); + } + else + { + MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); + exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); + routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); + } + + final boolean isRedelivered = entry.isRedelivered(); + + BasicGetOkBody getOkBody = + METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, + isRedelivered, + exchangeName, + routingKey, + queueSize); + + return getOkBody; + } + + public byte getProtocolMinorVersion() + { + return getProtocolSession().getProtocolMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return getProtocolSession().getProtocolMajorVersion(); + } + + private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, + int replyCode, + AMQShortString replyText) throws AMQException + { + + BasicReturnBody basicReturnBody = + METHOD_REGISTRY.createBasicReturnBody(replyCode, + replyText, + messagePublishInfo.getExchange(), + messagePublishInfo.getRoutingKey()); + + + return basicReturnBody; + } + + public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException + { + + AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); + + writeMessageDelivery(message, header, channelId, returnFrame); + } + + + public void writeFrame(AMQDataBlock block) + { + getProtocolSession().writeFrame(block); + } + + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + + BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + + } + + + public static final class CompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); + } + + public void writePayload(DataOutput buffer) throws IOException + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; + } + + public void writePayload(DataOutput buffer) throws IOException + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); + } + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java index 10748298bc..5e2b3e4556 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java @@ -1,414 +1,425 @@ -package org.apache.qpid.server.output.amqp0_9_1;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-
+package org.apache.qpid.server.output.amqp0_9_1; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.HeaderPropertiesConverter; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class ProtocolOutputConverterImpl implements ProtocolOutputConverter +{ + private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91); + + public static Factory getInstanceFactory() + { + return new Factory() + { + + public ProtocolOutputConverter newInstance(AMQProtocolSession session) + { + return new ProtocolOutputConverterImpl(session); + } + }; + } + + private final AMQProtocolSession _protocolSession; + + private ProtocolOutputConverterImpl(AMQProtocolSession session) + { + _protocolSession = session; + } + + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag); + writeMessageDelivery(entry, channelId, deliverBody); + } + + + private ContentHeaderBody getContentHeaderBody(QueueEntry entry) + throws AMQException + { + if(entry.getMessage() instanceof AMQMessage) + { + return ((AMQMessage)entry.getMessage()).getContentHeaderBody(); + } + else + { + final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); + chb.bodySize = message.getSize(); + return chb; + } + } + + + private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody) + throws AMQException + { + writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody); + } + + private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) + throws AMQException + { + + + int bodySize = (int) message.getSize(); + + if(bodySize == 0) + { + SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, + contentHeaderBody); + + writeFrame(compositeBlock); + } + else + { + int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); + + + int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; + + int writtenSize = capacity; + + AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); + + CompositeAMQBodyBlock + compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + writeFrame(compositeBlock); + + while(writtenSize < bodySize) + { + capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; + MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); + writtenSize += capacity; + + writeFrame(new AMQFrame(channelId, body)); + } + } + } + + private class MessageContentSourceBody implements AMQBody + { + public static final byte TYPE = 3; + private int _length; + private MessageContentSource _message; + private int _offset; + + public MessageContentSourceBody(MessageContentSource message, int offset, int length) + { + _message = message; + _offset = offset; + _length = length; + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return _length; + } + + public void writePayload(DataOutput buffer) throws IOException + { + ByteBuffer buf = _message.getContent(_offset, _length); + + if(buf.hasArray()) + { + buffer.write(buf.array(), buf.arrayOffset()+buf.position(), buf.remaining()); + } + else + { + + byte[] data = new byte[_length]; + + buf.get(data); + + buffer.write(data); + } + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new UnsupportedOperationException(); + } + } + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) + { + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + contentHeaderBody); + return contentHeader; + } + + + public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException + { + AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); + writeMessageDelivery(entry, channelId, deliver); + } + + + private AMQBody createEncodedDeliverBody(QueueEntry entry, + final long deliveryTag, + final AMQShortString consumerTag) + throws AMQException + { + + final AMQShortString exchangeName; + final AMQShortString routingKey; + + if(entry.getMessage() instanceof AMQMessage) + { + final AMQMessage message = (AMQMessage) entry.getMessage(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); + } + else + { + MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); + exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); + routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); + } + + final boolean isRedelivered = entry.isRedelivered(); + + final AMQBody returnBlock = new AMQBody() + { + + public AMQBody _underlyingBody; + + public AMQBody createAMQBody() + { + return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, + deliveryTag, + isRedelivered, + exchangeName, + routingKey); + + + + + + } + + public byte getFrameType() + { + return AMQMethodBody.TYPE; + } + + public int getSize() + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.getSize(); + } + + public void writePayload(DataOutput buffer) throws IOException + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + _underlyingBody.writePayload(buffer); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + throw new AMQException("This block should never be dispatched!"); + } + }; + return returnBlock; + } + + private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) + throws AMQException + { + final AMQShortString exchangeName; + final AMQShortString routingKey; + + if(entry.getMessage() instanceof AMQMessage) + { + final AMQMessage message = (AMQMessage) entry.getMessage(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); + } + else + { + MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); + exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); + routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); + } + + final boolean isRedelivered = entry.isRedelivered(); + + BasicGetOkBody getOkBody = + METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, + isRedelivered, + exchangeName, + routingKey, + queueSize); + + return getOkBody; + } + + public byte getProtocolMinorVersion() + { + return getProtocolSession().getProtocolMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return getProtocolSession().getProtocolMajorVersion(); + } + + private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, + int replyCode, + AMQShortString replyText) throws AMQException + { + + BasicReturnBody basicReturnBody = + METHOD_REGISTRY.createBasicReturnBody(replyCode, + replyText, + messagePublishInfo.getExchange(), + messagePublishInfo.getRoutingKey()); + + + return basicReturnBody; + } + + public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException + { + + AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); + + writeMessageDelivery(message, header, channelId, returnFrame); + } + + + public void writeFrame(AMQDataBlock block) + { + getProtocolSession().writeFrame(block); + } + + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + + BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + + } + + + public static final class CompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); + } + + public void writePayload(DataOutput buffer) throws IOException + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; + } + + public void writePayload(DataOutput buffer) throws IOException + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); + } + } + }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 2a277848ed..b778cf4135 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -111,6 +111,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; + private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; private AMQShortString _contextKey; @@ -280,6 +281,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr closeProtocolSession(); } } + receiveComplete(); } catch (Exception e) { @@ -288,6 +290,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } + private void receiveComplete() + { + for (AMQChannel channel : _channelMap.values()) + { + channel.receivedComplete(); + } + + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -405,35 +416,51 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } + + private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; + private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); + private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); + private ByteBuffer asByteBuffer(AMQDataBlock block) { - final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + final int size = (int) block.getSize(); - try - { - block.writePayload(new DataOutputStream(new OutputStream() - { + final byte[] data; - @Override - public void write(int b) throws IOException - { - buf.put((byte) b); - } + if(size > REUSABLE_BYTE_BUFFER_CAPACITY) + { + data= new byte[size]; + } + else + { - @Override - public void write(byte[] b, int off, int len) throws IOException - { - buf.put(b, off, len); - } - })); + data = _reusableBytes; + } + _reusableDataOutput.setBuffer(data); + + try + { + block.writePayload(_reusableDataOutput); } catch (IOException e) { throw new RuntimeException(e); } - buf.flip(); + final ByteBuffer buf; + + if(size <= REUSABLE_BYTE_BUFFER_CAPACITY) + { + buf = _reusableByteBuffer; + buf.position(0); + } + else + { + buf = ByteBuffer.wrap(data); + } + buf.limit(_reusableDataOutput.length()); + return buf; } @@ -1425,7 +1452,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _statisticsEnabled = enabled; } - @Override public boolean isSessionNameUnique(byte[] name) { // 0-8/0-9/0-9-1 sessions don't have names @@ -1437,9 +1463,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _deferFlush = deferFlush; } - - - @Override public String getUserName() { return getAuthorizedPrincipal().getName(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index b4765d6227..143a6ae8ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -557,7 +557,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que List<String> list = new ArrayList<String>(); AMQMessageHeader header = msg.getMessageHeader(); - MessageProperties msgProps = msg.getHeader().get(MessageProperties.class); + MessageProperties msgProps = msg.getHeader().getMessageProperties(); String appID = null; String userID = null; @@ -619,7 +619,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); } - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); txn.commit(); } @@ -654,7 +654,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); } - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 05e0efd9a6..0bd40e8f13 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -35,6 +35,7 @@ public interface BaseQueue extends TransactionLogResource void enqueue(ServerMessage message) throws AMQException; void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException; + void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException; boolean isDurable(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index c4762c98c9..ab0a567114 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -100,7 +100,7 @@ public class ConflationQueueList extends SimpleQueueEntryList { if(entry.acquire()) { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); + ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); txn.dequeue(entry.getQueue(),entry.getMessage(), new ServerTransaction.Action() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java index 26112d9f53..31e9725e47 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.framing.AMQShortString; public class InboundMessageAdapter implements InboundMessage { @@ -44,6 +45,11 @@ public class InboundMessageAdapter implements InboundMessage } + public AMQShortString getRoutingKeyShortString() + { + return AMQShortString.valueOf(_entry.getMessage()); + } + public String getRoutingKey() { return _entry.getMessage().getRoutingKey(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index a56f5685b8..19a7a15ad1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -63,7 +63,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done * by the message handle. */ - private ArrayList<? extends BaseQueue> _destinationQueues; + private List<? extends BaseQueue> _destinationQueues; private long _expiration; @@ -126,12 +126,18 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public MessageMetaData headersReceived() { - _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0); + + return headersReceived(System.currentTimeMillis()); + } + + public MessageMetaData headersReceived(long currentTime) + { + _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime); return _messageMetaData; } - public ArrayList<? extends BaseQueue> getDestinationQueues() + public List<? extends BaseQueue> getDestinationQueues() { return _destinationQueues; } @@ -158,6 +164,11 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _messagePublishInfo.getExchange(); } + public AMQShortString getRoutingKeyShortString() + { + return _messagePublishInfo.getRoutingKey(); + } + public String getRoutingKey() { return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); @@ -209,7 +220,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return getContentHeader().bodySize; } - public Long getMessageNumber() + public long getMessageNumber() { return _storedMessageHandle.getMessageNumber(); } @@ -225,7 +236,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes } - public void enqueue(final ArrayList<? extends BaseQueue> queues) + public void enqueue(final List<? extends BaseQueue> queues) { _destinationQueues = queues; } @@ -288,6 +299,15 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes } + + public ByteBuffer getContent(int offset, int size) + { + ByteBuffer buf = ByteBuffer.allocate(size); + getContent(buf,offset); + buf.flip(); + return buf; + } + public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle) { _storedMessageHandle = storedMessageHandle; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 5bb5dc3462..bd349e7512 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import java.util.Collection; @@ -424,7 +425,7 @@ public abstract class QueueEntryImpl implements QueueEntry if (rerouteQueues != null && rerouteQueues.size() != 0) { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() { @@ -447,7 +448,8 @@ public abstract class QueueEntryImpl implements QueueEntry { } - }); + }, 0L); + txn.dequeue(currentQueue, message, new ServerTransaction.Action() { public void postCommit() @@ -460,8 +462,10 @@ public abstract class QueueEntryImpl implements QueueEntry } }); - } + + txn.commit(); } + } } public boolean isQueueDeleted() @@ -549,4 +553,11 @@ public abstract class QueueEntryImpl implements QueueEntry _deliveryCountUpdater.decrementAndGet(this); } + public String toString() + { + return "QueueEntryImpl{" + + "_entryId=" + _entryId + + ", _state=" + _state + + '}'; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 5270f9f740..a6cdb40d72 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -51,7 +51,6 @@ public class QueueRunner implements ReadWriteRunnable private final AtomicInteger _scheduled = new AtomicInteger(IDLE); - private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; private final AtomicBoolean _stateChange = new AtomicBoolean(); private final AtomicLong _lastRunAgain = new AtomicLong(); @@ -65,8 +64,6 @@ public class QueueRunner implements ReadWriteRunnable _queue = queue; } - private int trouble = 0; - public void run() { if(_scheduled.compareAndSet(SCHEDULED,RUNNING)) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index cc93cbc2f1..25fc91b998 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -584,8 +584,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { + enqueue(message, false, action); + } + + public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException + { - incrementTxnEnqueueStats(message); + if(transactional) + { + incrementTxnEnqueueStats(message); + } incrementQueueCount(); incrementQueueSize(message); @@ -733,13 +741,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void incrementTxnEnqueueStats(final ServerMessage message) { - SessionConfig session = message.getSessionConfig(); - - if(session !=null && session.isTransactional()) - { - _msgTxnEnqueues.incrementAndGet(); - _byteTxnEnqueues.addAndGet(message.getSize()); - } + _msgTxnEnqueues.incrementAndGet(); + _byteTxnEnqueues.addAndGet(message.getSize()); } private void incrementTxnDequeueStats(QueueEntry entry) @@ -1447,7 +1450,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { } - }); + }, 0L); txn.dequeue(this, entry.getMessage(), new ServerTransaction.Action() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java index abf9e3379d..7d4748bcaa 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -32,11 +32,9 @@ import static org.apache.qpid.server.security.access.Operation.UNBIND; import java.net.SocketAddress; import java.security.Principal; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import javax.security.auth.Subject; @@ -192,6 +190,15 @@ public class SecurityManager return _logger; } + private static class CachedPropertiesMap extends LinkedHashMap<String, PublishAccessCheck> + { + @Override + protected boolean removeEldestEntry(Entry<String, PublishAccessCheck> eldest) + { + return size() >= 200; + } + } + private abstract class AccessCheck { abstract Result allowed(SecurityPlugin plugin); @@ -204,56 +211,61 @@ public class SecurityManager return true; } - HashMap<String, SecurityPlugin> remainingPlugins = new HashMap<String, SecurityPlugin>(_globalPlugins); + Map<String, SecurityPlugin> remainingPlugins = _globalPlugins.isEmpty() + ? Collections.<String, SecurityPlugin>emptyMap() + : _hostPlugins.isEmpty() ? _globalPlugins : new HashMap<String, SecurityPlugin>(_globalPlugins); - for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet()) + if(!_hostPlugins.isEmpty()) { - // Create set of global only plugins - SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey()); - if (globalPlugin != null) - { - remainingPlugins.remove(hostEntry.getKey()); - } - - Result host = checker.allowed(hostEntry.getValue()); - - if (host == Result.DENIED) - { - // Something vetoed the access, we're done - return false; - } - - // host allow overrides global allow, so only check global on abstain or defer - if (host != Result.ALLOWED) - { - if (globalPlugin == null) - { - if (host == Result.DEFER) - { - host = hostEntry.getValue().getDefault(); - } - if (host == Result.DENIED) + for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet()) + { + // Create set of global only plugins + SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey()); + if (globalPlugin != null) + { + remainingPlugins.remove(hostEntry.getKey()); + } + + Result host = checker.allowed(hostEntry.getValue()); + + if (host == Result.DENIED) + { + // Something vetoed the access, we're done + return false; + } + + // host allow overrides global allow, so only check global on abstain or defer + if (host != Result.ALLOWED) + { + if (globalPlugin == null) { - return false; + if (host == Result.DEFER) + { + host = hostEntry.getValue().getDefault(); + } + if (host == Result.DENIED) + { + return false; + } } - } - else - { - Result global = checker.allowed(globalPlugin); - if (global == Result.DEFER) - { - global = globalPlugin.getDefault(); - } - if (global == Result.ABSTAIN && host == Result.DEFER) - { - global = hostEntry.getValue().getDefault(); - } - if (global == Result.DENIED) + else { - return false; + Result global = checker.allowed(globalPlugin); + if (global == Result.DEFER) + { + global = globalPlugin.getDefault(); + } + if (global == Result.ABSTAIN && host == Result.DEFER) + { + global = hostEntry.getValue().getDefault(); + } + if (global == Result.DENIED) + { + return false; + } } - } - } + } + } } for (SecurityPlugin plugin : remainingPlugins.values()) @@ -371,15 +383,33 @@ public class SecurityManager }); } + + private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _immediatePublishPropsCache + = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>(); + private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _publishPropsCache + = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>(); + public boolean authorisePublish(final boolean immediate, final String routingKey, final String exchangeName) { - return checkAllPlugins(new AccessCheck() + PublishAccessCheck check; + ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> cache = + immediate ? _immediatePublishPropsCache : _publishPropsCache; + + ConcurrentHashMap<String, PublishAccessCheck> exchangeMap = cache.get(exchangeName); + if(exchangeMap == null) { - Result allowed(SecurityPlugin plugin) + cache.putIfAbsent(exchangeName, new ConcurrentHashMap<String, PublishAccessCheck>()); + exchangeMap = cache.get(exchangeName); + } + + check = exchangeMap.get(routingKey); + if(check == null) { - return plugin.authorise(PUBLISH, EXCHANGE, new ObjectProperties(exchangeName, routingKey, immediate)); + check = new PublishAccessCheck(new ObjectProperties(exchangeName, routingKey, immediate)); + exchangeMap.put(routingKey, check); } - }); + + return checkAllPlugins(check); } public boolean authorisePurge(final AMQQueue queue) @@ -413,4 +443,19 @@ public class SecurityManager return current; } + + private class PublishAccessCheck extends AccessCheck + { + private final ObjectProperties _props; + + public PublishAccessCheck(ObjectProperties props) + { + _props = props; + } + + Result allowed(SecurityPlugin plugin) + { + return plugin.authorise(PUBLISH, EXCHANGE, _props); + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java index e4bf8df340..8a52d31f97 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java @@ -18,10 +18,7 @@ */ package org.apache.qpid.server.security.access; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.commons.lang.StringUtils; import org.apache.qpid.framing.AMQShortString; @@ -35,7 +32,7 @@ import org.apache.qpid.server.queue.AMQQueue; * {@link #equals(Object)} and {@link #hashCode()} are intended for use in maps. This is due to the wildcard matching * described above. */ -public class ObjectProperties extends HashMap<ObjectProperties.Property, String> +public class ObjectProperties { /** serialVersionUID */ private static final long serialVersionUID = -1356019341374170495L; @@ -93,7 +90,9 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String> return properties; } } - + + private final EnumMap<Property, String> _properties = new EnumMap<Property, String>(Property.class); + public static List<String> getAllPropertyNames() { List<String> properties = new ArrayList<String>(); @@ -113,7 +112,7 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String> { super(); - putAll(copy); + _properties.putAll(copy._properties); } public ObjectProperties(String name) @@ -231,7 +230,7 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String> public List<String> getPropertyNames() { List<String> properties = new ArrayList<String>(); - for (Property property : keySet()) + for (Property property : _properties.keySet()) { properties.add(property.getName()); } @@ -240,17 +239,22 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String> public Boolean isSet(Property key) { - return containsKey(key) && Boolean.valueOf(get(key)); + return _properties.containsKey(key) && Boolean.valueOf(_properties.get(key)); } - + + public String get(Property key) + { + return _properties.get(key); + } + public String getName() { - return get(Property.NAME); + return _properties.get(Property.NAME); } public void setName(String name) { - put(Property.NAME, name); + _properties.put(Property.NAME, name); } public void setName(AMQShortString name) @@ -262,39 +266,38 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String> { return put(key, value == null ? "" : value.asString()); } - - @Override + public String put(Property key, String value) { - return super.put(key, value == null ? "" : value.trim()); + return _properties.put(key, value == null ? "" : value.trim()); } public void put(Property key, Boolean value) { if (value != null) { - super.put(key, Boolean.toString(value)); + _properties.put(key, Boolean.toString(value)); } } public boolean matches(ObjectProperties properties) { - if (properties.keySet().isEmpty()) + if (properties._properties.keySet().isEmpty()) { return true; } - if (!keySet().containsAll(properties.keySet())) + if (!_properties.keySet().containsAll(properties._properties.keySet())) { return false; } - for (Map.Entry<Property,String> entry : properties.entrySet()) + for (Map.Entry<Property,String> entry : properties._properties.entrySet()) { Property key = entry.getKey(); String ruleValue = entry.getValue(); - String thisValue = get(key); + String thisValue = _properties.get(key); if (!valueMatches(thisValue, ruleValue)) { @@ -315,4 +318,29 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String> && thisValue.length() > ruleValue.length() && thisValue.startsWith(ruleValue.substring(0, ruleValue.length() - 2))); } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ObjectProperties that = (ObjectProperties) o; + + if (_properties != null ? !_properties.equals(that._properties) : that._properties != null) return false; + + return true; + } + + @Override + public int hashCode() + { + return _properties != null ? _properties.hashCode() : 0; + } + + @Override + public String toString() + { + return _properties.toString(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index d3f46d2e90..d90b3d02ba 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -52,6 +52,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; /** @@ -60,7 +62,7 @@ import org.apache.qpid.server.queue.AMQQueue; * * TODO extract the SQL statements into a generic JDBC store */ -public class DerbyMessageStore implements MessageStore +public class DerbyMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); @@ -197,12 +199,16 @@ public class DerbyMessageStore implements MessageStore Configuration storeConfiguration, LogSubject logSubject) throws Exception { - CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); - if(!_configured) { _logSubject = logSubject; + } + + CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); + + if(!_configured) + { commonConfiguration(name, storeConfiguration, logSubject); _configured = true; @@ -219,6 +225,11 @@ public class DerbyMessageStore implements MessageStore Configuration storeConfiguration, LogSubject logSubject) throws Exception { + + if(!_configured) + { + _logSubject = logSubject; + } CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); if(!_configured) @@ -697,7 +708,7 @@ public class DerbyMessageStore implements MessageStore if (results == 0) { - throw new RuntimeException("Message metadata not found for message id " + messageId); + _logger.warn("Message metadata not found for message id " + messageId); } if (_logger.isDebugEnabled()) @@ -1678,14 +1689,26 @@ public class DerbyMessageStore implements MessageStore } } - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId); + if(message.getStoredMessage() instanceof StoredDerbyMessage) + { + try + { + ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection()); + } + catch (SQLException e) + { + throw new AMQStoreException("Exception on enqueuing message " + _messageId, e); + } + } + + DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId); + DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber()); } @@ -1709,8 +1732,11 @@ public class DerbyMessageStore implements MessageStore { private final long _messageId; + private StorableMessageMetaData _metaData; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; - private Connection _conn; + private byte[] _data; + private volatile SoftReference<byte[]> _dataRef; + StoredDerbyMessage(long messageId, StorableMessageMetaData metaData) { @@ -1721,27 +1747,19 @@ public class DerbyMessageStore implements MessageStore StoredDerbyMessage(long messageId, StorableMessageMetaData metaData, boolean persist) { - try - { - _messageId = messageId; + _messageId = messageId; + - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - if(persist) - { - _conn = newConnection(); - storeMetaData(_conn, messageId, metaData); - } - } - catch (SQLException e) + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); + if(persist) { - throw new RuntimeException(e); + _metaData = metaData; } - } public StorableMessageMetaData getMetaData() { - StorableMessageMetaData metaData = _metaDataRef.get(); + StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData; if(metaData == null) { try @@ -1765,27 +1783,62 @@ public class DerbyMessageStore implements MessageStore public void addContent(int offsetInMessage, java.nio.ByteBuffer src) { - DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src); + src = src.slice(); + + if(_data == null) + { + _data = new byte[src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + src.duplicate().get(_data); + } + else + { + byte[] oldData = _data; + _data = new byte[oldData.length + src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + + System.arraycopy(oldData,0,_data,0,oldData.length); + src.duplicate().get(_data, oldData.length, src.remaining()); + } + } public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) { - return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst); + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) + { + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; + } + else + { + return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst); + } + } + + + public ByteBuffer getContent(int offsetInMessage, int size) + { + ByteBuffer buf = ByteBuffer.allocate(size); + getContent(offsetInMessage, buf); + buf.position(0); + return buf; } - public StoreFuture flushToStore() + public synchronized StoreFuture flushToStore() { try { - if(_conn != null) + if(_metaData != null) { - if(_logger.isDebugEnabled()) - { - _logger.debug("Flushing message " + _messageId + " to store"); - } + Connection conn = newConnection(); + + store(conn); - _conn.commit(); - _conn.close(); + conn.commit(); + conn.close(); } } catch (SQLException e) @@ -1796,16 +1849,34 @@ public class DerbyMessageStore implements MessageStore } throw new RuntimeException(e); } - finally + return IMMEDIATE_FUTURE; + } + + private synchronized void store(final Connection conn) throws SQLException + { + if(_metaData != null) { - _conn = null; + try + { + storeMetaData(conn, _messageId, _metaData); + DerbyMessageStore.this.addContent(conn, _messageId, 0, + _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + } + finally + { + _metaData = null; + _data = null; + } + } + + if(_logger.isDebugEnabled()) + { + _logger.debug("Storing message " + _messageId + " to store"); } - return IMMEDIATE_FUTURE; } public void remove() { - flushToStore(); DerbyMessageStore.this.removeMessage(_messageId); } } @@ -1839,4 +1910,5 @@ public class DerbyMessageStore implements MessageStore } } } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index d008d42fa0..005055dbaa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -35,10 +35,12 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; /** A simple message store that stores the messages in a threadsafe structure in memory. */ -public class MemoryMessageStore implements MessageStore +public class MemoryMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -53,11 +55,11 @@ public class MemoryMessageStore implements MessageStore private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index e2fca2f9c7..88c95ad65e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -20,14 +20,16 @@ */ package org.apache.qpid.server.store; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.logging.LogSubject; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.message.EnqueableMessage; /** * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. * */ -public interface MessageStore extends DurableConfigurationStore, TransactionLog +public interface MessageStore { StoreFuture IMMEDIATE_FUTURE = new StoreFuture() { @@ -77,4 +79,69 @@ public interface MessageStore extends DurableConfigurationStore, TransactionLog boolean isPersistent(); + + public static interface Transaction + { + /** + * Places a message onto a specified queue, in a given transactional context. + * + * + * + * @param queue The queue to place the message on. + * @param message + * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason. + */ + void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; + + /** + * Extracts a message from a specified queue, in a given transactional context. + * + * @param queue The queue to place the message on. + * @param message The message to dequeue. + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; + + + /** + * Commits all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void commitTran() throws AMQStoreException; + + /** + * Commits all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + StoreFuture commitTranAsync() throws AMQStoreException; + + /** + * Abandons all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void abortTran() throws AMQStoreException; + + + + } + + public void configureTransactionLog(String name, + TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception; + + Transaction newTransaction(); + + + + public static interface StoreFuture + { + boolean isComplete(); + + void waitForCompletion(); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index 1f5b027b80..858a850d8c 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -26,15 +26,13 @@ import java.nio.ByteBuffer; public class StoredMemoryMessage implements StoredMessage { private final long _messageNumber; - private final ByteBuffer _content; + private ByteBuffer _content; private final StorableMessageMetaData _metaData; public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData) { _messageNumber = messageNumber; _metaData = metaData; - _content = ByteBuffer.allocate(metaData.getContentSize()); - } public long getMessageNumber() @@ -44,26 +42,79 @@ public class StoredMemoryMessage implements StoredMessage public void addContent(int offsetInMessage, ByteBuffer src) { - src = src.duplicate(); - ByteBuffer dst = _content.duplicate(); - dst.position(offsetInMessage); - dst.put(src); + if(_content == null) + { + if(offsetInMessage == 0) + { + _content = src.slice(); + } + else + { + final int contentSize = _metaData.getContentSize(); + int size = (contentSize < offsetInMessage + src.remaining()) + ? offsetInMessage + src.remaining() + : contentSize; + _content = ByteBuffer.allocate(size); + addContent(offsetInMessage, src); + } + } + else + { + if(_content.limit() >= offsetInMessage + src.remaining()) + { + _content.position(offsetInMessage); + _content.put(src); + _content.position(0); + } + else + { + final int contentSize = _metaData.getContentSize(); + int size = (contentSize < offsetInMessage + src.remaining()) + ? offsetInMessage + src.remaining() + : contentSize; + ByteBuffer oldContent = _content; + _content = ByteBuffer.allocate(size); + _content.put(oldContent); + _content.position(0); + addContent(offsetInMessage, src); + } + + } } public int getContent(int offset, ByteBuffer dst) { ByteBuffer src = _content.duplicate(); - src.position(offset); - src = src.slice(); - if(dst.remaining() < src.limit()) + + int oldPosition = src.position(); + + src.position(oldPosition + offset); + + int length = dst.remaining() < src.remaining() ? dst.remaining() : src.remaining(); + src.limit(oldPosition + length); + + dst.put(src); + + + return length; + } + + + public ByteBuffer getContent(int offsetInMessage, int size) + { + ByteBuffer buf = _content.duplicate(); + + if(offsetInMessage != 0) { - src.limit(dst.remaining()); + buf.position(offsetInMessage); + buf = buf.slice(); } - dst.put(src); - return src.limit(); + + buf.limit(size); + return buf; } - public TransactionLog.StoreFuture flushToStore() + public MessageStore.StoreFuture flushToStore() { return MessageStore.IMMEDIATE_FUTURE; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 0bc45c6718..d4a0381929 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -32,7 +32,9 @@ public interface StoredMessage<M extends StorableMessageMetaData> int getContent(int offsetInMessage, ByteBuffer dst); - TransactionLog.StoreFuture flushToStore(); + ByteBuffer getContent(int offsetInMessage, int size); + + MessageStore.StoreFuture flushToStore(); void remove(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java index d196a91930..da7f8d18b2 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java @@ -20,72 +20,7 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.AMQStoreException; -import org.apache.commons.configuration.Configuration; - public interface TransactionLog { - public static interface Transaction - { - /** - * Places a message onto a specified queue, in a given transactional context. - * - * @param queue The queue to place the message on. - * @param messageId The message to enqueue. - * @throws AMQStoreException If the operation fails for any reason. - */ - void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException; - - /** - * Extracts a message from a specified queue, in a given transactional context. - * - * @param queue The queue to place the message on. - * @param messageId The message to dequeue. - * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. - */ - void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException; - - - /** - * Commits all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void commitTran() throws AMQStoreException; - - /** - * Commits all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - StoreFuture commitTranAsync() throws AMQStoreException; - - /** - * Abandons all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void abortTran() throws AMQStoreException; - - - - } - - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception; - - Transaction newTransaction(); - - - - public static interface StoreFuture - { - boolean isComplete(); - - void waitForCompletion(); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java index 7781c52df3..802596ed1e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.store; public interface TransactionLogRecoveryHandler { - QueueEntryRecoveryHandler begin(TransactionLog log); + QueueEntryRecoveryHandler begin(MessageStore log); public static interface QueueEntryRecoveryHandler { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 6c18b2e229..dea88c4776 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -62,7 +62,6 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Session; import org.apache.qpid.transport.Struct; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -70,10 +69,10 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.locks.Lock; @@ -183,6 +182,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); } _queue = queue; + Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments(); _traceExclude = (String) arguments.get("qpid.trace.exclude"); _trace = (String) arguments.get("qpid.trace.id"); @@ -224,8 +224,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr if (_noLocal && entry.getMessage() instanceof MessageTransferMessage) { - Session messageSession= ((MessageTransferMessage)entry.getMessage()).getSession(); - if (messageSession != null && messageSession.getConnection() == _session.getConnection()) + Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference(); + if (connectionRef != null && connectionRef == _session.getReference()) { return false; } @@ -377,35 +377,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { MessageTransferMessage msg = (MessageTransferMessage) serverMsg; - - - Struct[] headers; - if(msg.getHeader() == null) - { - headers = EMPTY_STRUCT_ARRAY; - } - else - { - headers = msg.getHeader().getStructs(); - } - - ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length); - DeliveryProperties origDeliveryProps = null; - for(Struct header : headers) - { - if(header instanceof DeliveryProperties) - { - origDeliveryProps = (DeliveryProperties) header; - } - else - { - if(header instanceof MessageProperties) - { - messageProps = (MessageProperties) header; - } - newHeaders.add(header); - } - } + DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); + messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); deliveryProps = new DeliveryProperties(); if(origDeliveryProps != null) @@ -440,17 +413,16 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr deliveryProps.setRedelivered(entry.isRedelivered()); - newHeaders.add(deliveryProps); - if(_trace != null && messageProps == null) { messageProps = new MessageProperties(); - newHeaders.add(messageProps); } - Header header = new Header(newHeaders); + Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + - xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); + xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) + : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); } else if(serverMsg instanceof AMQMessage) { @@ -463,8 +435,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr message_0_8.getContent(body, 0); body.flip(); - Struct[] headers = new Struct[] { deliveryProps, messageProps }; - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties(); final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange(); @@ -505,8 +475,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr messageProps.setApplicationHeaders(appHeaders); - Header header = new Header(headers); - xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body); + Header header = new Header(deliveryProps, messageProps, null); + xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED) + : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body); } else { @@ -519,8 +490,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr serverMsg.getContent(body, 0); body.flip(); - Struct[] headers = new Struct[] { deliveryProps, messageProps }; - deliveryProps.setExpiration(serverMsg.getExpiration()); deliveryProps.setImmediate(serverMsg.isImmediate()); @@ -567,8 +536,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr messageProps.setApplicationHeaders(appHeaders); */ - Header header = new Header(headers); - xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body); + Header header = new Header(deliveryProps, messageProps, null); + xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED) + : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body); } boolean excludeDueToFederation = false; @@ -644,28 +614,51 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } } - private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) + private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); - txn.dequeue(entry.getQueue(),entry.getMessage(), - new ServerTransaction.Action() - { - public void postCommit() - { - if(restoreCredit) - { - restoreCredit(entry); - } - entry.discard(); - } + _deferredMessageCredit += deferredMessageCredit; + _deferredSizeCredit += deferredSizeCredit; - public void onRollback() - { + } - } - }); + public void flushCreditState() + { + flushCreditState(false); + } + public void flushCreditState(boolean strict) + { + if(strict || !isSuspended() || _deferredMessageCredit >= 200 + || !(_creditManager instanceof WindowCreditManager) + || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) + { + _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); + _deferredMessageCredit = 0; + _deferredSizeCredit = 0l; + } } + private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) + { + AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); + dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(), + new ServerTransaction.Action() + { + public void postCommit() + { + if (restoreCredit) + { + restoreCredit(entry); + } + entry.discard(); + } + + public void onRollback() + { + + } + }); + } + void reject(final QueueEntry entry) { entry.setRedelivered(); @@ -704,7 +697,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { final InboundMessage m = new InboundMessageAdapter(entry); - final ArrayList<? extends BaseQueue> destinationQueues = alternateExchange.route(m); + final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m); if (destinationQueues == null || destinationQueues.isEmpty()) { @@ -751,6 +744,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _stateChangeLock.tryLock(); } + public void getSendLock() { _stateChangeLock.lock(); @@ -816,28 +810,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _properties.get(key); } - private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) - { - _deferredMessageCredit += deferredMessageCredit; - _deferredSizeCredit += deferredSizeCredit; - - } - - public void flushCreditState() - { - flushCreditState(false); - } - public void flushCreditState(boolean strict) - { - if(strict || !isSuspended() || _deferredMessageCredit >= 200 - || !(_creditManager instanceof WindowCreditManager) - || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) - { - _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); - _deferredMessageCredit = 0; - _deferredSizeCredit = 0l; - } - } public FlowCreditManager_0_10 getCreditManager() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 00f0c9f0f1..ab07ed20f6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _connectionId; - + private final Object _reference = new Object(); private ServerConnectionMBean _mBean; private VirtualHost _virtualHost; private AtomicLong _lastIoTime = new AtomicLong(); @@ -90,6 +90,11 @@ public class ServerConnection extends Connection implements Managable, AMQConnec return _config.getId(); } + public Object getReference() + { + return _reference; + } + @Override protected void invoke(Method method) { @@ -414,13 +419,11 @@ public class ServerConnection extends Connection implements Managable, AMQConnec return _connectionId; } - @Override public boolean isSessionNameUnique(byte[] name) { return !super.hasSessionWithName(name); } - @Override public String getUserName() { return _authorizedPrincipal.getName(); @@ -450,11 +453,11 @@ public class ServerConnection extends Connection implements Managable, AMQConnec { for (Session ssn : getChannels()) { - ((ServerSession)ssn).flushCreditState(); + ((ServerSession)ssn).receivedComplete(); } } - @Override + public ManagedObject getManagedObject() { return _mBean; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 1ce7b806d8..8e6d33d3bc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; import static org.apache.qpid.util.Serial.gt; -import java.lang.ref.WeakReference; import java.security.Principal; import java.text.MessageFormat; import java.util.ArrayList; @@ -71,6 +70,7 @@ import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; import org.slf4j.Logger; @@ -86,6 +86,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private ConnectionConfig _connectionConfig; private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); + private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction(); public static interface MessageDispositionChangeListener { @@ -121,8 +122,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); - private final WeakReference<Session> _reference; - ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); @@ -134,7 +133,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); - _reference = new WeakReference<Session>(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); } @@ -161,40 +159,22 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi return isCommandsFull(id); } - public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) + public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) { getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - _transaction.enqueue(queues,message, new ServerTransaction.Action() - { - - BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); - - public void postCommit() - { - MessageReference<?> ref = message.newReference(); - for(int i = 0; i < _queues.length; i++) - { - try - { - _queues[i].enqueue(message); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - ref.release(); - } - - public void onRollback() - { - // NO-OP - } - }); - - incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); + PostEnqueueAction postTransactionAction; + if(isTransactional()) + { + postTransactionAction = new PostEnqueueAction(queues, message) ; + } + else + { + postTransactionAction = _postEnqueueAction; + postTransactionAction.setState(queues, message); + } + _transaction.enqueue(queues,message, postTransactionAction, 0L); + incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); } @@ -252,7 +232,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public RangeSet acquire(RangeSet transfers) { - RangeSet acquired = new RangeSet(); + RangeSet acquired = RangeSetFactory.createRangeSet(); if(!_messageDispositionListenerMap.isEmpty()) { @@ -300,41 +280,56 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void dispositionChange(RangeSet ranges, MessageDispositionAction action) { - if(ranges != null && !_messageDispositionListenerMap.isEmpty()) + if(ranges != null) { - Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); - Iterator<Range> rangeIter = ranges.iterator(); - if(rangeIter.hasNext()) + if(ranges.size() == 1) { - Range range = rangeIter.next(); + Range r = ranges.getFirst(); + for(int i = r.getLower(); i <= r.getUpper(); i++) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i); + if(changeListener != null) + { + action.performAction(changeListener); + } + } + } + else if(!_messageDispositionListenerMap.isEmpty()) + { + Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator<Range> rangeIter = ranges.iterator(); - while(range != null && unacceptedMessages.hasNext()) + if(rangeIter.hasNext()) { - int next = unacceptedMessages.next(); - while(gt(next, range.getUpper())) + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) { - if(rangeIter.hasNext()) + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) { - range = rangeIter.next(); + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } } - else + if(range != null && range.includes(next)) { - range = null; - break; + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); + action.performAction(changeListener); } - } - if(range != null && range.includes(next)) - { - MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); - action.performAction(changeListener); - } - } + } + } } - } } @@ -534,10 +529,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi _taskList.remove(task); } - public WeakReference<Session> getReference() - { - return _reference; - } + public Object getReference() + { + return ((ServerConnection) getConnection()).getReference(); + } public MessageStore getMessageStore() { @@ -697,7 +692,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } - public void flushCreditState() + public void receivedComplete() { final Collection<Subscription_0_10> subscriptions = getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subscriptions) @@ -706,6 +701,54 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } + private static class PostEnqueueAction implements ServerTransaction.Action + { + + private List<? extends BaseQueue> _queues; + private ServerMessage _message; + private final boolean _transactional; + + public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message) + { + _transactional = true; + setState(queues, message); + } + + public PostEnqueueAction() + { + _transactional = false; + } + + public void setState(List<? extends BaseQueue> queues, ServerMessage message) + { + _message = message; + _queues = queues; + } + + public void postCommit() + { + MessageReference<?> ref = _message.newReference(); + for(int i = 0; i < _queues.size(); i++) + { + try + { + _queues.get(i).enqueue(_message, _transactional, null); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + } + ref.release(); + } + + public void onRollback() + { + // NO-OP + } + } + public int getUnacknowledgedMessageCount() { return _messageDispositionListenerMap.size(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index a0dca53ed0..b6e142a5fd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.transport; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import org.apache.log4j.Logger; @@ -55,46 +56,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Acquired; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.ExchangeBind; -import org.apache.qpid.transport.ExchangeBound; -import org.apache.qpid.transport.ExchangeBoundResult; -import org.apache.qpid.transport.ExchangeDeclare; -import org.apache.qpid.transport.ExchangeDelete; -import org.apache.qpid.transport.ExchangeQuery; -import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExchangeUnbind; -import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.transport.ExecutionException; -import org.apache.qpid.transport.MessageAccept; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquire; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCancel; -import org.apache.qpid.transport.MessageFlow; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageFlush; -import org.apache.qpid.transport.MessageReject; -import org.apache.qpid.transport.MessageRejectCode; -import org.apache.qpid.transport.MessageRelease; -import org.apache.qpid.transport.MessageResume; -import org.apache.qpid.transport.MessageSetFlowMode; -import org.apache.qpid.transport.MessageStop; -import org.apache.qpid.transport.MessageSubscribe; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.QueueDeclare; -import org.apache.qpid.transport.QueueDelete; -import org.apache.qpid.transport.QueuePurge; -import org.apache.qpid.transport.QueueQuery; -import org.apache.qpid.transport.QueueQueryResult; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.TxCommit; -import org.apache.qpid.transport.TxRollback; -import org.apache.qpid.transport.TxSelect; +import org.apache.qpid.transport.*; public class ServerSessionDelegate extends SessionDelegate { @@ -295,7 +257,8 @@ public class ServerSessionDelegate extends SessionDelegate final Exchange exchange = getExchangeForMessage(ssn, xfr); DeliveryProperties delvProps = null; - if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps + .hasExpiration()) { delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); } @@ -312,7 +275,7 @@ public class ServerSessionDelegate extends SessionDelegate } final Exchange exchangeInUse; - ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData); + List<? extends BaseQueue> queues = exchange.route(messageMetaData); if(queues.isEmpty() && exchange.getAlternateExchange() != null) { final Exchange alternateExchange = exchange.getAlternateExchange(); @@ -334,15 +297,16 @@ public class ServerSessionDelegate extends SessionDelegate if(!queues.isEmpty()) { final MessageStore store = getVirtualHost(ssn).getMessageStore(); - final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store); + final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference()); ((ServerSession) ssn).enqueue(message, queues); + storeMessage.flushToStore(); } else { if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) { - RangeSet rejects = new RangeSet(); + RangeSet rejects = RangeSetFactory.createRangeSet(); rejects.add(xfr.getId()); MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); ssn.invoke(reject); @@ -353,11 +317,13 @@ public class ServerSessionDelegate extends SessionDelegate } } + + ssn.processed(xfr); } - private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr, - final MessageMetaData_0_10 messageMetaData, final MessageStore store) + private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, + final MessageMetaData_0_10 messageMetaData, final MessageStore store) { final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); ByteBuffer body = xfr.getBody(); @@ -365,7 +331,6 @@ public class ServerSessionDelegate extends SessionDelegate { storeMessage.addContent(0, body); } - storeMessage.flushToStore(); return storeMessage; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index 36e9d78440..a67d4badd1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.txn; +import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -30,7 +31,7 @@ import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.MessageStore; /** * An implementation of ServerTransaction where each enqueue/dequeue @@ -43,11 +44,11 @@ public class AutoCommitTransaction implements ServerTransaction { protected static final Logger _logger = Logger.getLogger(AutoCommitTransaction.class); - private final TransactionLog _transactionLog; + private final MessageStore _messageStore; - public AutoCommitTransaction(TransactionLog transactionLog) + public AutoCommitTransaction(MessageStore transactionLog) { - _transactionLog = transactionLog; + _messageStore = transactionLog; } public long getTransactionStartTime() @@ -59,14 +60,14 @@ public class AutoCommitTransaction implements ServerTransaction * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. */ - public void addPostTransactionAction(Action immediateAction) + public void addPostTransactionAction(final Action immediateAction) { immediateAction.postCommit(); } public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - TransactionLog.Transaction txn = null; + MessageStore.Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) @@ -76,8 +77,8 @@ public class AutoCommitTransaction implements ServerTransaction _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); } - txn = _transactionLog.newTransaction(); - txn.dequeueMessage(queue, message.getMessageNumber()); + txn = _messageStore.newTransaction(); + txn.dequeueMessage(queue, message); txn.commitTran(); txn = null; } @@ -98,7 +99,7 @@ public class AutoCommitTransaction implements ServerTransaction public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { - TransactionLog.Transaction txn = null; + MessageStore.Transaction txn = null; try { for(QueueEntry entry : queueEntries) @@ -115,10 +116,10 @@ public class AutoCommitTransaction implements ServerTransaction if(txn == null) { - txn = _transactionLog.newTransaction(); + txn = _messageStore.newTransaction(); } - txn.dequeueMessage(queue, message.getMessageNumber()); + txn.dequeueMessage(queue, message); } } @@ -145,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - TransactionLog.Transaction txn = null; + MessageStore.Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) @@ -155,8 +156,8 @@ public class AutoCommitTransaction implements ServerTransaction _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); } - txn = _transactionLog.newTransaction(); - txn.enqueueMessage(queue, message.getMessageNumber()); + txn = _messageStore.newTransaction(); + txn.enqueueMessage(queue, message); txn.commitTran(); txn = null; } @@ -176,15 +177,14 @@ public class AutoCommitTransaction implements ServerTransaction } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { - TransactionLog.Transaction txn = null; + MessageStore.Transaction txn = null; try { if(message.isPersistent()) { - Long id = message.getMessageNumber(); for(BaseQueue queue : queues) { if (queue.isDurable()) @@ -195,22 +195,26 @@ public class AutoCommitTransaction implements ServerTransaction } if (txn == null) { - txn = _transactionLog.newTransaction(); + txn = _messageStore.newTransaction(); } - txn.enqueueMessage(queue, id); + txn.enqueueMessage(queue, message); + + } } - if (txn != null) - { - txn.commitTran(); - txn = null; - - } } + if (txn != null) + { + txn.commitTran(); + txn = null; + } + postTransactionAction.postCommit(); postTransactionAction = null; + + } catch (AMQException e) { @@ -225,6 +229,11 @@ public class AutoCommitTransaction implements ServerTransaction } + public void commit(final Runnable immediatePostTransactionAction) + { + immediatePostTransactionAction.run(); + } + public void commit() { } @@ -233,7 +242,7 @@ public class AutoCommitTransaction implements ServerTransaction { } - private void rollbackIfNecessary(Action postTransactionAction, TransactionLog.Transaction txn) + private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) { if (txn != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 946dbd7c28..7f5b5fb8b2 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -29,11 +29,7 @@ import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.MessageStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,11 +46,11 @@ public class LocalTransaction implements ServerTransaction private final List<Action> _postTransactionActions = new ArrayList<Action>(); - private volatile TransactionLog.Transaction _transaction; - private TransactionLog _transactionLog; + private volatile MessageStore.Transaction _transaction; + private MessageStore _transactionLog; private long _txnStartTime = 0L; - public LocalTransaction(TransactionLog transactionLog) + public LocalTransaction(MessageStore transactionLog) { _transactionLog = transactionLog; } @@ -63,7 +59,7 @@ public class LocalTransaction implements ServerTransaction { return _transaction != null; } - + public long getTransactionStartTime() { return _txnStartTime; @@ -88,7 +84,7 @@ public class LocalTransaction implements ServerTransaction } beginTranIfNecessary(); - _transaction.dequeueMessage(queue, message.getMessageNumber()); + _transaction.dequeueMessage(queue, message); } catch(AMQException e) @@ -118,7 +114,7 @@ public class LocalTransaction implements ServerTransaction } beginTranIfNecessary(); - _transaction.dequeueMessage(queue, message.getMessageNumber()); + _transaction.dequeueMessage(queue, message); } } @@ -191,7 +187,7 @@ public class LocalTransaction implements ServerTransaction } beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message.getMessageNumber()); + _transaction.enqueueMessage(queue, message); } catch (Exception e) { @@ -202,13 +198,13 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { _postTransactionActions.add(postTransactionAction); if (_txnStartTime == 0L) { - _txnStartTime = System.currentTimeMillis(); + _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime; } if(message.isPersistent()) @@ -226,7 +222,7 @@ public class LocalTransaction implements ServerTransaction beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message.getMessageNumber()); + _transaction.enqueueMessage(queue, message); } } @@ -242,6 +238,11 @@ public class LocalTransaction implements ServerTransaction public void commit() { + commit(null); + } + + public void commit(Runnable immediateAction) + { try { if(_transaction != null) @@ -249,9 +250,14 @@ public class LocalTransaction implements ServerTransaction _transaction.commitTran(); } - for(Action action : _postTransactionActions) + if(immediateAction != null) + { + immediateAction.run(); + } + + for(int i = 0; i < _postTransactionActions.size(); i++) { - action.postCommit(); + _postTransactionActions.get(i).postCommit(); } } catch (Exception e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index b3c6e1ac3a..64fdc0ba9a 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.txn; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; @@ -42,7 +43,7 @@ import java.util.List; */ public interface ServerTransaction { - /** + /** * Represents an action to be performed on transaction commit or rollback */ public static interface Action @@ -91,7 +92,7 @@ public interface ServerTransaction * * Store operations will result only for a persistent messages on durable queues. */ - void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction); + void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime); /** * Commit the transaction represented by this object. @@ -101,6 +102,8 @@ public interface ServerTransaction */ void commit(); + void commit(Runnable immediatePostTransactionAction); + /** Rollback the transaction represented by this object. * * If the caller has registered one or more Actions, the onRollback() method on each will diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 04f19b79bb..24ab29444c 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -39,7 +39,6 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer { @@ -57,8 +56,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo MessageStore getMessageStore(); - TransactionLog getTransactionLog(); - DurableConfigurationStore getDurableConfigurationStore(); AuthenticationManager getAuthenticationManager(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 0fd31973b2..aad3547789 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.server.virtualhost; +import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -73,7 +73,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private List<ProcessAction> _actions; private MessageStore _store; - private TransactionLog _transactionLog; private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); @@ -86,7 +85,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa _virtualHost = virtualHost; } - public QueueRecoveryHandler begin(MessageStore store) + public VirtualHostConfigRecoveryHandler begin(MessageStore store) { _logSubject = new MessageStoreLogSubject(_virtualHost,store); _store = store; @@ -99,14 +98,12 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { try { - AMQShortString queueNameShortString = new AMQShortString(queueName); - - AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString); + AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName); if (q == null) { - q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost, - arguments); + q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost, + FieldTable.convertToMap(arguments)); _virtualHost.getQueueRegistry().registerQueue(q); } @@ -186,12 +183,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa //To change body of implemented methods use File | Settings | File Templates. } - public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log) - { - _transactionLog = log; - return this; - } - private static final class ProcessAction { private final AMQQueue _queue; @@ -316,15 +307,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa else { _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); - TransactionLog.Transaction txn = _transactionLog.newTransaction(); - txn.dequeueMessage(queue, messageId); + MessageStore.Transaction txn = _store.newTransaction(); + txn.dequeueMessage(queue, new DummyMessage(messageId)); txn.commitTranAsync(); } } else { _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); - TransactionLog.Transaction txn = _transactionLog.newTransaction(); + MessageStore.Transaction txn = _store.newTransaction(); TransactionLogResource mockQueue = new TransactionLogResource() { @@ -334,7 +325,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return queueName; } }; - txn.dequeueMessage(mockQueue, messageId); + txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); txn.commitTranAsync(); } @@ -367,4 +358,32 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } + private static class DummyMessage implements EnqueableMessage + { + + + private final long _messageId; + + public DummyMessage(long messageId) + { + _messageId = messageId; + } + + public long getMessageNumber() + { + return _messageId; + } + + + public boolean isPersistent() + { + return true; + } + + + public StoredMessage getStoredMessage() + { + return null; + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 17c65003e9..9b1c5474a3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -75,7 +75,6 @@ import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; @@ -228,7 +227,10 @@ public class VirtualHostImpl implements VirtualHost if (store != null) { _messageStore = store; - _durableConfigurationStore = store; + if(store instanceof DurableConfigurationStore) + { + _durableConfigurationStore = (DurableConfigurationStore) store; + } } else { @@ -380,6 +382,8 @@ public class VirtualHostImpl implements VirtualHost Class clazz = Class.forName(messageStoreClass); Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) { throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + @@ -390,10 +394,18 @@ public class VirtualHostImpl implements VirtualHost MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore); - messageStore.configureConfigStore(this.getName(), - recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); + + if(messageStore instanceof DurableConfigurationStore) + { + DurableConfigurationStore durableConfigurationStore = (DurableConfigurationStore) messageStore; + + durableConfigurationStore.configureConfigStore(this.getName(), + recoveryHandler, + hostConfig.getStoreConfiguration(), + storeLogSubject); + + _durableConfigurationStore = durableConfigurationStore; + } messageStore.configureMessageStore(this.getName(), recoveryHandler, @@ -405,7 +417,8 @@ public class VirtualHostImpl implements VirtualHost storeLogSubject); _messageStore = messageStore; - _durableConfigurationStore = messageStore; + + } private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException @@ -553,11 +566,6 @@ public class VirtualHostImpl implements VirtualHost return _messageStore; } - public TransactionLog getTransactionLog() - { - return _messageStore; - } - public DurableConfigurationStore getDurableConfigurationStore() { return _durableConfigurationStore; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index ea2fe90da6..24c790d799 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -72,7 +72,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase /** * Not used in this test, just there to stub out the routing calls */ - private MessageStore _store = new MemoryMessageStore(); + private MemoryMessageStore _store = new MemoryMessageStore(); BindingFactory bindingFactory = new BindingFactory(new DurableConfigurationStore.Source() @@ -310,7 +310,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase * @throws AMQException */ @Override - public void enqueue(ServerMessage msg, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException { messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); final QueueEntry queueEntry = new QueueEntry() @@ -318,47 +318,47 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase public AMQQueue getQueue() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public AMQMessage getMessage() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public long getSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public boolean getDeliveredToConsumer() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean expired() throws AMQException { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isAvailable() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isAcquired() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean acquire() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean acquire(Subscription sub) { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean delete() @@ -373,17 +373,17 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase public boolean acquiredBySubscription() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isAcquiredBy(Subscription subscription) { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void release() { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean releaseButRetain() @@ -393,82 +393,82 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase public boolean immediateAndNotDelivered() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void setRedelivered() { - //To change body of implemented methods use File | Settings | File Templates. + } public AMQMessageHeader getMessageHeader() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isPersistent() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isRedelivered() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public Subscription getDeliveredSubscription() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void reject() { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean isRejectedBy(long subscriptionId) { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void dequeue() { - //To change body of implemented methods use File | Settings | File Templates. + } public void dispose() { - //To change body of implemented methods use File | Settings | File Templates. + } public void discard() { - //To change body of implemented methods use File | Settings | File Templates. + } public void routeToAlternate() { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean isQueueDeleted() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void addStateChangeListener(StateChangeListener listener) { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean removeStateChangeListener(StateChangeListener listener) { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public int compareTo(final QueueEntry o) { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public boolean isDequeued() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 2ce43052d9..d5f8ef3d54 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -64,17 +64,17 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest ArrayList<QueueEntry> msgs = _subscription.getMessages(); try { - assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber()); - assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber()); - assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber()); + assertEquals(1L, msgs.get(0).getMessage().getMessageNumber()); + assertEquals(6L, msgs.get(1).getMessage().getMessageNumber()); + assertEquals(8L, msgs.get(2).getMessage().getMessageNumber()); - assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber()); - assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber()); - assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber()); + assertEquals(2L, msgs.get(3).getMessage().getMessageNumber()); + assertEquals(5L, msgs.get(4).getMessage().getMessageNumber()); + assertEquals(7L, msgs.get(5).getMessage().getMessageNumber()); - assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber()); - assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber()); - assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber()); + assertEquals(3L, msgs.get(6).getMessage().getMessageNumber()); + assertEquals(4L, msgs.get(7).getMessage().getMessageNumber()); + assertEquals(9L, msgs.get(8).getMessage().getMessageNumber()); } catch (AssertionFailedError afe) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 0daf79122c..f43af447ff 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -168,22 +168,22 @@ public class MockAMQQueue implements AMQQueue public UUID getId() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public QueueConfigType getConfigType() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public ConfiguredObject getParent() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isDurable() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isAutoDelete() @@ -199,7 +199,7 @@ public class MockAMQQueue implements AMQQueue public AMQShortString getOwner() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void setVirtualHost(VirtualHost virtualhost) @@ -219,22 +219,22 @@ public class MockAMQQueue implements AMQQueue public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + } public void unregisterSubscription(Subscription subscription) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + } public int getConsumerCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public int getActiveConsumerCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public boolean hasExclusiveSubscriber() @@ -244,37 +244,37 @@ public class MockAMQQueue implements AMQQueue public boolean isUnused() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isEmpty() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public int getMessageCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public int getUndeliveredMessageCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public long getQueueDepth() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public long getReceivedMessageCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public long getOldestMessageArrivalTime() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public boolean isDeleted() @@ -297,59 +297,58 @@ public class MockAMQQueue implements AMQQueue } + public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException + { + } + public void requeue(QueueEntry entry) { - //To change body of implemented methods use File | Settings | File Templates. } public void requeue(QueueEntryImpl storeContext, Subscription subscription) { - //To change body of implemented methods use File | Settings | File Templates. } public void dequeue(QueueEntry entry, Subscription sub) { - //To change body of implemented methods use File | Settings | File Templates. } public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void addQueueDeleteTask(Task task) { - //To change body of implemented methods use File | Settings | File Templates. } public void removeQueueDeleteTask(final Task task) { - //To change body of implemented methods use File | Settings | File Templates. } public List<QueueEntry> getMessagesOnTheQueue() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public List<Long> getMessagesOnTheQueue(int num) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public List<Long> getMessagesOnTheQueue(int num, int offest) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public QueueEntry getMessageOnTheQueue(long messageId) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition) @@ -359,132 +358,123 @@ public class MockAMQQueue implements AMQQueue public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext) { - //To change body of implemented methods use File | Settings | File Templates. + } public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext) { - //To change body of implemented methods use File | Settings | File Templates. + } public void removeMessagesFromQueue(long fromMessageId, long toMessageId) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getMaximumMessageSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMaximumMessageSize(long value) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getMaximumMessageCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMaximumMessageCount(long value) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getMaximumQueueDepth() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMaximumQueueDepth(long value) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getMaximumMessageAge() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMaximumMessageAge(long maximumMessageAge) { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean getBlockOnQueueFull() - { - return false; - } - - public void setBlockOnQueueFull(boolean block) - { + } public long getMinimumAlertRepeatGap() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void deleteMessageFromTop() { - //To change body of implemented methods use File | Settings | File Templates. + } public long clearQueue() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void checkMessageStatus() throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + } public Set<NotificationCheck> getNotificationChecks() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void flushSubscription(Subscription sub) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + } public void deliverAsync(Subscription sub) { - //To change body of implemented methods use File | Settings | File Templates. + } public void deliverAsync() { - //To change body of implemented methods use File | Settings | File Templates. + } public void stop() { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean isExclusive() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public Exchange getAlternateExchange() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void setAlternateExchange(Exchange exchange) { - //To change body of implemented methods use File | Settings | File Templates. + } public Map<String, Object> getArguments() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void checkCapacity(AMQChannel channel) @@ -493,12 +483,12 @@ public class MockAMQQueue implements AMQQueue public ManagedObject getManagedObject() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public int compareTo(AMQQueue o) { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMinimumAlertRepeatGap(long value) @@ -508,22 +498,22 @@ public class MockAMQQueue implements AMQQueue public long getCapacity() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setCapacity(long capacity) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getFlowResumeCapacity() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setFlowResumeCapacity(long flowResumeCapacity) { - //To change body of implemented methods use File | Settings | File Templates. + } public void configure(ConfigurationPlugin config) @@ -533,7 +523,7 @@ public class MockAMQQueue implements AMQQueue public ConfigurationPlugin getConfiguration() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public AuthorizationHolder getAuthorizationHolder() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java index 78ed3e9f34..75f633f2af 100755 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.framing.ContentHeaderBody; @@ -97,7 +96,17 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> return src.limit(); } - public TransactionLog.StoreFuture flushToStore() + + + public ByteBuffer getContent(int offsetInMessage, int size) + { + ByteBuffer buf = ByteBuffer.allocate(size); + getContent(offsetInMessage, buf); + buf.position(0); + return buf; + } + + public MessageStore.StoreFuture flushToStore() { return MessageStore.IMMEDIATE_FUTURE; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index 7a3f6f701c..cf910208e7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -164,7 +164,7 @@ public abstract class QueueEntryListTestBase extends TestCase final QueueEntry head = getTestList().getHead(); assertNull("Head entry should not contain an actual message", head.getMessage()); assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head) - .getMessage().getMessageNumber().longValue()); + .getMessage().getMessageNumber()); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 6c7094cac0..28d52f4fd1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -649,9 +649,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void onRollback() { } - }); - - + }, 0L); // Check that it is enqueued AMQQueue data = _store.getMessages().get(1L); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index f3ba6a5495..a873739ca7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -162,8 +162,8 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase while (entry != null) { assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted()); - assertNotNull("QueueEntry was not found in the list of remaining entries", - remainingMessages.get(entry.getMessage().getMessageNumber().intValue())); + assertNotNull("QueueEntry "+entry.getMessage().getMessageNumber()+" was not found in the list of remaining entries " + remainingMessages, + remainingMessages.get((int)(entry.getMessage().getMessageNumber()))); count++; entry = entry.getNextNode(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index eca845644e..34ad0e5668 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -317,7 +317,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase assertEquals("Sorted queue entry value is not as expected", expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY")); assertEquals("Sorted queue entry id is not as expected", - Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber()); + expectedMessageId, entry.getMessage().getMessageNumber()); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 1d0a9d6316..90adaa1319 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -558,7 +558,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase /** * Delete the Store Environment path * - * @param configuration The configuration that contains the store environment path. + * @param environmentPath The configuration that contains the store environment path. */ private void cleanup(File environmentPath) { @@ -636,7 +636,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase { //To change body of implemented methods use File | Settings | File Templates. } - }); + }, 0L); } } @@ -710,7 +710,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (queue.isDurable() && !queue.isAutoDelete()) { - getVirtualHost().getMessageStore().createQueue(queue, queueArguments); + getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments); } } catch (AMQException e) @@ -754,7 +754,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase getVirtualHost().getExchangeRegistry().registerExchange(exchange); if (durable) { - getVirtualHost().getMessageStore().createExchange(exchange); + getVirtualHost().getDurableConfigurationStore().createExchange(exchange); } } catch (AMQException e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 5ff84557d8..44006df517 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.exchange.Exchange; @@ -42,18 +43,11 @@ import java.nio.ByteBuffer; */ public class SkeletonMessageStore implements MessageStore { - private final AtomicLong _messageId = new AtomicLong(1); - - public void configure(String base, Configuration config) throws Exception - { - } - public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config, LogSubject logSubject) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. } public void configureMessageStore(String name, @@ -61,7 +55,6 @@ public class SkeletonMessageStore implements MessageStore Configuration config, LogSubject logSubject) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. } public void close() throws Exception @@ -70,31 +63,28 @@ public class SkeletonMessageStore implements MessageStore public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } - public void removeMessage(Long messageId) - { - } public void createExchange(Exchange exchange) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void removeExchange(Exchange exchange) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void createQueue(AMQQueue queue) throws AMQStoreException @@ -105,63 +95,11 @@ public class SkeletonMessageStore implements MessageStore { } - - - - public List<AMQQueue> createQueues() throws AMQException - { - return null; - } - - public Long getNewMessageId() - { - return _messageId.getAndIncrement(); - } - - public void storeContentBodyChunk( - Long messageId, - int index, - ContentChunk contentBody, - boolean lastContentBody) throws AMQException - { - - } - - public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException - { - - } - - public MessageMetaData getMessageMetaData(Long messageId) throws AMQException - { - return null; - } - - public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException - { - return null; - } - public boolean isPersistent() { return false; } - public void storeMessageHeader(Long messageNumber, ServerMessage message) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void storeContent(Long messageNumber, long offset, ByteBuffer body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public ServerMessage getMessage(Long messageNumber) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - public void removeQueue(final AMQQueue queue) throws AMQStoreException { @@ -172,7 +110,7 @@ public class SkeletonMessageStore implements MessageStore Configuration storeConfiguration, LogSubject logSubject) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. + } public Transaction newTransaction() @@ -180,19 +118,19 @@ public class SkeletonMessageStore implements MessageStore return new Transaction() { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void commitTran() throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public StoreFuture commitTranAsync() throws AMQStoreException @@ -213,7 +151,7 @@ public class SkeletonMessageStore implements MessageStore public void abortTran() throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } }; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index 4dea13d391..fa698f4cf8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -82,6 +82,12 @@ public class TestMemoryMessageStore extends MemoryMessageStore return _storedMessage.getContent(offsetInMessage, dst); } + + public ByteBuffer getContent(int offsetInMessage, int size) + { + return _storedMessage.getContent(offsetInMessage, size); + } + public StoreFuture flushToStore() { return _storedMessage.flushToStore(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 3593297a05..3804d0dc8e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; /** @@ -66,14 +68,14 @@ public class TestableMemoryMessageStore extends MemoryMessageStore private class TestableTransaction implements Transaction { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - getMessages().put(messageId, (AMQQueue)queue); + getMessages().put(message.getMessageNumber(), (AMQQueue)queue); } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - getMessages().remove(messageId); + getMessages().remove(message.getMessageNumber()); } public void commitTran() throws AMQStoreException @@ -143,6 +145,12 @@ public class TestableMemoryMessageStore extends MemoryMessageStore return _storedMessage.getContent(offsetInMessage, dst); } + + public ByteBuffer getContent(int offsetInMessage, int size) + { + return _storedMessage.getContent(offsetInMessage, size); + } + public StoreFuture flushToStore() { return _storedMessage.flushToStore(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 9afed49922..98484db264 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockQueueEntry; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; @@ -44,7 +44,7 @@ public class AutoCommitTransactionTest extends QpidTestCase { private ServerTransaction _transaction = null; // Class under test - private TransactionLog _transactionLog; + private MessageStore _transactionLog; private AMQQueue _queue; private List<AMQQueue> _queues; private Collection<QueueEntry> _queueEntries; @@ -137,7 +137,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(false); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action); + _transaction.enqueue(_queues, _message, _action, 0L); assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -157,7 +157,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action); + _transaction.enqueue(_queues, _message, _action, 0L); assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -175,7 +175,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); - _transaction.enqueue(_queues, _message, _action); + _transaction.enqueue(_queues, _message, _action, 0L); assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); @@ -198,7 +198,7 @@ public class AutoCommitTransactionTest extends QpidTestCase try { - _transaction.enqueue(_queues, _message, _action); + _transaction.enqueue(_queues, _message, _action, 0L); fail("Exception not thrown"); } catch (RuntimeException re) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index e81fd8e3f1..484beb8fb4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockQueueEntry; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; @@ -51,7 +51,7 @@ public class LocalTransactionTest extends QpidTestCase private MockAction _action1; private MockAction _action2; private MockStoreTransaction _storeTransaction; - private TransactionLog _transactionLog; + private MessageStore _transactionLog; @Override @@ -140,7 +140,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(false); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action1); + _transaction.enqueue(_queues, _message, _action1, 0L); assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -156,7 +156,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action1); + _transaction.enqueue(_queues, _message, _action1, 0L); assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -173,7 +173,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); - _transaction.enqueue(_queues, _message, _action1); + _transaction.enqueue(_queues, _message, _action1, 0L); assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); @@ -196,7 +196,7 @@ public class LocalTransactionTest extends QpidTestCase try { - _transaction.enqueue(_queues, _message, _action1); + _transaction.enqueue(_queues, _message, _action1, 0L); fail("Exception not thrown"); } catch (RuntimeException re) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 422105e410..f3d71c6dea 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.StoredMessage; /** * Mock Server Message allowing its persistent flag to be controlled from test. @@ -81,6 +82,11 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } + public StoredMessage getStoredMessage() + { + throw new NotImplementedException(); + } + public long getExpiration() { throw new NotImplementedException(); @@ -91,12 +97,18 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } + + public ByteBuffer getContent(int offset, int size) + { + throw new NotImplementedException(); + } + public long getArrivalTime() { throw new NotImplementedException(); } - public Long getMessageNumber() + public long getMessageNumber() { return 0L; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index ff372532ac..bf8fda307a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -24,11 +24,11 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.TransactionLog.StoreFuture; -import org.apache.qpid.server.store.TransactionLog.Transaction; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.*; +import org.apache.qpid.server.store.MessageStore.StoreFuture; +import org.apache.qpid.server.store.MessageStore.Transaction; /** * Mock implementation of a (Store) Transaction allow its state to be observed. @@ -61,7 +61,7 @@ class MockStoreTransaction implements Transaction return _state; } - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { if (_throwExceptionOnQueueOp) { @@ -82,7 +82,7 @@ class MockStoreTransaction implements Transaction return _numberOfEnqueuedMessages; } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { if (_throwExceptionOnQueueOp) { @@ -107,10 +107,33 @@ class MockStoreTransaction implements Transaction _state = TransactionState.ABORTED; } - public static TransactionLog createTestTransactionLog(final MockStoreTransaction storeTransaction) + public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction) { - return new TransactionLog() + return new MessageStore() { + public void configureMessageStore(final String name, + final MessageStoreRecoveryHandler recoveryHandler, + final Configuration config, + final LogSubject logSubject) throws Exception + { + //TODO. + } + + public void close() throws Exception + { + //TODO. + } + + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) + { + return null; //TODO. + } + + public boolean isPersistent() + { + return false; //TODO. + } + public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, Configuration storeConfiguration, LogSubject logSubject) throws Exception { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 7aa314bf22..153371c8d9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -41,7 +41,6 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; public class MockVirtualHost implements VirtualHost { @@ -159,10 +158,6 @@ public class MockVirtualHost implements VirtualHost return null; } - public TransactionLog getTransactionLog() - { - return null; - } public void removeBrokerConnection(BrokerLink brokerLink) { |
