diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-11 15:22:03 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-11 15:22:03 +0000 |
| commit | 531f130c20f73c08ffc963c1e15c7b98cea05d8f (patch) | |
| tree | d30c93bde8eb6515c076568102a008702c98567a /java/systests/src | |
| parent | 1a7004fd9dab8306ec31b96d9d5e2d5a44256d98 (diff) | |
| download | qpid-python-531f130c20f73c08ffc963c1e15c7b98cea05d8f.tar.gz | |
Updates on the refactoring work
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
11 files changed, 554 insertions, 527 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 144e4be6af..4e39367da1 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -26,13 +26,16 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.QueueEntryImpl; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -100,12 +103,16 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); - Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws AMQException + Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), _storeContext, null, new LinkedList<RequiredDeliveryException>() ); + AMQQueue queue = + AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("", new MemoryMessageStore()), + null); + for (int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; @@ -140,7 +147,7 @@ public class TxAckTest extends TestCase }; TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); - _map.add(deliveryTag, new QueueEntryImpl(null,message, Long.MIN_VALUE)); + _map.add(deliveryTag, queue.enqueue(new StoreContext(), message)); } _acked = acked; _unacked = unacked; diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 80470d44b3..74d7b39b72 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; @@ -236,7 +237,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return properties; } - static class TestQueue extends AMQQueueImpl + static class TestQueue extends SimpleAMQQueue { final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); @@ -250,12 +251,160 @@ public class AbstractHeadersExchangeTestBase extends TestCase * not invoked. It is unnecessary since for this test we only care to know whether the message was * sent to the queue; the queue processing logic is not being tested. * @param msg - * @param deliverFirst * @throws AMQException */ - public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException + @Override + public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException { - messages.add( new HeadersExchangeTest.Message(msg.getMessage())); + messages.add( new HeadersExchangeTest.Message(msg)); + return new QueueEntry() + { + + public AMQQueue getQueue() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public AMQMessage getMessage() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getSize() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean getDeliveredToConsumer() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean expired() throws AMQException + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isAcquired() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean acquire() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean acquire(Subscription sub) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean delete() + { + return false; + } + + public boolean isDeleted() + { + return false; + } + + public boolean acquiredBySubscription() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setDeliveredToSubscription() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void release() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public String debugIdentity() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean immediateAndNotDelivered() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setRedelivered(boolean b) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Subscription getDeliveredSubscription() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void reject() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void reject(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRejectedBy(Subscription subscription) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void requeue(StoreContext storeContext) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dequeue(final StoreContext storeContext) throws FailedDequeueException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dispose(final StoreContext storeContext) throws MessageCleanupException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void restoreCredit() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void discard(StoreContext storeContext) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isQueueDeleted() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void addStateChangeListener(StateChangeListener listener) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean removeStateChangeListener(StateChangeListener listener) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public int compareTo(final QueueEntry o) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + }; } boolean isInQueue(Message msg) diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index 6f8963131b..8e7038eec3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -59,7 +59,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase false, new AMQShortString("test"), true, - _protocolSession.getVirtualHost()); + _protocolSession.getVirtualHost(), null); AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 30240217a2..dae81a875d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -78,7 +78,8 @@ public class AckTest extends TestCase _protocolSession.addChannel(_channel); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test")); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), + null); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java deleted file mode 100644 index 3ff691c792..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.subscription.Subscription; - -import java.util.*; -import java.util.concurrent.Executor; - -/** - * Tests delivery in the face of concurrent incoming _messages, subscription alterations - * and attempts to asynchronously process queued _messages. - */ -public class ConcurrencyTestDisabled extends MessageTestHelper -{ - private final Random random = new Random(); - - private final int numMessages = 1000; - - private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>(); - private final Set<Subscription> _active = new HashSet<Subscription>(); - private final List<QueueEntry> _messages = new ArrayList<QueueEntry>(); - private int next = 0;//index to next message to send - private final List<QueueEntry> _received = Collections.synchronizedList(new ArrayList<QueueEntry>()); - private final Executor _executor = new OnCurrentThreadExecutor(); - private final List<Thread> _threads = new ArrayList<Thread>(); - - private final SubscriptionSet _subscriptionMgr; - private final ConcurrentSelectorDeliveryManager _deliveryMgr; - - private boolean isComplete; - private boolean failed; - private VirtualHost _virtualHost; - - public ConcurrencyTestDisabled() throws Exception - { - - IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); - _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager( new AMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, - _virtualHost)); - _subscriptionMgr = _deliveryMgr.getSubscribers(); - } - - public void testConcurrent1() throws InterruptedException, AMQException - { - initSubscriptions(10); - initMessages(numMessages); - initThreads(1, 4, 4, 4); - doRun(); - check(); - } - - public void testConcurrent2() throws InterruptedException, AMQException - { - initSubscriptions(10); - initMessages(numMessages); - initThreads(4, 2, 2, 2); - doRun(); - check(); - } - - void check() - { - assertFalse("Failed", failed); - - _deliveryMgr.processAsync(_executor); - - assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size()); - for(int i = 0; i < _messages.size(); i++) - { - assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i)); - } - } - - void initSubscriptions(int subscriptions) - { - for(int i = 0; i < subscriptions; i++) - { - _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received)); - } - } - - void initMessages(int messages) throws AMQException - { - for(int i = 0; i < messages; i++) - { - _messages.add(message()); - } - } - - void initThreads(int senders, int subscribers, int suspenders, int processors) - { - addThreads(senders, senders == 1 ? new Sender() : new OrderedSender()); - addThreads(subscribers, new Subscriber()); - addThreads(suspenders, new Suspender()); - addThreads(processors, new Processor()); - } - - void addThreads(int count, Runnable runner) - { - for(int i = 0; i < count; i++) - { - _threads.add(new Thread(runner, runner.toString())); - } - } - - void doRun() throws InterruptedException - { - for(Thread t : _threads) - { - t.start(); - } - - for(Thread t : _threads) - { - t.join(); - } - } - - private void toggle(Subscription s) - { - synchronized (_active) - { - if (_active.contains(s)) - { - _active.remove(s); - Subscription result = _subscriptionMgr.removeSubscriber(s); - assertTrue("Removed subscription " + result + " but trying to remove subscription " + s, - result != null && result.equals(s)); - } - else - { - _active.add(s); - _subscriptionMgr.addSubscriber(s); - } - } - } - - private QueueEntry nextMessage() - { - synchronized (_messages) - { - if (next < _messages.size()) - { - return _messages.get(next++); - } - else - { - if (!_deliveryMgr.hasQueuedMessages()) { - isComplete = true; - } - return null; - } - } - } - - private boolean randomBoolean() - { - return random.nextBoolean(); - } - - private SubscriptionTestHelper randomSubscriber() - { - return _subscribers.get(random.nextInt(_subscribers.size())); - } - - private class Sender extends Runner - { - void doRun() throws Throwable - { - QueueEntry msg = nextMessage(); - if (msg != null) - { - _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); - } - } - } - - private class OrderedSender extends Sender - { - synchronized void doRun() throws Throwable - { - super.doRun(); - } - } - - private class Suspender extends Runner - { - void doRun() throws Throwable - { - randomSubscriber().setSuspended(randomBoolean()); - } - } - - private class Subscriber extends Runner - { - void doRun() throws Throwable - { - toggle(randomSubscriber()); - } - } - - private class Processor extends Runner - { - void doRun() throws Throwable - { - _deliveryMgr.processAsync(_executor); - } - } - - private abstract class Runner implements Runnable - { - public void run() - { - try - { - while (!stop()) - { - doRun(); - } - } - catch (Throwable t) - { - failed = true; - t.printStackTrace(); - } - } - - abstract void doRun() throws Throwable; - - boolean stop() - { - return isComplete || failed; - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(ConcurrencyTestDisabled.class); - } - -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java new file mode 100644 index 0000000000..3d53b87b45 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -0,0 +1,171 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +import javax.jms.*; +import javax.naming.NamingException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; + +/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ +public class PriorityTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(PriorityTest.class); + + + protected final String BROKER = "vm://:1"; + protected final String VHOST = "/test"; + protected final String QUEUE = "PriorityQueue"; + + + private static final int MSG_COUNT = 50; + + protected void setUp() throws Exception + { + super.setUp(); + + if (usingInVMBroker()) + { + TransportConnection.createVMBroker(1); + } + + + } + + private boolean usingInVMBroker() + { + return BROKER.startsWith("vm://"); + } + + protected void tearDown() throws Exception + { + if (usingInVMBroker()) + { + TransportConnection.killAllVMBrokers(); + } + super.tearDown(); + } + + public void testPriority() throws JMSException, NamingException, AMQException + { + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + Context context = factory.getInitialContext(env); + + Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final FieldTable arguments = new FieldTable(); + arguments.put(new AMQShortString("x-qpid-priorities"),10); + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + + Queue queue = (Queue) context.lookup("queue"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + + + + + + + producerConnection.start(); + + + MessageProducer producer = producerSession.createProducer(queue); + + + + + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.setPriority(msg % 10); + producer.send(nextMessage(msg, false, producerSession, producer)); + } + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + + + + consumerConnection.start(); + + Message received; + //Receive Message 0 + StringBuilder buf = new StringBuilder(); + int receivedCount = 0; + Message previous = null; + while((received = consumer.receive(1000))!=null) + { + if(previous != null) + { + assertTrue("Messages arrived in unexpected order", (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) ); + } + + previous = received; + receivedCount++; + } + + assertEquals("Incorrect number of message received", 50, receivedCount); + + producerSession.close(); + producer.close(); + + } + + private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + send.setIntProperty("msg", msg); + + return send; + } + + +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java deleted file mode 100644 index 1200d7fc2b..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; -import org.apache.qpid.server.subscription.Subscription; - -public class SubscriptionManagerTest extends TestCase -{ - private final SubscriptionSet mgr = new SubscriptionSet(); - - public void testBasicSubscriptionManagement() - { - assertTrue(mgr.isEmpty()); - assertFalse(mgr.hasActiveSubscribers()); - SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1"); - mgr.addSubscriber(s1); - assertFalse(mgr.isEmpty()); - assertTrue(mgr.hasActiveSubscribers()); - - SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2"); - mgr.addSubscriber(s2); - - s2.setSuspended(true); - assertFalse(mgr.isEmpty()); - assertTrue(mgr.hasActiveSubscribers()); - assertTrue(s2.isSuspended()); - assertFalse(s1.isSuspended()); - - s1.setSuspended(true); - assertFalse(mgr.hasActiveSubscribers()); - - mgr.removeSubscriber(s1); - assertFalse(mgr.isEmpty()); - mgr.removeSubscriber(s2); - assertTrue(mgr.isEmpty()); - } - - public void testRoundRobin() - { - SubscriptionTestHelper a = new SubscriptionTestHelper("A"); - SubscriptionTestHelper b = new SubscriptionTestHelper("B"); - SubscriptionTestHelper c = new SubscriptionTestHelper("C"); - SubscriptionTestHelper d = new SubscriptionTestHelper("D"); - mgr.addSubscriber(a); - mgr.addSubscriber(b); - mgr.addSubscriber(c); - mgr.addSubscriber(d); - - for (int i = 0; i < 3; i++) - { - assertEquals(a, mgr.nextSubscriber(null)); - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(c, mgr.nextSubscriber(null)); - assertEquals(d, mgr.nextSubscriber(null)); - } - - c.setSuspended(true); - - for (int i = 0; i < 3; i++) - { - assertEquals(a, mgr.nextSubscriber(null)); - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(d, mgr.nextSubscriber(null)); - } - - mgr.removeSubscriber(a); - d.setSuspended(true); - c.setSuspended(false); - Subscription e = new SubscriptionTestHelper("D"); - mgr.addSubscriber(e); - - for (int i = 0; i < 3; i++) - { - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(c, mgr.nextSubscriber(null)); - assertEquals(e, mgr.nextSubscriber(null)); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(SubscriptionManagerTest.class); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java deleted file mode 100644 index bcf54693d3..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; - -public class SubscriptionSetTest extends TestCase -{ - /** - * A SubscriptionSet that counts the number of items scanned. - */ - static class TestSubscriptionSet extends SubscriptionSet - { - private int scanned = 0; - - void resetScanned() - { - scanned = 0; - } - - protected void subscriberScanned() - { - ++scanned; - } - - int getScanned() - { - return scanned; - } - } - - final SubscriptionTestHelper sub1 = new SubscriptionTestHelper("1"); - final SubscriptionTestHelper sub2 = new SubscriptionTestHelper("2"); - final SubscriptionTestHelper sub3 = new SubscriptionTestHelper("3"); - - final SubscriptionTestHelper suspendedSub1 = new SubscriptionTestHelper("sus1", true); - final SubscriptionTestHelper suspendedSub2 = new SubscriptionTestHelper("sus2", true); - final SubscriptionTestHelper suspendedSub3 = new SubscriptionTestHelper("sus3", true); - - public void testNextMessage() - { - SubscriptionSet ss = new SubscriptionSet(); - assertNull(ss.nextSubscriber(null)); - assertEquals(0, ss.getCurrentSubscriber()); - - ss.addSubscriber(sub1); - assertEquals(sub1, ss.nextSubscriber(null)); - assertEquals(1, ss.getCurrentSubscriber()); - assertEquals(sub1, ss.nextSubscriber(null)); - assertEquals(1, ss.getCurrentSubscriber()); - - ss.addSubscriber(sub2); - ss.addSubscriber(sub3); - - assertEquals(sub2, ss.nextSubscriber(null)); - assertEquals(2, ss.getCurrentSubscriber()); - - assertEquals(sub3, ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - } - - public void testNextMessageWhenAllSuspended() - { - SubscriptionSet ss = createAllSuspendedSubscriptionSet(); - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - } - - private TestSubscriptionSet createAllSuspendedSubscriptionSet() - { - TestSubscriptionSet ss = new TestSubscriptionSet(); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(suspendedSub2); - ss.addSubscriber(suspendedSub3); - return ss; - } - - public void testNextMessageAfterRemove() - { - SubscriptionSet ss = new SubscriptionSet(); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(suspendedSub2); - ss.addSubscriber(sub3); - assertEquals(sub3, ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - - assertEquals(suspendedSub1, ss.removeSubscriber(suspendedSub1)); - - assertEquals(sub3, ss.nextSubscriber(null)); // Current implementation handles OutOfBoundsException here. - assertEquals(2, ss.getCurrentSubscriber()); - } - - public void testNextMessageOverScanning() - { - TestSubscriptionSet ss = new TestSubscriptionSet(); - SubscriptionTestHelper sub = new SubscriptionTestHelper("test"); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(sub); - ss.addSubscriber(suspendedSub3); - assertEquals(sub, ss.nextSubscriber(null)); - assertEquals(2, ss.getCurrentSubscriber()); - assertEquals(2, ss.getScanned()); - - ss.resetScanned(); - sub.setSuspended(true); - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - // Current implementation overscans by one item here. - assertEquals(ss.size() + 1, ss.getScanned()); - } - - public void testNextMessageOverscanWorstCase() { - TestSubscriptionSet ss = createAllSuspendedSubscriptionSet(); - ss.nextSubscriber(null); - // Scans the subscriptions twice. - assertEquals(ss.size() * 2, ss.getScanned()); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(SubscriptionSetTest.class); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 624a368b1f..7fd46474ab 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -106,12 +106,12 @@ public class SubscriptionTestHelper implements Subscription //To change body of implemented methods use File | Settings | File Templates. } - public Object getQueueContext() + public QueueEntry getLastSeenEntry() { return null; //To change body of implemented methods use File | Settings | File Templates. } - public boolean setQueueContext(Object expected, Object newValue) + public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) { return false; //To change body of implemented methods use File | Settings | File Templates. } @@ -126,7 +126,7 @@ public class SubscriptionTestHelper implements Subscription //no-op } - public AMQShortString getConumerTag() + public AMQShortString getConsumerTag() { return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -141,6 +141,11 @@ public class SubscriptionTestHelper implements Subscription return null; } + public QueueEntry.SubscriptionAcquiredState getOwningState() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void queueDeleted(AMQQueue queue) { } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 57bc173812..dea52fea5a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -25,7 +25,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.AMQQueueImpl; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java new file mode 100644 index 0000000000..347217f732 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java @@ -0,0 +1,209 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.test.client; + +import org.apache.qpid.test.VMTestCase; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import javax.jms.*; +import javax.naming.NamingException; +import java.util.Enumeration; + +public class FlowControlTest extends VMTestCase +{ + private static final Logger _logger = Logger.getLogger(FlowControlTest.class); + + private Connection _clientConnection; + private Session _clientSession; + private Queue _queue; + + public void setUp() throws Exception + { + + super.setUp(); + + + } + + /** + * Simply + */ + public void testBasicBytesFlowControl() throws JMSException, NamingException, AMQException + { + _queue = new AMQQueue("amq.direct","testqueue");//(Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + BytesMessage m1 = producerSession.createBytesMessage(); + m1.writeBytes(new byte[128]); + m1.setIntProperty("msg",1); + producer.send(m1); + BytesMessage m2 = producerSession.createBytesMessage(); + m2.writeBytes(new byte[128]); + m2.setIntProperty("msg",2); + producer.send(m2); + BytesMessage m3 = producerSession.createBytesMessage(); + m3.writeBytes(new byte[256]); + m3.setIntProperty("msg",3); + producer.send(m3); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession)consumerSession).setPrefecthLimits(0,256); + MessageConsumer recv = consumerSession.createConsumer(_queue); + consumerConnection.start(); + + Message r1 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); + + Message r2 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); + + Message r3 = recv.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r1.acknowledge(); + + r3 = recv.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r2.acknowledge(); + + + r3 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("Third message not received", r3); + assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); + + r3.acknowledge(); + recv.close(); + consumerSession.close(); + consumerConnection.close(); + + } + + public void testTwoConsumersBytesFlowControl() throws JMSException, NamingException, AMQException + { + _queue = new AMQQueue("amq.direct","testqueue1");//(Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + BytesMessage m1 = producerSession.createBytesMessage(); + m1.writeBytes(new byte[128]); + m1.setIntProperty("msg",1); + producer.send(m1); + BytesMessage m2 = producerSession.createBytesMessage(); + m2.writeBytes(new byte[256]); + m2.setIntProperty("msg",2); + producer.send(m2); + BytesMessage m3 = producerSession.createBytesMessage(); + m3.writeBytes(new byte[128]); + m3.setIntProperty("msg",3); + producer.send(m3); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession)consumerSession1).setPrefecthLimits(0,256); + MessageConsumer recv1 = consumerSession1.createConsumer(_queue); + + consumerConnection.start(); + + Message r1 = recv1.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); + + + Message r2 = recv1.receiveNoWait(); + assertNull("Second message incorrectly delivered", r2); + + Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession)consumerSession2).setPrefecthLimits(0,256); + MessageConsumer recv2 = consumerSession2.createConsumer(_queue); + + + r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); + + Message r3 = recv2.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT); + assertNotNull("Third message not received", r3); + assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); + + + + r2.acknowledge(); + r3.acknowledge(); + recv1.close(); + recv2.close(); + consumerSession1.close(); + consumerSession2.close(); + consumerConnection.close(); + + } + +} |
