diff options
Diffstat (limited to 'qpid/java')
2 files changed, 174 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java index 21073c22ae..b53d5a99ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java @@ -138,16 +138,39 @@ public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory return createStore(name, 0); } + /** + * Returns a hash code for non-null Object x. + * Uses the same hash code spreader as most other java.util hash tables. + * + * Borrowed from the Apache Harmony project + * @param x the object serving as a key + * @return the hash code + */ + public static int hash(Object x) { + int h = x.hashCode(); + h += ~(h << 9); + h ^= (h >>> 14); + h += (h << 4); + h ^= (h >>> 10); + return h; + } + private String createStore(String name, int index) { - String store = _flowToDiskLocation + File.separator + name; + int hash = hash(name); + + long bin = hash & 0xFFL; + + String store = _flowToDiskLocation + File.separator + bin + File.separator + name; + if (index > 0) { store += "-" + index; } - //TODO ensure store is safe for the OS + //TODO ensure name is safe for the OS i.e. on OSX you can't have any ':' + // Does java take care of this? File storeFile = new File(store); @@ -156,7 +179,12 @@ public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory return createStore(name, index + 1); } - storeFile.mkdirs(); + // Ensure we report an error if we cannot create the backing store. + if (!storeFile.mkdirs()) + { + _log.error("Unable to create queue backing directory for queue:" + name); + throw new RuntimeException("Unable to create queue backing directory for queue:" + name); + } storeFile.deleteOnExit(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/Create32kQueueWithoutFailure.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/Create32kQueueWithoutFailure.java new file mode 100644 index 0000000000..92f209d2d9 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/Create32kQueueWithoutFailure.java @@ -0,0 +1,143 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.failure; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.FileQueueBackingStoreFactory; +import org.apache.qpid.test.utils.QpidTestCase; + +import javax.jms.Session; +import java.util.HashMap; +import java.util.Map; +import java.io.File; + +import junit.framework.TestCase; + +/** + * The idea behind this is to test how a broker with flow to disk copes when + * over 31998 queues are created in a single directory. + * + * As the Java broker uses a directory per queue as the queueBacking for FtD + * this test will fail until we do some sort of bin allocation for the queues. + */ +public class Create32kQueueWithoutFailure extends TestCase implements ConnectionListener +{ + + static final int QUEUE_COUNT = 32000; + + public void test() throws Exception + { + AMQConnection connection = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'");//(AMQConnection) getConnectionFactory("default").createConnection("guest", "guest"); + + connection.setConnectionListener(this); + + AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Map<String, Object> arguments = new HashMap<String, Object>(); + + //Ensure we can call createQueue with a priority int value + arguments.put(AMQQueueFactory.QPID_POLICY_TYPE.toString(), AMQQueueFactory.QPID_FLOW_TO_DISK); + // Make a small limit just for show + arguments.put(AMQQueueFactory.QPID_MAX_SIZE.toString(), 1); + + for (int index = 0; index < QUEUE_COUNT; index++) + { //getName() + + System.out.println("Creating:"+index); + session.createQueue(new AMQShortString( "TempQueue-" + index), false, false, false, arguments); + } + + connection.close(); + } + + public void bytesSent(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bytesReceived(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean preFailover(boolean redirect) + { + return false; //Veto Failover + //If we cause a connection failure creating lots of queues + // then we don't want to attempt to resetup the session on a new + // connection. + } + + public boolean preResubscribe() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void failoverComplete() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + /** + * Simple test app that shows the distribution of 32000 'queues' callled + * 'TempQueue-<ID>' where ID is 0-32000 + * + * Using straight Object.hashCode() we get quite an uneven distribution + * but using the hash function from Harmony's ConcurrentHashMap we smooth + * things out. + * + * @param args + */ + public static void main(String[] args) + { + + int[] hit = new int[256]; + String name = "TempQueue-"; + for (int index = 0; index < QUEUE_COUNT; index++) + { + int hash = FileQueueBackingStoreFactory.hash(name + index); + + long bin = hash & 0xFFL; + + File dir = new File(System.getProperty("java.io.tmpdir")+File.separator+bin); + + if (dir.exists()) + { + hit[(int)bin]++; + } + else + { + dir.mkdirs(); + dir.deleteOnExit(); + } + } + + for (int index = 0; index < hit.length; index++) + { + System.out.println("Bin:" + index + " Hit:" + hit[index]); + } + + } + +} |
