summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-27 13:37:03 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-27 13:37:03 +0000
commite96bf6ebc67cc1a0e3421b82c0ce497771882fcf (patch)
treeae2c6858aca4d6efbc37a9b7480365dc81f9bc12 /qpid/java
parent5ea9fa4baa5219c7666ccddb04703289e56cbd6f (diff)
downloadqpid-python-e96bf6ebc67cc1a0e3421b82c0ce497771882fcf.tar.gz
QPID-1635,QPID-1636,QPID-1638 : Updated QueueEntries to contain additional values from AMQMessage, _flags and expiry this allows the checking of immediate delivery and expiry on unloaded messages.
Updated nomenclature to use load/unload rather than the overloaded flow/recover. Created new FileQueueBackingStoreFactory to ensure that validates and creates initial flowToDiskLocation and creates a new BackingStore. Responsibility for FlowToDisk has been added to the QueueEntryLists. This will allow the easy unloading of the structure in the future. Inorder to do this the size,count and memory count properties had to be moved from the SimpleAMQQueue to the QueueEntryList. An Inhaler thread was created in addition to the synchronous loading of messages. This is initiated as a result of a flowed QEL dropping below the minimumMemory value. A test to ensure that the queue never exceeds its set memory usage and that the count does not go negative has been added to SimpleAMQQueueTest. The SimpleAMQQueue is responsible for deciding when a message can be unloaded after delivery takes place. The QEL cannot decide this as there is no state for a message being marked as sent to a consumer. Only Aquired and Dequeued. The unloaded message is only deleted after the QueueEntry is deleted from the QEL. This negates the need to recreated the data on disk if the message needs to be unloaded again. All files/directories relating to FtD are created as deleteOnExit files so that under clean shutdown the VM will ensure that the files are deleted. On startup the flowToDiskLocation is also purged to ensure a clean starting point. SAMQQueueThreadPoolTest was augmented to take in to account the new inhaler executor reference. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@748519 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java305
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java166
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java168
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java30
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java70
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java29
20 files changed, 643 insertions, 292 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
index 7acb683e3b..4e3b2298d1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
@@ -22,19 +22,12 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.util.FileUtils;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.ConfigurationException;
import java.io.File;
import java.io.FileInputStream;
@@ -43,219 +36,142 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
public class FileQueueBackingStore implements QueueBackingStore
{
private static final Logger _log = Logger.getLogger(FileQueueBackingStore.class);
- private AtomicBoolean _closed = new AtomicBoolean(false);
private String _flowToDiskLocation;
- private static final String QUEUE_BACKING_DIR = "queueBacking";
- public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
+ public FileQueueBackingStore(String location)
{
- setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
+ _flowToDiskLocation = location;
}
- private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
+ public AMQMessage load(Long messageId)
{
- if (vHostName == null)
- {
- throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
- }
+ _log.info("Loading Message (ID:" + messageId + ")");
- if (location == null)
- {
- throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
- }
+ MessageMetaData mmd;
- _flowToDiskLocation = location;
+ File handle = getFileHandle(messageId);
+ handle.deleteOnExit();
- _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
+ ObjectInputStream input = null;
- File root = new File(location);
- if (!root.exists())
- {
- throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
- }
- else
+ Exception error = null;
+ try
{
+ input = new ObjectInputStream(new FileInputStream(handle));
- if (root.isFile())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:"+
- root.getAbsolutePath());
- }
+ long arrivaltime = input.readLong();
+
+ final AMQShortString exchange = new AMQShortString(input.readUTF());
+ final AMQShortString routingKey = new AMQShortString(input.readUTF());
+ final boolean mandatory = input.readBoolean();
+ final boolean immediate = input.readBoolean();
+
+ int bodySize = input.readInt();
+ byte[] underlying = new byte[bodySize];
+
+ input.readFully(underlying, 0, bodySize);
+
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
- if(!root.canWrite())
+ ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
+
+ int chunkCount = input.readInt();
+
+ // There are WAY to many annonymous MPIs in the code this should be made concrete.
+ MessagePublishInfo info = new MessagePublishInfo()
{
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:"+
- root.getAbsolutePath());
- }
- }
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+ public void setExchange(AMQShortString exchange)
+ {
- File store = new File(_flowToDiskLocation);
- if (store.exists())
- {
- if (!FileUtils.delete(store, true))
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ };
+
+ mmd = new MessageMetaData(info, chb, chunkCount);
+ mmd.setArrivalTime(arrivaltime);
+
+ AMQMessage message;
+ if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2)
+ {
+ message = new PersistentAMQMessage(messageId, null);
+ }
+ else
{
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store as directory already exsits:"
- + store.getAbsolutePath());
+ message = new TransientAMQMessage(messageId);
}
- if (store.isFile())
+ message.recoverFromMessageMetaData(mmd);
+
+ for (int chunk = 0; chunk < chunkCount; chunk++)
{
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:"+
- store.getAbsolutePath());
+ int length = input.readInt();
+
+ byte[] data = new byte[length];
+
+ input.readFully(data, 0, length);
+
+ try
+ {
+ message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
+ }
+ catch (AMQException e)
+ {
+ //ignore as this will not occur.
+ // It is thrown by the _transactionLog method in load on PersistentAMQMessage
+ // but we have created the message with a null log and will never call that method.
+ }
}
+ return message;
}
- else
+ catch (Exception e)
{
- if (!store.getParentFile().getParentFile().canWrite())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to parent location:"+
- store.getParentFile().getParentFile().getAbsolutePath());
- }
+ error = e;
}
-
-
- _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
- store.deleteOnExit();
- if (!store.mkdirs())
+ finally
{
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
+ try
+ {
+ input.close();
+ // We can purge the message here then reflow it if required but I believe it to be cleaner to leave it
+ // on disk until it has been deleted from the queue at that point we can be sure we won't need the data
+ //handle.delete();
+ }
+ catch (IOException e)
+ {
+ _log.info("Unable to close input on message(" + messageId + ") recovery due to:" + e.getMessage());
+ }
}
- }
-
-
- public AMQMessage recover(Long messageId)
- {
- MessageMetaData mmd;
- List<ContentChunk> contentBodies = new LinkedList<ContentChunk>();
-
- File handle = getFileHandle(messageId);
- handle.deleteOnExit();
-
- ObjectInputStream input = null;
-
- Exception error = null;
- try
- {
- input = new ObjectInputStream(new FileInputStream(handle));
-
- long arrivaltime = input.readLong();
-
- final AMQShortString exchange = new AMQShortString(input.readUTF());
- final AMQShortString routingKey = new AMQShortString(input.readUTF());
- final boolean mandatory = input.readBoolean();
- final boolean immediate = input.readBoolean();
-
- int bodySize = input.readInt();
- byte[] underlying = new byte[bodySize];
-
- input.readFully(underlying, 0, bodySize);
-
- ByteBuffer buf = ByteBuffer.wrap(underlying);
-
- ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
-
- int chunkCount = input.readInt();
-
- // There are WAY to many annonymous MPIs in the code this should be made concrete.
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- };
-
- mmd = new MessageMetaData(info, chb, chunkCount);
- mmd.setArrivalTime(arrivaltime);
-
- AMQMessage message;
- if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2)
- {
- message = new PersistentAMQMessage(messageId, null);
- }
- else
- {
- message = new TransientAMQMessage(messageId);
- }
-
- message.recoverFromMessageMetaData(mmd);
-
- for (int chunk = 0; chunk < chunkCount; chunk++)
- {
- int length = input.readInt();
-
- byte[] data = new byte[length];
-
- input.readFully(data, 0, length);
-
- // There are WAY to many annonymous CCs in the code this should be made concrete.
- try
- {
- message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
- }
- catch (AMQException e)
- {
- //ignore as this will not occur.
- // It is thrown by the _transactionLog method in recover on PersistentAMQMessage
- // but we have created the message with a null log and will never call that method.
- }
- }
-
- return message;
- }
- catch (Exception e)
- {
- error = e;
- }
- finally
- {
- try
- {
- input.close();
- }
- catch (IOException e)
- {
- _log.info("Unable to close input on message("+messageId+") recovery due to:"+e.getMessage());
- }
- }
throw new UnableToRecoverMessageException(error);
}
-
- public void flow(AMQMessage message) throws UnableToFlowMessageException
+ public void unload(AMQMessage message) throws UnableToFlowMessageException
{
long messageId = message.getMessageId();
@@ -264,10 +180,18 @@ public class FileQueueBackingStore implements QueueBackingStore
//If we have written the data once then we don't need to do it again.
if (handle.exists())
{
- _log.debug("Message(" + messageId + ") already flowed to disk.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message(ID:" + messageId + ") already unloaded.");
+ }
return;
}
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Unloading Message (ID:" + messageId + ")");
+ }
+
handle.deleteOnExit();
ObjectOutputStream writer = null;
@@ -334,7 +258,7 @@ public class FileQueueBackingStore implements QueueBackingStore
if (error != null)
{
- _log.error("Unable to flow message(" + messageId + ") to disk, restoring state.");
+ _log.error("Unable to unload message(" + messageId + ") to disk, restoring state.");
handle.delete();
throw new UnableToFlowMessageException(messageId, error);
}
@@ -358,7 +282,7 @@ public class FileQueueBackingStore implements QueueBackingStore
// grab the 8 LSB to give us 256 bins
long bin = messageId & 0xFFL;
- String bin_path =_flowToDiskLocation + File.separator + bin;
+ String bin_path = _flowToDiskLocation + File.separator + bin;
File bin_dir = new File(bin_path);
if (!bin_dir.exists())
@@ -379,7 +303,10 @@ public class FileQueueBackingStore implements QueueBackingStore
if (handle.exists())
{
- _log.debug("Message(" + messageId + ") delete flowToDisk.");
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Message(" + messageId + ") delete flowToDisk.");
+ }
if (!handle.delete())
{
throw new RuntimeException("Unable to delete flowToDisk data");
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
new file mode 100644
index 0000000000..0cfa9d6b32
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.util.FileUtils;
+
+import java.io.File;
+
+public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory
+{
+ private static final Logger _log = Logger.getLogger(FileQueueBackingStoreFactory.class);
+
+ private String _flowToDiskLocation;
+ private static final String QUEUE_BACKING_DIR = "queueBacking";
+
+ public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
+ {
+ setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
+ }
+
+ private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
+ {
+ if (vHostName == null)
+ {
+ throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
+ }
+
+ if (location == null)
+ {
+ throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
+ }
+
+ _flowToDiskLocation = location;
+
+ _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
+
+ File root = new File(location);
+ if (!root.exists())
+ {
+ throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
+ }
+ else
+ {
+
+ if (root.isFile())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:" +
+ root.getAbsolutePath());
+ }
+
+ if (!root.canWrite())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:" +
+ root.getAbsolutePath());
+ }
+
+ }
+
+ // if we don't mark QUEUE_BAKCING_DIR as a deleteOnExit it will remain.
+ File backingDir = new File(location + File.separator + QUEUE_BACKING_DIR);
+ if (backingDir.exists())
+ {
+ if (!FileUtils.delete(backingDir, true))
+ {
+ throw new ConfigurationException("Unable to delete existing Flow to Disk root at:"
+ + backingDir.getAbsolutePath());
+ }
+
+ if (backingDir.isFile())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk root as specified location is a file:" +
+ backingDir.getAbsolutePath());
+ }
+ }
+
+ backingDir.deleteOnExit();
+ if (!backingDir.mkdirs())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk root:" + location + File.separator + QUEUE_BACKING_DIR);
+ }
+
+
+ File store = new File(_flowToDiskLocation);
+ if (store.exists())
+ {
+ if (!FileUtils.delete(store, true))
+ {
+ throw new ConfigurationException("Unable to delete existing Flow to Disk store at:"
+ + store.getAbsolutePath());
+ }
+
+ if (store.isFile())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:" +
+ store.getAbsolutePath());
+ }
+
+ }
+
+ _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
+ store.deleteOnExit();
+
+ if(!store.mkdir())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
+ }
+ }
+
+ public QueueBackingStore createBacking(AMQQueue queue)
+ {
+ return new FileQueueBackingStore(createStore(queue.getName().toString()));
+ }
+
+ private String createStore(String name)
+ {
+ return createStore(name, 0);
+ }
+
+ private String createStore(String name, int index)
+ {
+
+ String store = _flowToDiskLocation + File.separator + name;
+ if (index > 0)
+ {
+ store += "-" + index;
+ }
+
+ //TODO ensure store is safe for the OS
+
+ File storeFile = new File(store);
+
+ if (storeFile.exists())
+ {
+ return createStore(name, index + 1);
+ }
+
+ storeFile.mkdirs();
+
+ storeFile.deleteOnExit();
+
+ return store;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
index 72ea5f2667..a4f80a44b4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -20,16 +20,18 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.log4j.Logger;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
-/**
- * This is an abstract base class to handle
- */
+/** This is an abstract base class to handle */
public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList
{
private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
@@ -43,9 +45,12 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
/** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
private long _memoryUsageMinimum = 0;
- private AtomicBoolean _flowed;
+ private volatile AtomicBoolean _flowed;
private QueueBackingStore _backingStore;
protected AMQQueue _queue;
+ private Executor _inhaler;
+ private AtomicBoolean _stopped;
+ private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
FlowableBaseQueueEntryList(AMQQueue queue)
{
@@ -54,15 +59,18 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
VirtualHost vhost = queue.getVirtualHost();
if (vhost != null)
{
- _backingStore = vhost.getQueueBackingStore();
+ _backingStore = vhost.getQueueBackingStoreFactory().createBacking(queue);
}
+
+ _stopped = new AtomicBoolean(false);
+ _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
}
public void setFlowed(boolean flowed)
{
if (_flowed.get() != flowed)
{
- _log.info("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
+ _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
_flowed.set(flowed);
}
}
@@ -94,14 +102,15 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
// Don't attempt to start the inhaler/purger unless we have a minimum value specified.
if (_memoryUsageMaximum > 0)
{
- // If we've increased the max memory above what we have in memory then we can inhale more
- if (_memoryUsageMaximum > _atomicQueueInMemory.get())
+ if (_memoryUsageMinimum == 0)
{
- //TODO start inhaler
+ setMemoryUsageMinimum(_memoryUsageMaximum / 2);
}
- else // if we have now have to much memory in use we need to purge.
+
+ // if we have now have to much memory in use we need to purge.
+ if (_memoryUsageMaximum < _atomicQueueInMemory.get())
{
- //TODO start purger
+ startPurger();
}
}
}
@@ -118,19 +127,78 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
// Don't attempt to start the inhaler unless we have a minimum value specified.
if (_memoryUsageMinimum > 0)
{
- // If we've increased the minimum memory above what we have in memory then we need to inhale more
- if (_memoryUsageMinimum >= _atomicQueueInMemory.get())
+ checkAndStartLoader();
+ }
+ }
+
+ private void checkAndStartLoader()
+ {
+ // If we've increased the minimum memory above what we have in memory then we need to inhale more
+ if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+ {
+ startInhaler();
+ }
+ }
+
+ private void startInhaler()
+ {
+ if (_flowed.get())
+ {
+ MessageInhaler inhaler = new MessageInhaler();
+
+ if (_asynchronousInhaler.compareAndSet(null, inhaler))
{
- //TODO start inhaler
+ _inhaler.execute(inhaler);
}
}
}
+ private void startPurger()
+ {
+ //TODO create purger, used when maxMemory is reduced creating over memory situation.
+ _log.warn("Requested Purger Start.. purger TBC.");
+ //_purger.execute(new MessagePurger(this));
+ }
+
public long getMemoryUsageMinimum()
{
return _memoryUsageMinimum;
}
+ public void unloadEntry(QueueEntry queueEntry)
+ {
+ try
+ {
+ queueEntry.unload();
+ _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
+ checkAndStartLoader();
+ }
+ catch (UnableToFlowMessageException e)
+ {
+ _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ }
+ }
+
+ public void loadEntry(QueueEntry queueEntry)
+ {
+ queueEntry.load();
+ if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ {
+ _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum);
+ }
+ }
+
+ public void stop()
+ {
+ if (!_stopped.getAndSet(true))
+ {
+ // The SimpleAMQQueue keeps running when stopped so we should just release the services
+ // rather than actively shutdown our threads.
+ //Shutdown thread for inhaler.
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ }
+ }
+
protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry)
{
return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum;
@@ -153,13 +221,14 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
/**
* Called when we are now flowing to disk
+ *
* @param queueEntry the entry that is being flowed to disk
*/
protected void flowingToDisk(QueueEntryImpl queueEntry)
{
try
{
- queueEntry.flow();
+ queueEntry.unload();
}
catch (UnableToFlowMessageException e)
{
@@ -182,4 +251,71 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
return _backingStore;
}
+ private class MessageInhaler implements Runnable
+ {
+ public void run()
+ {
+ String threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("Inhaler-" + _queue.getVirtualHost().getName() + "-" + _queue.getName());
+ try
+ {
+ inhaleList(this);
+ }
+ finally
+ {
+ Thread.currentThread().setName(threadName);
+ }
+ }
+ }
+
+ private void inhaleList(MessageInhaler messageInhaler)
+ {
+ _log.info("Inhaler Running");
+ // If in memory count is at or over max then we can't inhale
+ if (_atomicQueueInMemory.get() >= _memoryUsageMaximum)
+ {
+ _log.debug("Unable to start inhaling as we are already over quota:" +
+ _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+ return;
+ }
+
+ _asynchronousInhaler.compareAndSet(messageInhaler, null);
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && _asynchronousInhaler.compareAndSet(null, messageInhaler))
+ {
+ QueueEntryIterator iterator = iterator();
+
+ while (!iterator.getNode().isAvailable() && iterator.advance())
+ {
+ //Find first AVAILABLE node
+ }
+
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail())
+ {
+ QueueEntry entry = iterator.getNode();
+
+ if (entry.isAvailable() && entry.isFlowed())
+ {
+ loadEntry(entry);
+ }
+
+ iterator.advance();
+ }
+
+ if (iterator.atTail())
+ {
+ setFlowed(false);
+ }
+
+ _asynchronousInhaler.set(null);
+ }
+
+ //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
+ if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+ {
+ _inhaler.execute(messageInhaler);
+
+ }
+
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
index 4e95978bf8..b547a41047 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.queue;
-public interface FlowableQueueEntryList
+import java.util.concurrent.atomic.AtomicLong;
+
+public interface FlowableQueueEntryList extends QueueEntryList
{
void setFlowed(boolean flowed);
@@ -38,5 +40,19 @@ public interface FlowableQueueEntryList
void setMemoryUsageMinimum(long minimumMemoryUsage);
- long getMemoryUsageMinimum();
+ long getMemoryUsageMinimum();
+
+ /**
+ * Immediately unload Entry
+ * @param queueEntry the entry to unload
+ */
+ public void unloadEntry(QueueEntry queueEntry);
+
+ /**
+ * Immediately load Entry
+ * @param queueEntry the entry to load
+ */
+ public void loadEntry(QueueEntry queueEntry);
+
+ void stop();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
index d812b8ceca..23307d8acf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
@@ -22,10 +22,10 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.CommonContentHeaderProperties;
-public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
+public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
{
private final AMQQueue _queue;
- private final QueueEntryList[] _priorityLists;
+ private final FlowableQueueEntryList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
@@ -33,7 +33,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
{
super(queue);
_queue = queue;
- _priorityLists = new QueueEntryList[priorities];
+ _priorityLists = new FlowableQueueEntryList[priorities];
_priorities = priorities;
_priorityOffset = 5-((priorities + 1)/2);
for(int i = 0; i < priorities; i++)
@@ -53,7 +53,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
}
public QueueEntry add(AMQMessage message)
- {
+ {
int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
if(index >= _priorities)
{
@@ -152,7 +152,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
_priorities = priorities;
}
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
{
return new PriorityQueueEntryList(queue, _priorities);
}
@@ -162,7 +162,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
public int size()
{
int size=0;
- for (QueueEntryList queueEntryList : _priorityLists)
+ for (FlowableQueueEntryList queueEntryList : _priorityLists)
{
size += queueEntryList.size();
}
@@ -174,9 +174,6 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
@Override
protected void flowingToDisk(QueueEntryImpl queueEntry)
{
- //TODO this disables FTD for priority queues
- // As the incomming message isn't always the one to purge.
- // More logic is required up in the add() method here to determine if the
- // incomming message is at the 'front' or not.
+ //This class doesn't maintain it's own sizes it delegates to the sub FlowableQueueEntryLists
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
index 376e6f1b57..1f575d1e05 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
@@ -26,11 +26,9 @@ import org.apache.commons.configuration.ConfigurationException;
public interface QueueBackingStore
{
- void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
+ AMQMessage load(Long messageId);
- AMQMessage recover(Long messageId);
-
- void flow(AMQMessage message) throws UnableToFlowMessageException;
+ void unload(AMQMessage message) throws UnableToFlowMessageException;
void delete(Long messageId);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java
new file mode 100644
index 0000000000..3dd23a2f40
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+
+public interface QueueBackingStoreFactory
+{
+ void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
+
+ public QueueBackingStore createBacking(AMQQueue queue);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 25d41c8203..7e41cf53a2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -157,6 +157,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean isAcquired();
+ boolean isAvailable();
+
boolean acquire();
boolean acquire(Subscription sub);
@@ -219,9 +221,9 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean removeStateChangeListener(StateChangeListener listener);
- void flow() throws UnableToFlowMessageException;
+ void unload() throws UnableToFlowMessageException;
- void recover();
+ void load();
boolean isFlowed();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 3d464d01d3..8ee03d3d74 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -181,6 +181,11 @@ public class QueueEntryImpl implements QueueEntry
return _state.getState() == State.ACQUIRED;
}
+ public boolean isAvailable()
+ {
+ return _state.getState() == State.AVAILABLE;
+ }
+
public boolean acquire()
{
return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
@@ -220,7 +225,15 @@ public class QueueEntryImpl implements QueueEntry
public String debugIdentity()
{
- return getMessage().debugIdentity();
+ String entry="[State:"+_state.getState().name()+"]";
+ if (_message == null)
+ {
+ return entry+"(Message Unloaded ID:" + _messageId +")";
+ }
+ else
+ {
+ return entry+_message.debugIdentity();
+ }
}
@@ -380,25 +393,29 @@ public class QueueEntryImpl implements QueueEntry
return false;
}
- public void flow() throws UnableToFlowMessageException
+ public void unload() throws UnableToFlowMessageException
{
if (_message != null && _backingStore != null)
{
if(_log.isDebugEnabled())
{
- _log.debug("Flowing message:" + _message.debugIdentity());
+ _log.debug("Unloading:" + debugIdentity());
}
- _backingStore.flow(_message);
+ _backingStore.unload(_message);
_message = null;
- _flowed.getAndSet(true);
+ _flowed.getAndSet(true);
}
}
- public void recover()
+ public void load()
{
if (_messageId != null && _backingStore != null)
{
- _message = _backingStore.recover(_messageId);
+ _message = _backingStore.load(_messageId);
+ if(_log.isDebugEnabled())
+ {
+ _log.debug("Loading:" + debugIdentity());
+ }
_flowed.getAndSet(false);
}
}
@@ -471,5 +488,4 @@ public class QueueEntryImpl implements QueueEntry
return _queueEntryList;
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 72783e3f78..313e076f61 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryList extends FlowableQueueEntryList
+public interface QueueEntryList
{
AMQQueue getQueue();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
index 4dbce45f67..b4a868cf3c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
@@ -22,5 +22,5 @@ package org.apache.qpid.server.queue;
interface QueueEntryListFactory
{
- public QueueEntryList createQueueEntryList(AMQQueue queue);
+ public FlowableQueueEntryList createQueueEntryList(AMQQueue queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 7f46a6063a..fa67162228 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -79,7 +79,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private volatile Subscription _exclusiveSubscriber;
- protected final QueueEntryList _entries;
+ protected final FlowableQueueEntryList _entries;
private final AMQQueueMBean _managedObject;
private final Executor _asyncDelivery;
@@ -465,8 +465,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
_deliveredMessages.incrementAndGet();
+
+ if (entry.isFlowed())
+ {
+ _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity());
+ _entries.loadEntry(entry);
+ }
+
sub.send(entry);
+ // We have delivered this message so we can unload it.
+ if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer())
+ {
+ _entries.unloadEntry(entry);
+ }
+
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
@@ -1101,6 +1114,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (!_stopped.getAndSet(true))
{
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ _entries.stop();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 10abdd8318..c72353db6e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
* under the License.
*
*/
-public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
+public class SimpleQueueEntryList extends FlowableBaseQueueEntryList
{
private final QueueEntryImpl _head;
@@ -172,7 +172,7 @@ public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements
static class Factory implements QueueEntryListFactory
{
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
{
return new SimpleQueueEntryList(queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index c5b6eeca3e..db05c7b299 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -49,7 +49,8 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.QueueBackingStore;
-import org.apache.qpid.server.queue.FileQueueBackingStore;
+import org.apache.qpid.server.queue.FileQueueBackingStoreFactory;
+import org.apache.qpid.server.queue.QueueBackingStoreFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.security.access.ACLManager;
@@ -88,7 +89,7 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private VirtualHostConfiguration _configuration;
- private QueueBackingStore _queueBackingStore;
+ private QueueBackingStoreFactory _queueBackingStoreFactory;
public void setAccessableName(String name)
{
@@ -116,9 +117,9 @@ public class VirtualHost implements Accessable
return _configuration ;
}
- public QueueBackingStore getQueueBackingStore()
+ public QueueBackingStoreFactory getQueueBackingStoreFactory()
{
- return _queueBackingStore;
+ return _queueBackingStoreFactory;
}
/**
@@ -194,8 +195,8 @@ public class VirtualHost implements Accessable
initialiseRoutingTable(hostConfig);
}
- _queueBackingStore = new FileQueueBackingStore();
- _queueBackingStore.configure(this,hostConfig);
+ _queueBackingStoreFactory = new FileQueueBackingStoreFactory();
+ _queueBackingStoreFactory.configure(this, hostConfig);
_exchangeFactory.initialise(hostConfig);
_exchangeRegistry.initialise();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6021f100f5..4716f6691a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -240,6 +240,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isAvailable()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean acquire()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -346,12 +351,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void flow() throws UnableToFlowMessageException
+ public void unload() throws UnableToFlowMessageException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void recover()
+ public void load()
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index a11e60d7de..f73bafd3b4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import java.util.ArrayList;
@@ -64,7 +63,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- ArrayList<QueueEntry> msgs = _subscription.getMessages();
+ ArrayList<QueueEntry> msgs = _subscription.getQueueEntries();
try
{
assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
index bb2a5f3d3b..d2cbd46e28 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -42,22 +42,26 @@ import java.io.File;
public class FileQueueBackingStoreTest extends TestCase
{
- FileQueueBackingStore _backing;
+ QueueBackingStore _backing;
private TransactionLog _transactionLog;
VirtualHost _vhost;
- VirtualHostConfiguration _vhostConfig;
+ VirtualHostConfiguration _vhostConfig;
+ FileQueueBackingStoreFactory _factory;
+ AMQQueue _queue;
public void setUp() throws Exception
{
- _backing = new FileQueueBackingStore();
+ _factory = new FileQueueBackingStoreFactory();
PropertiesConfiguration config = new PropertiesConfiguration();
config.addProperty("store.class", MemoryMessageStore.class.getName());
_vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", config);
_vhost = new VirtualHost(_vhostConfig);
_transactionLog = _vhost.getTransactionLog();
- _backing.configure(_vhost, _vhost.getConfiguration());
+ _factory.configure(_vhost, _vhost.getConfiguration());
+ _queue = new SimpleAMQQueue(new AMQShortString(this.getName()), false, null, false, _vhost);
+ _backing = _factory.createBacking(_queue);
}
private void resetBacking(Configuration configuration) throws Exception
@@ -67,9 +71,11 @@ public class FileQueueBackingStoreTest extends TestCase
_vhost = new VirtualHost(_vhostConfig);
_transactionLog = _vhost.getTransactionLog();
- _backing = new FileQueueBackingStore();
+ _factory = new FileQueueBackingStoreFactory();
+
+ _factory.configure(_vhost, _vhost.getConfiguration());
- _backing.configure(_vhost, _vhost.getConfiguration());
+ _backing = _factory.createBacking(_queue);
}
public void testInvalidSetupRootExistsIsFile() throws Exception
@@ -171,18 +177,18 @@ public class FileQueueBackingStoreTest extends TestCase
chb);
if (chb.bodySize > 0)
{
- ContentChunk chunk = new MockContentChunk((int) chb.bodySize/2);
+ ContentChunk chunk = new MockContentChunk((int) chb.bodySize / 2);
original.addContentBodyFrame(null, chunk, false);
- chunk = new MockContentChunk((int) chb.bodySize/2);
+ chunk = new MockContentChunk((int) chb.bodySize / 2);
- original.addContentBodyFrame(null, chunk, true);
+ original.addContentBodyFrame(null, chunk, true);
}
- _backing.flow(original);
+ _backing.unload(original);
- AMQMessage fromDisk = _backing.recover(original.getMessageId());
+ AMQMessage fromDisk = _backing.load(original.getMessageId());
assertEquals("Message IDs do not match", original.getMessageId(), fromDisk.getMessageId());
assertEquals("Message arrival times do not match", original.getArrivalTime(), fromDisk.getArrivalTime());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 9a5f7f20c6..0e2b17914c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.queue;
*
*/
+import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -29,6 +30,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -58,7 +60,7 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
- private static final long MESSAGE_SIZE = 100;
+ private static long MESSAGE_SIZE = 100;
@Override
protected void setUp() throws Exception
@@ -68,7 +70,7 @@ public class SimpleAMQQueueTest extends TestCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _transactionLog);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -362,56 +364,69 @@ public class SimpleAMQQueueTest extends TestCase
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = 500;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
//Set the Memory Usage to be very low
- _queue.setMemoryUsageMaximum(10);
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
- for (int msgCount = 0; msgCount < 10; msgCount++)
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
{
sendMessage(txnContext);
}
//Check that we can hold 10 messages without flowing
- assertEquals(10, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
assertTrue("Queue is flowed.", !_queue.isFlowed());
// Send anothe and ensure we are flowed
sendMessage(txnContext);
- assertEquals(11, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
- //send another 9 so there are 20msgs in total on the queue
- for (int msgCount = 0; msgCount < 9; msgCount++)
+ //send another 99 so there are 200msgs in total on the queue
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
{
sendMessage(txnContext);
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage,usage > 0);
+
}
- assertEquals(20, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
_queue.registerSubscription(_subscription, false);
- Thread.sleep(200);
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ {
+ Thread.sleep(500);
+ slept++;
+ }
//Ensure the messages are retreived
- assertEquals("Not all messages were received.", 20, _subscription.getMessages().size());
+ assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
- //Ensure we got the content
- for (int index = 0; index < 10; index++)
- {
- QueueEntry entry = _subscription.getMessages().get(index);
- assertNotNull("Message:" + index + " was null.", entry.getMessage());
- assertTrue(!entry.isFlowed());
- }
+ //Check the queue is still within it's limits.
+ assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+ _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
- //ensure we were received 10 flowed messages
- for (int index = 10; index < 20; index++)
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
{
- QueueEntry entry = _subscription.getMessages().get(index);
- assertNull("Message:" + index + " was not null.", entry.getMessage());
- assertTrue(entry.isFlowed());
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
}
+
}
private void sendMessage(TransactionalContext txnContext) throws AMQException
@@ -419,7 +434,8 @@ public class SimpleAMQQueueTest extends TestCase
IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = 1;
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 832df80004..0c33b406e6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -46,7 +46,10 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
- assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+ //This is +2 because:
+ // 1 - asyncDelivery Thread
+ // 2 - queueInhalerThread
+ assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount());
queue.stop();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 33fd669d5c..ab0870144b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -30,10 +30,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import org.apache.log4j.Logger;
public class MockSubscription implements Subscription
{
+ private static final Logger _logger = Logger.getLogger(MockSubscription.class);
private boolean _closed = false;
private AMQShortString tag = new AMQShortString("mocktag");
@@ -41,8 +44,12 @@ public class MockSubscription implements Subscription
private StateListener _listener = null;
private QueueEntry lastSeen = null;
private State _state = State.ACTIVE;
- private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
+ private ArrayList<QueueEntry> _queueEntries = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private ArrayList<AMQMessage> _messages = new ArrayList<AMQMessage>();
+
+
+
public void close()
{
@@ -136,10 +143,14 @@ public class MockSubscription implements Subscription
{
}
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry) throws AMQException
{
- lastSeen = msg;
- messages.add(msg);
+ _logger.info("Sending Message(" + entry.debugIdentity() + ") to subscription:" + this);
+
+ lastSeen = entry;
+ _queueEntries.add(entry);
+ _messages.add(entry.getMessage());
+ entry.setDeliveredToSubscription();
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
@@ -173,8 +184,14 @@ public class MockSubscription implements Subscription
return false;
}
- public ArrayList<QueueEntry> getMessages()
+ public ArrayList<QueueEntry> getQueueEntries()
{
- return messages;
+ return _queueEntries;
}
+
+ public ArrayList<AMQMessage> getMessages()
+ {
+ return _messages;
+ }
+
}