summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java34
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/failure/Create32kQueueWithoutFailure.java143
2 files changed, 174 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
index 21073c22ae..b53d5a99ee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
@@ -138,16 +138,39 @@ public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory
return createStore(name, 0);
}
+ /**
+ * Returns a hash code for non-null Object x.
+ * Uses the same hash code spreader as most other java.util hash tables.
+ *
+ * Borrowed from the Apache Harmony project
+ * @param x the object serving as a key
+ * @return the hash code
+ */
+ public static int hash(Object x) {
+ int h = x.hashCode();
+ h += ~(h << 9);
+ h ^= (h >>> 14);
+ h += (h << 4);
+ h ^= (h >>> 10);
+ return h;
+ }
+
private String createStore(String name, int index)
{
- String store = _flowToDiskLocation + File.separator + name;
+ int hash = hash(name);
+
+ long bin = hash & 0xFFL;
+
+ String store = _flowToDiskLocation + File.separator + bin + File.separator + name;
+
if (index > 0)
{
store += "-" + index;
}
- //TODO ensure store is safe for the OS
+ //TODO ensure name is safe for the OS i.e. on OSX you can't have any ':'
+ // Does java take care of this?
File storeFile = new File(store);
@@ -156,7 +179,12 @@ public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory
return createStore(name, index + 1);
}
- storeFile.mkdirs();
+ // Ensure we report an error if we cannot create the backing store.
+ if (!storeFile.mkdirs())
+ {
+ _log.error("Unable to create queue backing directory for queue:" + name);
+ throw new RuntimeException("Unable to create queue backing directory for queue:" + name);
+ }
storeFile.deleteOnExit();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/Create32kQueueWithoutFailure.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/Create32kQueueWithoutFailure.java
new file mode 100644
index 0000000000..92f209d2d9
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/Create32kQueueWithoutFailure.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.failure;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.FileQueueBackingStoreFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+import java.io.File;
+
+import junit.framework.TestCase;
+
+/**
+ * The idea behind this is to test how a broker with flow to disk copes when
+ * over 31998 queues are created in a single directory.
+ *
+ * As the Java broker uses a directory per queue as the queueBacking for FtD
+ * this test will fail until we do some sort of bin allocation for the queues.
+ */
+public class Create32kQueueWithoutFailure extends TestCase implements ConnectionListener
+{
+
+ static final int QUEUE_COUNT = 32000;
+
+ public void test() throws Exception
+ {
+ AMQConnection connection = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'");//(AMQConnection) getConnectionFactory("default").createConnection("guest", "guest");
+
+ connection.setConnectionListener(this);
+
+ AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Map<String, Object> arguments = new HashMap<String, Object>();
+
+ //Ensure we can call createQueue with a priority int value
+ arguments.put(AMQQueueFactory.QPID_POLICY_TYPE.toString(), AMQQueueFactory.QPID_FLOW_TO_DISK);
+ // Make a small limit just for show
+ arguments.put(AMQQueueFactory.QPID_MAX_SIZE.toString(), 1);
+
+ for (int index = 0; index < QUEUE_COUNT; index++)
+ { //getName() +
+ System.out.println("Creating:"+index);
+ session.createQueue(new AMQShortString( "TempQueue-" + index), false, false, false, arguments);
+ }
+
+ connection.close();
+ }
+
+ public void bytesSent(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bytesReceived(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return false; //Veto Failover
+ //If we cause a connection failure creating lots of queues
+ // then we don't want to attempt to resetup the session on a new
+ // connection.
+ }
+
+ public boolean preResubscribe()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void failoverComplete()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * Simple test app that shows the distribution of 32000 'queues' callled
+ * 'TempQueue-<ID>' where ID is 0-32000
+ *
+ * Using straight Object.hashCode() we get quite an uneven distribution
+ * but using the hash function from Harmony's ConcurrentHashMap we smooth
+ * things out.
+ *
+ * @param args
+ */
+ public static void main(String[] args)
+ {
+
+ int[] hit = new int[256];
+ String name = "TempQueue-";
+ for (int index = 0; index < QUEUE_COUNT; index++)
+ {
+ int hash = FileQueueBackingStoreFactory.hash(name + index);
+
+ long bin = hash & 0xFFL;
+
+ File dir = new File(System.getProperty("java.io.tmpdir")+File.separator+bin);
+
+ if (dir.exists())
+ {
+ hit[(int)bin]++;
+ }
+ else
+ {
+ dir.mkdirs();
+ dir.deleteOnExit();
+ }
+ }
+
+ for (int index = 0; index < hit.length; index++)
+ {
+ System.out.println("Bin:" + index + " Hit:" + hit[index]);
+ }
+
+ }
+
+}