diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-12-15 13:54:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-12-15 13:54:36 +0000 |
| commit | 45066f17e52eb44dbb9c8f0250200d4ef974b54c (patch) | |
| tree | 54489ee4f3c48f8e055304f905651484f5a45926 /qpid/java | |
| parent | 759e158c316c0ace2e166c0aa9cf0ab76352cfd5 (diff) | |
| download | qpid-python-45066f17e52eb44dbb9c8f0250200d4ef974b54c.tar.gz | |
QPID-3687 : Improve Java Broker performance
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1214760 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
18 files changed, 800 insertions, 210 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index fda8cd0eb0..a0c2e9f977 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -45,6 +45,15 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } + public long getBytesCreditLimit() + { + return _bytesCreditLimit; + } + + public long getMessageCreditLimit() + { + return _messageCreditLimit; + } public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index b960ce8608..2a277848ed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -47,7 +48,6 @@ import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -87,16 +87,20 @@ import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.io.IoSender; public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig { @@ -139,7 +143,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); - + private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); private FieldTable _clientProperties; private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); @@ -173,6 +177,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private volatile boolean _deferFlush; + private long _lastReceivedTime; + public ManagedObject getManagedObject() { return _managedObject; @@ -240,14 +247,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return _closing.get(); } + public synchronized void flushBatched() + { + _sender.flush(); + } + + + public ClientDeliveryMethod createDeliveryMethod(int channelId) + { + return new WriteDeliverMethod(channelId); + } + public void received(final ByteBuffer msg) { - _lastIoTime = System.currentTimeMillis(); + final long arrivalTime = System.currentTimeMillis(); + _lastReceivedTime = arrivalTime; + _lastIoTime = arrivalTime; try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - for (AMQDataBlock dataBlock : dataBlocks) + final int len = dataBlocks.size(); + for (int i = 0; i < len; i++) { + AMQDataBlock dataBlock = dataBlocks.get(i); try { dataBlockReceived(dataBlock); @@ -347,7 +369,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } - private void protocolInitiationReceived(ProtocolInitiation pi) + private synchronized void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); @@ -524,12 +546,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr */ public synchronized void writeFrame(AMQDataBlock frame) { - _lastSent = frame; + final ByteBuffer buf = asByteBuffer(frame); - _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); - _sender.flush(); + _lastIoTime = System.currentTimeMillis(); + if(!_deferFlush) + { + _sender.flush(); + } } public AMQShortString getContextKey() @@ -918,7 +943,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private void setProtocolVersion(ProtocolVersion pv) { _protocolVersion = pv; - + _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); } @@ -1023,7 +1048,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public MethodRegistry getMethodRegistry() { - return MethodRegistry.getMethodRegistry(getProtocolVersion()); + return _methodRegistry; } public MethodDispatcher getMethodDispatcher() @@ -1052,7 +1077,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr // Nothing } - public void writerIdle() + public synchronized void writerIdle() { _sender.send(asByteBuffer(HeartbeatBody.FRAME)); } @@ -1109,6 +1134,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return _lastIoTime; } + public long getLastReceivedTime() + { + return _lastReceivedTime; + } + public ProtocolSessionIdentifier getSessionIdentifier() { return _sessionIdentifier; @@ -1402,9 +1432,215 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return true; } + public void setDeferFlush(boolean deferFlush) + { + _deferFlush = deferFlush; + } + + + @Override public String getUserName() { return getAuthorizedPrincipal().getName(); } + + private static class ByteBufferOutputStream extends OutputStream + { + + + private final ByteBuffer _buf; + + public ByteBufferOutputStream(ByteBuffer buf) + { + _buf = buf; + } + + @Override + public void write(int b) throws IOException + { + _buf.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + _buf.put(b, off, len); + } + } + + public final class WriteDeliverMethod + implements ClientDeliveryMethod + { + private final int _channelId; + + public WriteDeliverMethod(int channelId) + { + _channelId = channelId; + } + + public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + throws AMQException + { + registerMessageDelivered(entry.getMessage().getSize()); + _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, sub.getConsumerTag()); + entry.incrementDeliveryCount(); + } + + } + + private static class BytesDataOutput implements DataOutput + { + int _pos = 0; + byte[] _buf; + + public BytesDataOutput(byte[] buf) + { + _buf = buf; + } + + public void setBuffer(byte[] buf) + { + _buf = buf; + _pos = 0; + } + + public void reset() + { + _pos = 0; + } + + public int length() + { + return _pos; + } + + public void write(int b) + { + _buf[_pos++] = (byte) b; + } + + public void write(byte[] b) + { + System.arraycopy(b, 0, _buf, _pos, b.length); + _pos+=b.length; + } + + + public void write(byte[] b, int off, int len) + { + System.arraycopy(b, off, _buf, _pos, len); + _pos+=len; + + } + + public void writeBoolean(boolean v) + { + _buf[_pos++] = v ? (byte) 1 : (byte) 0; + } + + public void writeByte(int v) + { + _buf[_pos++] = (byte) v; + } + + public void writeShort(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeChar(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeInt(int v) + { + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeLong(long v) + { + _buf[_pos++] = (byte) (v >>> 56); + _buf[_pos++] = (byte) (v >>> 48); + _buf[_pos++] = (byte) (v >>> 40); + _buf[_pos++] = (byte) (v >>> 32); + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte)v; + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + _buf[_pos++] = ((byte)s.charAt(i)); + } + } + + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + int v = s.charAt(i); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + } + + public void writeUTF(String s) + { + int strlen = s.length(); + + int pos = _pos; + _pos+=2; + + + for (int i = 0; i < strlen; i++) + { + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + { + c = s.charAt(i); + _buf[_pos++] = (byte) c; + + } + else if (c > 0x07FF) + { + _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + else + { + _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + } + + int len = _pos - (pos + 2); + + _buf[pos++] = (byte) (len >>> 8); + _buf[pos] = (byte) len; + } + + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index c1b5b02f8f..dfba10750c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; @@ -49,6 +50,14 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth boolean isClosing(); + void flushBatched(); + + void setDeferFlush(boolean defer); + + ClientDeliveryMethod createDeliveryMethod(int channelId); + + long getLastReceivedTime(); + public static final class ProtocolSessionIdentifier { private final Object _sessionIdentifier; @@ -77,15 +86,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth } /** - * Called when a protocol data block is received - * - * @param message the data block that has been received - * - * @throws Exception if processing the datablock fails - */ - void dataBlockReceived(AMQDataBlock message) throws Exception; - - /** * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC * 6). * @@ -234,4 +234,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth List<AMQChannel> getChannels(); void mgmtCloseChannel(int channelId); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 48a8a1bf42..5d4b8c603b 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -100,6 +100,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return _network.getLocalAddress(); } + public void received(final ByteBuffer buf) + { + super.received(buf); + _connection.receivedComplete(); + } + public long getReadBytes() { return _readBytes; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 7e1d57e205..5270f9f740 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -27,6 +27,11 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.queue.QueueRunner; import org.apache.qpid.server.queue.SimpleAMQQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * QueueRunners are Runnables used to process a queue when requiring * asynchronous message delivery to subscriptions, which is necessary @@ -37,33 +42,64 @@ public class QueueRunner implements ReadWriteRunnable { private static final Logger _logger = Logger.getLogger(QueueRunner.class); - private final String _name; private final SimpleAMQQueue _queue; - public QueueRunner(SimpleAMQQueue queue, long count) + private static int IDLE = 0; + private static int SCHEDULED = 1; + private static int RUNNING = 2; + + + private final AtomicInteger _scheduled = new AtomicInteger(IDLE); + + private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; + private final AtomicBoolean _stateChange = new AtomicBoolean(); + + private final AtomicLong _lastRunAgain = new AtomicLong(); + private final AtomicLong _lastRunTime = new AtomicLong(); + + private long _runs; + private long _continues; + + public QueueRunner(SimpleAMQQueue queue) { _queue = queue; - _name = "QueueRunner-" + count + "-" + queue.getLogActor(); } + private int trouble = 0; + public void run() { - String originalName = Thread.currentThread().getName(); - try + if(_scheduled.compareAndSet(SCHEDULED,RUNNING)) { - Thread.currentThread().setName(_name); - CurrentActor.set(_queue.getLogActor()); + long runAgain = Long.MIN_VALUE; + _stateChange.set(false); + try + { + CurrentActor.set(_queue.getLogActor()); + + runAgain = _queue.processQueue(this); + } + catch (AMQException e) + { + _logger.error("Exception during asynchronous delivery by " + toString(), e); + } + finally + { + CurrentActor.remove(); + } + _scheduled.compareAndSet(RUNNING, IDLE); + long stateChangeCount = _queue.getStateChangeCount(); + _lastRunAgain.set(runAgain); + _lastRunTime.set(System.nanoTime()); + if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false)) + { + _continues++; + if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + { + _queue.execute(this); + } + } - _queue.processQueue(this); - } - catch (AMQException e) - { - _logger.error("Exception during asynchronous delivery by " + _name, e); - } - finally - { - CurrentActor.remove(); - Thread.currentThread().setName(originalName); } } @@ -79,6 +115,21 @@ public class QueueRunner implements ReadWriteRunnable public String toString() { - return _name; + return "QueueRunner-" + _queue.getLogActor(); } -}
\ No newline at end of file + + public void execute(Executor executor) + { + _stateChange.set(true); + if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + { + executor.execute(this); + } + } + + public boolean isIdle() + { + return _scheduled.get() == IDLE; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 7717c8ebfc..cc93cbc2f1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -155,11 +155,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); - static final int MAX_ASYNC_DELIVERIES = 10; + static final int MAX_ASYNC_DELIVERIES = 80; private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); - private AtomicReference<Runnable> _asynchronousRunner = new AtomicReference<Runnable>(null); + private final Executor _asyncDelivery; private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); @@ -584,33 +584,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { + incrementTxnEnqueueStats(message); incrementQueueCount(); incrementQueueSize(message); + _totalMessagesReceived.incrementAndGet(); QueueEntry entry; - Subscription exclusiveSub = _exclusiveSubscriber; + final Subscription exclusiveSub = _exclusiveSubscriber; + entry = _entries.add(message); - if (exclusiveSub != null) + if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) { - exclusiveSub.getSendLock(); - - try - { - entry = _entries.add(message); - - deliverToSubscription(exclusiveSub, entry); - } - finally - { - exclusiveSub.releaseSendLock(); - } - } - else - { - entry = _entries.add(message); /* iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message @@ -666,7 +653,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { checkSubscriptionsNotAheadOfDelivery(entry); - deliverAsync(); + if (exclusiveSub != null) + { + deliverAsync(exclusiveSub); + } + else + { + deliverAsync(); + } } if(_managedObject != null) @@ -685,30 +679,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { - sub.getSendLock(); - try + if(sub.trySendLock()) { - if (subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended()) + try { - if (!sub.wouldSuspend(entry)) + if (subscriptionReadyAndHasInterest(sub, entry) + && !sub.isSuspended()) { - if (sub.acquires() && !entry.acquire(sub)) - { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription - sub.restoreCredit(entry); - } - else + if (!sub.wouldSuspend(entry)) { - deliverMessage(sub, entry); + if (sub.acquires() && !entry.acquire(sub)) + { + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this subscription + sub.restoreCredit(entry); + } + else + { + deliverMessage(sub, entry, false); + } } } } - } - finally - { - sub.releaseSendLock(); + finally + { + sub.releaseSendLock(); + } } } @@ -752,7 +748,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _byteTxnDequeues.addAndGet(entry.getSize()); } - private void deliverMessage(final Subscription sub, final QueueEntry entry) + private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch) throws AMQException { setLastSeenEntry(sub, entry); @@ -760,7 +756,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _deliveredMessages.incrementAndGet(); incrementUnackedMsgCount(); - sub.send(entry); + sub.send(entry, batch); } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException @@ -866,7 +862,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!subscription.isClosed()) { - deliverMessage(subscription, entry); + deliverMessage(subscription, entry, false); return true; } else @@ -1008,6 +1004,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _exclusiveSubscriber = exclusiveSubscriber; } + long getStateChangeCount() + { + return _stateChangeCount.get(); + } + + public static interface QueueEntryFilter { public boolean accept(QueueEntry entry); @@ -1308,7 +1310,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; - ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); while (queueListIterator.advance()) { @@ -1331,7 +1333,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void dequeueEntry(final QueueEntry node) { - ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore()); dequeueEntry(node, txn); } @@ -1408,7 +1410,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } }); - ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); if(_alternateExchange != null) { @@ -1577,26 +1579,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + private QueueRunner _queueRunner = new QueueRunner(this); public void deliverAsync() { - QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); + _stateChangeCount.incrementAndGet(); + + _queueRunner.execute(_asyncDelivery); - if (_asynchronousRunner.compareAndSet(null, runner)) - { - _asyncDelivery.execute(runner); - } } public void deliverAsync(Subscription sub) { - SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); - if(flusher == null) + //_stateChangeCount.incrementAndGet(); + if(_exclusiveSubscriber == null) { - flusher = new SubFlushRunner(sub); - sub.set(SUB_FLUSH_RUNNER, flusher); + deliverAsync(); } - _asyncDelivery.execute(flusher); + else + { + SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); + if(flusher == null) + { + flusher = new SubFlushRunner(sub); + sub.set(SUB_FLUSH_RUNNER, flusher); + } + flusher.execute(_asyncDelivery); + } + } public void flushSubscription(Subscription sub) throws AMQException @@ -1612,31 +1622,56 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean flushSubscription(Subscription sub, long iterations) throws AMQException { boolean atTail = false; + final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; - while (!sub.isSuspended() && !atTail && iterations != 0) + try { - try + if(keepSendLockHeld) { sub.getSendLock(); - atTail = attemptDelivery(sub); - if (atTail && sub.isAutoClose()) + } + while (!sub.isSuspended() && !atTail && iterations != 0) + { + try { - unregisterSubscription(sub); + if(!keepSendLockHeld) + { + sub.getSendLock(); + } - sub.confirmAutoClose(); + atTail = attemptDelivery(sub, true); + if (atTail && !sub.isSuspended() && sub.isAutoClose()) + { + unregisterSubscription(sub); + + sub.confirmAutoClose(); + } + else if (!atTail) + { + iterations--; + } } - else if (!atTail) + finally { - iterations--; + if(!keepSendLockHeld) + { + sub.releaseSendLock(); + } } } - finally + } + finally + { + if(keepSendLockHeld) { sub.releaseSendLock(); } + sub.flushBatched(); + } + // if there's (potentially) more than one subscription the others will potentially not have been advanced to the // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc // which would give us memory "leak". @@ -1653,11 +1688,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener * * Looks up the next node for the subscription and attempts to deliver it. * + * * @param sub + * @param batch * @return true if we have completed all possible deliveries for this sub. * @throws AMQException */ - private boolean attemptDelivery(Subscription sub) throws AMQException + private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException { boolean atTail = false; @@ -1681,7 +1718,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { - deliverMessage(sub, node); + deliverMessage(sub, node, batch); } } @@ -1785,23 +1822,26 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener * @param runner the Runner to schedule * @throws AMQException */ - public void processQueue(QueueRunner runner) throws AMQException + public long processQueue(QueueRunner runner) throws AMQException { - long stateChangeCount; + long stateChangeCount = Long.MIN_VALUE; long previousStateChangeCount = Long.MIN_VALUE; + long rVal = Long.MIN_VALUE; boolean deliveryIncomplete = true; boolean lastLoop = false; int iterations = MAX_ASYNC_DELIVERIES; - _asynchronousRunner.compareAndSet(runner, null); + final int numSubs = _subscriptionList.size(); + + final int perSub = Math.max(iterations / Math.max(numSubs,1), 1); // For every message enqueue/requeue the we fire deliveryAsync() which // increases _stateChangeCount. If _sCC changes whilst we are in our loop // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) // then we will continue to run for a maximum of iterations. // So whilst delivery/rejection is going on a processQueue thread will be running - while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) + while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete)) { // we want to have one extra loop after every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to @@ -1812,6 +1852,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //further asynchronous delivery is required since the //previous loop. keep going if iteration slicing allows. lastLoop = false; + rVal = stateChangeCount; } previousStateChangeCount = stateChangeCount; @@ -1824,33 +1865,47 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { Subscription sub = subscriptionIter.getNode().getSubscription(); sub.getSendLock(); - try - { - //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub); - if (subscriptionDone) + + try { - //close autoClose subscriptions if we are not currently intent on continuing - if (lastLoop && sub.isAutoClose()) + for(int i = 0 ; i < perSub; i++) { - unregisterSubscription(sub); + //attempt delivery. returns true if no further delivery currently possible to this sub + subscriptionDone = attemptDelivery(sub, true); + if (subscriptionDone) + { + sub.flushBatched(); + //close autoClose subscriptions if we are not currently intent on continuing + if (lastLoop && !sub.isSuspended() && sub.isAutoClose()) + { + + unregisterSubscription(sub); + + sub.confirmAutoClose(); + } + break; + } + else + { + //this subscription can accept additional deliveries, so we must + //keep going after this (if iteration slicing allows it) + allSubscriptionsDone = false; + lastLoop = false; + if(--iterations == 0) + { + sub.flushBatched(); + break; + } + } - sub.confirmAutoClose(); } + + sub.flushBatched(); } - else + finally { - //this subscription can accept additional deliveries, so we must - //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; - lastLoop = false; - iterations--; + sub.releaseSendLock(); } - } - finally - { - sub.releaseSendLock(); - } } if(allSubscriptionsDone && lastLoop) @@ -1876,24 +1931,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliveryIncomplete = true; } - _asynchronousRunner.set(null); } // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) + if (iterations == 0) { if (_logger.isDebugEnabled()) { _logger.debug("Rescheduling runner:" + runner); } - _asyncDelivery.execute(runner); + return 0L; } + return rVal; + } public void checkMessageStatus() throws AMQException { - QueueEntryIterator queueListIterator = _entries.iterator(); while (queueListIterator.advance()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 46c1a6af9a..fbef23dca1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -27,6 +27,10 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + class SubFlushRunner implements ReadWriteRunnable { @@ -34,29 +38,33 @@ class SubFlushRunner implements ReadWriteRunnable private final Subscription _sub; - private final String _name; + + private static int IDLE = 0; + private static int SCHEDULED = 1; + private static int RUNNING = 2; + + + private final AtomicInteger _scheduled = new AtomicInteger(IDLE); + + private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; + private final AtomicBoolean _stateChange = new AtomicBoolean(); public SubFlushRunner(Subscription sub) { _sub = sub; - _name = "SubFlushRunner-"+_sub; } public void run() { - - String originalName = Thread.currentThread().getName(); - try + if(_scheduled.compareAndSet(SCHEDULED, RUNNING)) { - Thread.currentThread().setName(_name); - boolean complete = false; + _stateChange.set(false); try { CurrentActor.set(_sub.getLogActor()); complete = getQueue().flushSubscription(_sub, ITERATIONS); - } catch (AMQException e) { @@ -66,17 +74,15 @@ class SubFlushRunner implements ReadWriteRunnable { CurrentActor.remove(); } - if (!complete && !_sub.isSuspended()) + _scheduled.compareAndSet(RUNNING, IDLE); + if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended()) { - getQueue().execute(this); + if(_scheduled.compareAndSet(IDLE,SCHEDULED)) + { + getQueue().execute(this); + } } - } - finally - { - Thread.currentThread().setName(originalName); - } - } private SimpleAMQQueue getQueue() @@ -93,4 +99,18 @@ class SubFlushRunner implements ReadWriteRunnable { return true; } + + public String toString() + { + return "SubFlushRunner-" + _sub.getLogActor(); + } + + public void execute(Executor executor) + { + _stateChange.set(true); + if(_scheduled.compareAndSet(IDLE,SCHEDULED)) + { + executor.execute(this); + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 3a950c2f4f..e8aa5158fc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -73,13 +73,18 @@ public interface Subscription void close(); - void send(QueueEntry msg) throws AMQException; + void send(QueueEntry entry, boolean batch) throws AMQException; + + void flushBatched(); void queueDeleted(AMQQueue queue); boolean wouldSuspend(QueueEntry msg); + boolean trySendLock(); + + void getSendLock(); void releaseSendLock(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 8b11a5817a..b84971d8c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -119,11 +119,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * - * @param msg The message to send + * + * @param entry + * @param batch * @throws AMQException */ @Override - public void send(QueueEntry msg) throws AMQException + public void send(QueueEntry entry, boolean batch) throws AMQException { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -131,7 +133,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(msg, deliveryTag); + sendToClient(entry, deliveryTag); } } @@ -173,11 +175,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * + * * @param entry The message to send + * @param batch * @throws AMQException */ @Override - public void send(QueueEntry entry) throws AMQException + public void send(QueueEntry entry, boolean batch) throws AMQException { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -193,6 +197,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage synchronized (getChannel()) { + getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); sendToClient(entry, deliveryTag); @@ -263,11 +268,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * + * * @param entry The message to send + * @param batch * @throws AMQException */ @Override - public void send(QueueEntry entry) throws AMQException + public void send(QueueEntry entry, boolean batch) throws AMQException { // if we do not need to wait for client acknowledgements @@ -282,6 +289,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage synchronized (getChannel()) { + getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); @@ -441,10 +449,12 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * - * @param msg The message to send + * + * @param entry + * @param batch * @throws AMQException */ - abstract public void send(QueueEntry msg) throws AMQException; + abstract public void send(QueueEntry entry, boolean batch) throws AMQException; public boolean isSuspended() @@ -578,6 +588,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage()); } + public boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + public void getSendLock() { _stateChangeLock.lock(); @@ -814,4 +829,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { return _createTime; } + + public void flushBatched() + { + _channel.getProtocolSession().setDeferFlush(false); + + _channel.getProtocolSession().flushBatched(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 99f2d6cbc2..6c18b2e229 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -61,6 +61,7 @@ import org.apache.qpid.transport.MessageFlowMode; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Option; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.Struct; import org.apache.qpid.framing.AMQShortString; @@ -91,6 +92,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); + private static final Option[] BATCHED = new Option[] { Option.BATCH }; + private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); @@ -127,6 +130,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final long _createTime = System.currentTimeMillis(); private final AtomicLong _deliveredCount = new AtomicLong(0); private final Map<String, Object> _arguments; + private int _deferredMessageCredit; + private long _deferredSizeCredit; public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, @@ -137,6 +142,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { _subscriptionID = subscriptionId; _session = session; + _postIdSettingAction = new AddMessageDispositionListenerAction(session); _destination = destination; _acceptMode = acceptMode; _acquireMode = acquireMode; @@ -325,10 +331,26 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } - private class AddMessageDispositionListnerAction implements Runnable + public static class AddMessageDispositionListenerAction implements Runnable { - public MessageTransfer _xfr; - public ServerSession.MessageDispositionChangeListener _action; + private MessageTransfer _xfr; + private ServerSession.MessageDispositionChangeListener _action; + private ServerSession _session; + + public AddMessageDispositionListenerAction(ServerSession session) + { + _session = session; + } + + public void setXfr(MessageTransfer xfr) + { + _xfr = xfr; + } + + public void setAction(ServerSession.MessageDispositionChangeListener action) + { + _action = action; + } public void run() { @@ -339,9 +361,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } } - private final AddMessageDispositionListnerAction _postIdSettingAction = new AddMessageDispositionListnerAction(); + private final AddMessageDispositionListenerAction _postIdSettingAction; - public void send(final QueueEntry entry) throws AMQException + public void send(final QueueEntry entry, boolean batch) throws AMQException { ServerMessage serverMsg = entry.getMessage(); @@ -586,26 +608,27 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { public void onComplete(Method method) { - restoreCredit(entry); + deferredAddCredit(1, entry.getSize()); } }); } - _postIdSettingAction._xfr = xfr; + _postIdSettingAction.setXfr(xfr); if(_acceptMode == MessageAcceptMode.EXPLICIT) { - _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this); + _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); } else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) { - _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this); + _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); } else { - _postIdSettingAction._action = null; + _postIdSettingAction.setAction(null); } + _session.sendMessage(xfr, _postIdSettingAction); entry.incrementDeliveryCount(); _deliveredCount.incrementAndGet(); @@ -723,6 +746,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return !_creditManager.useCreditForMessage(entry.getMessage().getSize()); } + public boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + public void getSendLock() { _stateChangeLock.lock(); @@ -788,6 +816,28 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _properties.get(key); } + private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) + { + _deferredMessageCredit += deferredMessageCredit; + _deferredSizeCredit += deferredSizeCredit; + + } + + public void flushCreditState() + { + flushCreditState(false); + } + public void flushCreditState(boolean strict) + { + if(strict || !isSuspended() || _deferredMessageCredit >= 200 + || !(_creditManager instanceof WindowCreditManager) + || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) + { + _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); + _deferredMessageCredit = 0; + _deferredSizeCredit = 0l; + } + } public FlowCreditManager_0_10 getCreditManager() { @@ -890,6 +940,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void flush() throws AMQException { + flushCreditState(true); _queue.flushSubscription(this); stop(); } @@ -1029,4 +1080,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return (LogSubject) this; } + + public void flushBatched() + { + _session.getConnection().flush(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 922531a271..00f0c9f0f1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -446,6 +446,14 @@ public class ServerConnection extends Connection implements Managable, AMQConnec } } + public void receivedComplete() + { + for (Session ssn : getChannels()) + { + ((ServerSession)ssn).flushCreditState(); + } + } + @Override public ManagedObject getManagedObject() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index dfad097dbc..1ce7b806d8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -697,6 +697,15 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } + public void flushCreditState() + { + final Collection<Subscription_0_10> subscriptions = getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subscriptions) + { + subscription_0_10.flushCreditState(false); + } + } + public int getUnacknowledgedMessageCount() { return _messageDispositionListenerMap.size(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 5a411c6807..ab41158548 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -43,6 +43,8 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TestNetworkConnection; @@ -120,6 +122,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { } + public ClientDeliveryMethod createDeliveryMethod(int channelId) + { + return new InternalWriteDeliverMethod(channelId); + } + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) { } @@ -213,4 +220,42 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr ((AMQChannel)session).getProtocolSession().closeSession(); } + + private class InternalWriteDeliverMethod implements ClientDeliveryMethod + { + private int _channelId; + + public InternalWriteDeliverMethod(int channelId) + { + _channelId = channelId; + } + + + @Override + public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException + { + _deliveryCount.incrementAndGet(); + + synchronized (_channelDelivers) + { + Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); + + if (consumers == null) + { + consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + _channelDelivers.put(_channelId, consumers); + } + + LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getConsumerTag()); + + if (consumerDelivers == null) + { + consumerDelivers = new LinkedList<DeliveryPair>(); + consumers.put(sub.getConsumerTag(), consumerDelivers); + } + + consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); + } + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 0f5374b3e5..5d559c9d0d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; @@ -143,16 +144,19 @@ public class AckTest extends InternalBrokerBaseCase qs.add(_queue); msg.enqueue(qs); MessageMetaData mmd = msg.headersReceived(); - msg.setStoredMessage(_messageStore.addMessage(mmd)); + final StoredMessage storedMessage = _messageStore.addMessage(mmd); + msg.setStoredMessage(storedMessage); + final AMQMessage message = new AMQMessage(storedMessage); if(msg.allContentReceived()) { ServerTransaction txn = new AutoCommitTransaction(_messageStore); - txn.enqueue(_queue, msg, new ServerTransaction.Action() { + txn.enqueue(_queue, message, new ServerTransaction.Action() { public void postCommit() { try { - _queue.enqueue(new AMQMessage(msg.getStoredMessage())); + + _queue.enqueue(message); } catch (AMQException e) { @@ -170,6 +174,15 @@ public class AckTest extends InternalBrokerBaseCase // we manually send the message to the subscription //_subscription.send(new QueueEntry(_queue,msg), _queue); } + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + e.printStackTrace(); //TODO. + } + } /** @@ -181,9 +194,8 @@ public class AckTest extends InternalBrokerBaseCase _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount, true); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertEquals("",msgCount,map.size()); + assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size()); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -206,7 +218,6 @@ public class AckTest extends InternalBrokerBaseCase _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); assertTrue(_messageStore.getMessageCount() == 0); @@ -243,7 +254,7 @@ public class AckTest extends InternalBrokerBaseCase _channel.acknowledgeMessage(5, false); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == msgCount - 1); + assertEquals("Map not expected size",msgCount - 1,map.size()); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -270,6 +281,8 @@ public class AckTest extends InternalBrokerBaseCase final int msgCount = 10; publishMessages(msgCount); + + _channel.acknowledgeMessage(5, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index e4ed232f13..6c7094cac0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -44,7 +44,6 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; @@ -189,6 +188,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); + try + { + Thread.sleep(2000L); + } + catch(InterruptedException e) + { + } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry); @@ -430,6 +436,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue @@ -723,7 +736,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); // call processQueue to deliver the messages - testQueue.processQueue(new QueueRunner(testQueue, 1) + testQueue.processQueue(new QueueRunner(testQueue) { @Override public void run() @@ -826,7 +839,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not copied as part of invocation of - * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)} + * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)} */ public void testCopyMessagesWithDequeuedEntry() { @@ -844,7 +857,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase SimpleAMQQueue queue = createQueue(anotherQueueName); // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); // copy messages into another queue _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); @@ -876,7 +889,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not moved as part of invocation of - * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)} + * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)} */ public void testMovedMessagesWithDequeuedEntry() { @@ -894,7 +907,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase SimpleAMQQueue queue = createQueue(anotherQueueName); // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); // move messages into another queue _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); @@ -927,7 +940,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that messages in given range including dequeued one are deleted * from the queue on invocation of - * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)} + * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)} */ public void testRemoveMessagesFromQueueWithDequeuedEntry() { @@ -954,7 +967,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message on the top is not accounted and next message * is deleted from the queue on invocation of - * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)} + * {@link SimpleAMQQueue#deleteMessageFromTop()} */ public void testDeleteMessageFromTopWithDequeuedEntryOnTop() { @@ -983,7 +996,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that all messages including dequeued one are deleted from the queue - * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)} + * on invocation of {@link SimpleAMQQueue#clearQueue()} */ public void testClearQueueWithDequeuedEntry() { @@ -1049,10 +1062,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase { /** * Send a message and decrement latch + * @param entry + * @param batch */ - public void send(QueueEntry msg) throws AMQException + public void send(QueueEntry entry, boolean batch) throws AMQException { - super.send(msg); + super.send(entry, batch); latch.countDown(); } }; @@ -1063,7 +1078,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase testQueue.registerSubscription(subscription, false); // process queue - testQueue.processQueue(new QueueRunner(testQueue, 1) + testQueue.processQueue(new QueueRunner(testQueue) { public void run() { @@ -1127,6 +1142,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase { return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0); } + + @Override + public boolean acquire(Subscription sub) + { + if(((AMQMessage) message).getMessageId().longValue() % 2 == 0) + { + return false; + } + else + { + return super.acquire(sub); + } + } }; } }; @@ -1243,6 +1271,14 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase fail("Failure to put message on queue:" + e.getMessage()); } } + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 1efe1028db..c5a7ddd691 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -125,6 +125,12 @@ public class MockSubscription implements Subscription return queue; } + public boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + + public void getSendLock() { _stateChangeLock.lock(); @@ -216,7 +222,7 @@ public class MockSubscription implements Subscription { } - public void send(QueueEntry entry) throws AMQException + public void send(QueueEntry entry, boolean batch) throws AMQException { if (messages.contains(entry)) { @@ -225,6 +231,12 @@ public class MockSubscription implements Subscription messages.add(entry); } + @Override + public void flushBatched() + { + + } + public void setQueueContext(AMQQueue.Context queueContext) { _queueContext = queueContext; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index a5b9c618bc..45aa6a0e18 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -56,7 +56,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered"); /** Default number of messages to send before failover */ - private static final int DEFAULT_NUMBER_OF_MESSAGES = 10; + private static final int DEFAULT_NUMBER_OF_MESSAGES = 40; /** Actual number of messages to send before failover */ protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES); @@ -1157,7 +1157,6 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio { init(acknowledgeMode, false); _consumer.close(); - QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination); _connection.start(); produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); @@ -1165,6 +1164,8 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio { _producerSession.commit(); } + + QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination); return browser; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 57ff6a4fa2..474a425b28 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -54,7 +54,14 @@ public class AMQConnectionTest extends QpidBrokerTestCase _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic")); _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue")); } - + + @Override + protected void tearDown() throws Exception + { + _connection.close(); + super.tearDown(); //To change body of overridden methods use File | Settings | File Templates. + } + protected void createConnection() throws Exception { _connection = (AMQConnection) getConnection("guest", "guest"); @@ -67,16 +74,27 @@ public class AMQConnectionTest extends QpidBrokerTestCase public void testCreateQueueSession() throws JMSException { - _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE); + createQueueSession(); + } + + private void createQueueSession() throws JMSException + { + _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE); } public void testCreateTopicSession() throws JMSException { + createTopicSession(); + } + + private void createTopicSession() throws JMSException + { _topicSession = _connection.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); } public void testTopicSessionCreateBrowser() throws JMSException { + createTopicSession(); try { _topicSession.createBrowser(_queue); @@ -94,6 +112,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase public void testTopicSessionCreateQueue() throws JMSException { + createTopicSession(); try { _topicSession.createQueue("abc"); @@ -111,6 +130,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase public void testTopicSessionCreateTemporaryQueue() throws JMSException { + createTopicSession(); try { _topicSession.createTemporaryQueue(); @@ -128,6 +148,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase public void testQueueSessionCreateTemporaryTopic() throws JMSException { + createQueueSession(); try { _queueSession.createTemporaryTopic(); @@ -145,6 +166,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase public void testQueueSessionCreateTopic() throws JMSException { + createQueueSession(); try { _queueSession.createTopic("abc"); @@ -162,6 +184,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase public void testQueueSessionDurableSubscriber() throws JMSException { + createQueueSession(); try { _queueSession.createDurableSubscriber(_topic, "abc"); @@ -179,6 +202,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase public void testQueueSessionUnsubscribe() throws JMSException { + createQueueSession(); try { _queueSession.unsubscribe("abc"); @@ -243,25 +267,6 @@ public class AMQConnectionTest extends QpidBrokerTestCase assertNotNull("Consumer B should have received the message",msg); } - public void testGetChannelID() throws Exception - { - long maxChannelID = _connection.getMaximumChannelCount(); - if (isBroker010()) - { - //Usable numbers are 0 to N-1 when using 0-10 - //and 1 to N for 0-8/0-9 - maxChannelID = maxChannelID-1; - } - for (int j = 0; j < 3; j++) - { - int i = isBroker010() ? 0 : 1; - for ( ; i <= maxChannelID; i++) - { - int id = _connection.getNextChannelID(); - assertEquals("Unexpected number on iteration "+j, i, id); - _connection.deregisterSession(id); - } - } - } + } |
