diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-09-28 10:41:49 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-09-28 10:41:49 +0000 |
| commit | bcab74d5db0b5111809c9519f552524dcc4daf2c (patch) | |
| tree | 49dc98c21995dc9744ce5da923779f1c3ab700f0 /java | |
| parent | 80f4de5c935c466bffef714e9e598cd8f354ebce (diff) | |
| download | qpid-python-bcab74d5db0b5111809c9519f552524dcc4daf2c.tar.gz | |
Merged revisions 574555-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-580022 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r574555 | ritchiem | 2007-09-11 12:39:10 +0100 (Tue, 11 Sep 2007) | 1 line
QPID-590 : Provide test case and resolution to prevent deadlock occurring on the client when two threads work on the AMQSession object.
........
r574585 | rgreig | 2007-09-11 14:02:19 +0100 (Tue, 11 Sep 2007) | 1 line
QPID-591 Fixed to use dirname and avoid working directory issues.
........
r579115 | ritchiem | 2007-09-25 09:15:04 +0100 (Tue, 25 Sep 2007) | 1 line
QPID-604 Commited patch provided by Aidan Skinner, to prevent NPE in FieldTable.
........
r579147 | ritchiem | 2007-09-25 10:27:22 +0100 (Tue, 25 Sep 2007) | 2 lines
QPID-610 : Fix for Returned Messages leak. Augmented TestableMemoryMessageStore to allow it to query the MemoryMessageStore in use by the broker.
New MessageReturnTest to verify no leak after messages have been returned.
........
r579198 | ritchiem | 2007-09-25 12:13:23 +0100 (Tue, 25 Sep 2007) | 1 line
Update to start M2.1 python test broker on port 2100 (management 2101) and for the tests to use that broker.
MODIFIED: to start on port 2000 and 2001 as this is M2
........
r579229 | ritchiem | 2007-09-25 13:51:09 +0100 (Tue, 25 Sep 2007) | 3 lines
QPID-610 : Fix for Get NO_ACK leak. The Java Client doesn't use get so augmented the python test_get to send persistent messages and used debugger to verify that messages were correctly removed. Verified that prior to this commit they would remain in the store. We need a management exchange to fully validate this with a python tests.
NOTE: The setting of "delivery mode" property on M2.1 is not the same as on trunk where _ is use such as "delivery_mode".
There is also no error that you have sent an incorrect property.
........
r579574 | ritchiem | 2007-09-26 11:42:54 +0100 (Wed, 26 Sep 2007) | 4 lines
QPID-610 : Fix for ManagementConcole NO_ACK & Msg Expire leaks. Updated AMQQueueMBeanTest to verify the contents of the MessageStore after clearing.
All the MC features only dequeued the message and didn't correctly decrementReference.
The MessageReturnTest was failing due to the TimeToLive test causing messages to be left on the store. The expired messages were not having the reference decremented.
........
r579577 | ritchiem | 2007-09-26 11:45:21 +0100 (Wed, 26 Sep 2007) | 5 lines
Updated TransportConnection to synchronize around the creation/destruction of VM Brokers. I had observed a ConcurrentModificationException in the KillAllVMBrokers().
This isn't good this suggests that the tests are overlapping. This fix won't address that problem but will stop any CModifications occuring. If there is test setup/teardown overlapping we should now see tests failing because the VM broker isn't there.
Potentially addresses VM issues in QPID-596
........
r579578 | ritchiem | 2007-09-26 11:48:14 +0100 (Wed, 26 Sep 2007) | 1 line
Updated the version of slf4j-simple to be one that would work. Changing the systests/pom.xml to depend on this rather than the slf4j-log4j will cause maven to provide more details in the tests. All info and above is logged.
........
r579602 | rupertlssmith | 2007-09-26 12:27:45 +0100 (Wed, 26 Sep 2007) | 1 line
Added timeout to perftests, to fail tests if message loss causes test to jam.
........
r579614 | rupertlssmith | 2007-09-26 12:51:14 +0100 (Wed, 26 Sep 2007) | 1 line
Added timeout to perftests, wait limit set to higher value to stop threads thashing.
........
r579709 | ritchiem | 2007-09-26 17:40:09 +0100 (Wed, 26 Sep 2007) | 1 line
Update for three tests that don't remove their VMBroker
........
r580022 | ritchiem | 2007-09-27 15:27:22 +0100 (Thu, 27 Sep 2007) | 18 lines
QPID-596 : ConnectionStartTest was broken. I've fixed it but here is the problem for those like me that like to know why:
Previously:
The setUp method created a producer connection and then sent a message
- This will result in that message being bounced as there is no consumer.
The first test should fail but the test was wrong, which caused it to pass.
There was an assert that was expecting the receive a message yet the test was recieve() == null !!!!
The second test worked because the broker was not killed between tests
This left the queue created so on the second run the message was delivered causing the test to succeed.
Now:
Fixed the InVM broker setup/teardown so the client is created first and the broker removed at the end of the test.
Also updated the asserts to be more explicit rather than having the == null or !=null put that as assertNull/NotNull.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@580293 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
24 files changed, 1219 insertions, 487 deletions
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml index 2060246b7f..af8e7a8293 100644 --- a/java/broker/etc/log4j.xml +++ b/java/broker/etc/log4j.xml @@ -76,6 +76,7 @@ <category name="Qpid.Broker"> <priority value="debug"/> <appender-ref ref="AlertFile"/> + <appender-ref ref="STDOUT"/> </category> <category name="org.apache.qpid.server.queue.AMQQueueMBean"> diff --git a/java/broker/pom.xml b/java/broker/pom.xml index b1dbded542..4901f1d8ad 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -204,18 +204,12 @@ <configuration> <tasks> - <condition property="broker.dir" - else="${user.dir}${file.separator}broker" - value="${user.dir}"> - <contains string="${user.dir}" substring="broker" /> - </condition> - <condition property="skip-python-tests" value="true"> <isset property="skip.python.tests"/> </condition> <property name="command" - value="python run-tests -v -I java_failing.txt"/> + value="python run-tests -v -I java_failing.txt -b localhost:2000"/> <!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>--> <ant antfile="python-test.xml" inheritRefs="true"> diff --git a/java/broker/python-test.xml b/java/broker/python-test.xml index a31dae0060..e044207948 100755 --- a/java/broker/python-test.xml +++ b/java/broker/python-test.xml @@ -1,50 +1,56 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<!-- ====================================================================== --> -<!-- Ant build file (http://ant.apache.org/) for Ant 1.6.2 or above. --> -<!-- ====================================================================== --> - -<project basedir="." default="default"> - - <target name="default" > - <echo message="Used via maven to run python tests."/> - </target> - - <property name="pythondir" value="../../python"/> - - <target name="run-tests" unless="skip-python-tests"> - - <echo message="Starting Broker with command"/> - - <java classname="org.apache.qpid.server.RunBrokerWithCommand" - fork="true" - dir="${pythondir}" - failonerror="true" - > - <arg value="${command}"/> - - <classpath refid="maven.test.classpath"/> - <sysproperty key="QPID_HOME" value="${broker.dir}"/> - <sysproperty key="QPID_WORK" value="${broker.dir}${file.separator}target"/> - </java> - - </target> -</project> +<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<!-- ====================================================================== -->
+<!-- Ant build file (http://ant.apache.org/) for Ant 1.6.2 or above. -->
+<!-- ====================================================================== -->
+
+<project basedir="." default="default">
+
+ <target name="default" >
+ <echo message="Used via maven to run python tests."/>
+ </target>
+
+ <dirname property="broker.dir" file="${ant.file.python-test}"/>
+
+ <property name="pythondir" value="${broker.dir}/../../python"/>
+
+ <target name="run-tests" unless="skip-python-tests">
+
+ <echo message="Starting Broker with command"/>
+
+ <java classname="org.apache.qpid.server.RunBrokerWithCommand"
+ fork="true"
+ dir="${pythondir}"
+ failonerror="true"
+ >
+ <arg value="${command}"/>
+ <arg value="-p"/>
+ <arg value="2000"/>
+ <arg value="-m"/>
+ <arg value="2001"/>
+
+ <classpath refid="maven.test.classpath"/>
+ <sysproperty key="QPID_HOME" value="${broker.dir}"/>
+ <sysproperty key="QPID_WORK" value="${broker.dir}${file.separator}target"/>
+ </java>
+
+ </target>
+</project>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index f19b6823b8..d3b459c48a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -943,6 +943,8 @@ public class AMQChannel AMQMessage message = bouncedMessage.getAMQMessage(); session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage())); + + message.decrementReference(_storeContext); } _returnMessages.clear(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index faa5d4a5c5..4a336ef71c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -581,7 +581,7 @@ public class AMQQueue implements Managable, Comparable /** Removes the AMQMessage from the top of the queue. */ public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException { - _deliveryMgr.removeAMessageFromTop(storeContext); + _deliveryMgr.removeAMessageFromTop(storeContext, this); } /** removes all the messages from the queue. */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 026761a618..d9629a20b5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -330,6 +330,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager deliveryTag, _queue.getMessageCount()); _totalMessageSize.addAndGet(-msg.getSize()); } + + if (!acks) + { + msg.decrementReference(channel.getStoreContext()); + } } finally { @@ -402,11 +407,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @throws AMQException */ - public void removeAMessageFromTop(StoreContext storeContext) throws AMQException + public void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException { _lock.lock(); AMQMessage message = _messages.poll(); + + message.dequeue(storeContext, queue); + + message.decrementReference(storeContext); + if (message != null) { _totalMessageSize.addAndGet(-message.getSize()); @@ -429,6 +439,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _messages.poll(); _queue.dequeue(storeContext, msg); + + msg.decrementReference(_reapingStoreContext); + msg = getNextMessage(); count++; } @@ -474,6 +487,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Use the reapingStoreContext as any sub(if we have one) may be in a tx. message.dequeue(_reapingStoreContext, _queue); + message.decrementReference(_reapingStoreContext); + if (_log.isInfoEnabled()) { _log.info(debugIdentity() + " Doing clean up of the main _message queue."); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 10ba48552c..153106d919 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -72,7 +72,7 @@ interface DeliveryManager */ void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException; - void removeAMessageFromTop(StoreContext storeContext) throws AMQException; + void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException; long clearAllMessages(StoreContext storeContext) throws AMQException; diff --git a/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java b/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java index 1ebecbacb6..b8803206e0 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java +++ b/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java @@ -33,11 +33,11 @@ public class RunBrokerWithCommand public static void main(String[] args) { //Start broker - try { + String[] fudge = args.clone(); - String[] fudge = new String[1]; + // Override the first value which is the command we are going to run later. fudge[0] = "-v"; new Main(fudge).startup(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 4c02f266ed..3caf6ad73d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -24,10 +24,13 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.protocol.TestMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -36,6 +39,8 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.mina.common.ByteBuffer; import javax.management.JMException; import java.util.LinkedList; @@ -49,18 +54,16 @@ public class AMQQueueMBeanTest extends TestCase private static long MESSAGE_SIZE = 1000; private AMQQueue _queue; private AMQQueueMBean _queueMBean; - private MessageStore _messageStore = new MemoryMessageStore(); + private MessageStore _messageStore; private StoreContext _storeContext = new StoreContext(); - private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, - null, - new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + private TransactionalContext _transactionalContext; private VirtualHost _virtualHost; + private AMQProtocolSession _protocolSession; - public void testMessageCount() throws Exception + public void testMessageCountTransient() throws Exception { int messageCount = 10; - sendMessages(messageCount); + sendMessages(messageCount, false); assertTrue(_queueMBean.getMessageCount() == messageCount); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; @@ -73,6 +76,43 @@ public class AMQQueueMBeanTest extends TestCase _queueMBean.clearQueue(); assertTrue(_queueMBean.getMessageCount() == 0); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + //Ensure that the data has been removed from the Store + verifyBrokerState(); + } + + public void testMessageCountPersistent() throws Exception + { + int messageCount = 10; + sendMessages(messageCount, true); + assertEquals("", messageCount, _queueMBean.getMessageCount().intValue()); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; + assertTrue(_queueMBean.getQueueDepth() == queueDepth); + + _queueMBean.deleteMessageFromTop(); + assertTrue(_queueMBean.getMessageCount() == (messageCount - 1)); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + _queueMBean.clearQueue(); + assertTrue(_queueMBean.getMessageCount() == 0); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + //Ensure that the data has been removed from the Store + verifyBrokerState(); + } + + // todo: collect to a general testing class -duplicated from Systest/MessageReturntest + private void verifyBrokerState() + { + + TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore()); + + // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. + assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); + assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size()); + assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap()); + assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size()); } public void testConsumerCount() throws AMQException @@ -86,26 +126,26 @@ public class AMQQueueMBeanTest extends TestCase AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); protocolSession.addChannel(channel); - _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false); + _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false); assertTrue(_queueMBean.getActiveConsumerCount() == 1); SubscriptionSet _subscribers = (SubscriptionSet) mgr; SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory(); - Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), - protocolSession, - new AMQShortString("S1"), - false, - null, - true, - _queue); - - Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), - protocolSession, - new AMQShortString("S2"), - false, - null, - true, - _queue); + Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S1"), + false, + null, + true, + _queue); + + Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S2"), + false, + null, + true, + _queue); _subscribers.addSubscriber(s1); _subscribers.addSubscriber(s2); assertTrue(_queueMBean.getActiveConsumerCount() == 3); @@ -165,7 +205,7 @@ public class AMQQueueMBeanTest extends TestCase } - AMQMessage msg = message(false); + AMQMessage msg = message(false, false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); @@ -184,7 +224,7 @@ public class AMQQueueMBeanTest extends TestCase } } - private AMQMessage message(final boolean immediate) throws AMQException + private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -209,9 +249,11 @@ public class AMQQueueMBeanTest extends TestCase return null; } }; - + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); } @@ -221,22 +263,38 @@ public class AMQQueueMBeanTest extends TestCase super.setUp(); IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = _virtualHost.getMessageStore(); + + _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); _queueMBean = new AMQQueueMBean(_queue); + + _protocolSession = new TestMinaProtocolSession(); } - private void sendMessages(int messageCount) throws AMQException + private void sendMessages(int messageCount, boolean persistent) throws AMQException { - AMQMessage[] messages = new AMQMessage[messageCount]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(false); - messages[i].enqueue(_queue); - messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - } for (int i = 0; i < messageCount; i++) { - _queue.process(_storeContext, messages[i], false); + AMQMessage currentMessage = message(false, persistent); + currentMessage.enqueue(_queue); + + // route header + currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + + // Add the body so we have somthing to test later + currentMessage.addContentBodyFrame(_storeContext, + _protocolSession.getRegistry() + .getProtocolVersionMethodConverter() + .convertToContentChunk( + new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), + MESSAGE_SIZE))); + + } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java new file mode 100644 index 0000000000..48d808142c --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.ContentChunk; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.List; + +/** + * Adds some extra methods to the memory message store for testing purposes. + */ +public class TestableMemoryMessageStore extends MemoryMessageStore +{ + + MemoryMessageStore _mms = null; + + public TestableMemoryMessageStore(MemoryMessageStore mms) + { + _mms = mms; + } + + public TestableMemoryMessageStore() + { + _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); + } + + public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() + { + if (_mms != null) + { + return _mms._metaDataMap; + } + else + { + return _metaDataMap; + } + } + + public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() + { + if (_mms != null) + { + return _mms._contentBodyMap; + } + else + { + return _contentBodyMap; + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c118e5eec0..f873801dc9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -72,7 +72,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +99,6 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; - import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -208,14 +206,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * subscriptions between executions of the client. */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = - new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); + new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); /** * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + new ConcurrentHashMap<BasicMessageConsumer, String>(); /** * Used to hold incoming messages. @@ -248,11 +246,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * consumer. */ private Map<AMQShortString, BasicMessageConsumer> _consumers = - new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = - new ConcurrentHashMap<Destination, AtomicInteger>(); + new ConcurrentHashMap<Destination, AtomicInteger>(); /** * Used as a source of unique identifiers for producers within the session. @@ -312,15 +310,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); _strictAMQPFATAL = - Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); + Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); _immediatePrefetch = - _strictAMQP - || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); + _strictAMQP + || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); _connection = con; _transacted = transacted; @@ -341,31 +339,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = - new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, - new FlowControllingBlockingQueue.ThresholdListener() - { - public void aboveThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark - + ") so suspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(true)).start(); - } - } - - public void underThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark - + ") so unsuspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(false)).start(); - } - } - }); + new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue.ThresholdListener() + { + public void aboveThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Above threshold(" + _defaultPrefetchHighMark + + ") so suspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(true)).start(); + } + } + + public void underThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Below threshold(" + _defaultPrefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(false)).start(); + } + } + }); } else { @@ -384,10 +382,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, - int defaultPrefetchLow) + int defaultPrefetchLow) { this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, - defaultPrefetchLow); + defaultPrefetchLow); } // ===== JMS Session methods. @@ -442,8 +440,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void acknowledgeMessage(long deliveryTag, boolean multiple) { final AMQFrame ackFrame = - BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, - multiple); + BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + multiple); if (_logger.isDebugEnabled()) { @@ -470,27 +468,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException + final AMQShortString exchangeName) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException { - public Object execute() throws AMQException, FailoverException - { - AMQFrame queueBind = + AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - arguments, // arguments - exchangeName, // exchange - false, // nowait - queueName, // queue - routingKey, // routingKey - getTicket()); // ticket + arguments, // arguments + exchangeName, // exchange + false, // nowait + queueName, // queue + routingKey, // routingKey + getTicket()); // ticket - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); - return null; - } - }, _connection).execute(); + return null; + } + }, _connection).execute(); } /** @@ -517,62 +515,58 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_logger.isInfoEnabled()) { _logger.info("Closing session: " + this + ":" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } - - - synchronized(_messageDeliveryLock) - { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { - - - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_connection.getFailoverMutex()) { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { + // we pass null since this is not an error case + closeProducersAndConsumers(null); - getProtocolHandler().closeSession(this); + try + { - final AMQFrame frame = - ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + getProtocolHandler().closeSession(this); - getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + final AMQFrame frame = + ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully. - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - // This is ignored because the channel is already marked as closed so the fail-over process will - // not re-open it. - catch (FailoverException e) - { - _logger.debug( - "Got FailoverException during channel close, ignored as channel already marked as closed."); - } - finally - { - _connection.deregisterSession(_channelId); + getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } + finally + { + _connection.deregisterSession(_channelId); + } } } } - } } /** @@ -582,27 +576,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) throws JMSException { - - synchronized(_messageDeliveryLock) - { - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) + synchronized (_connection.getFailoverMutex()) { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + _closed.set(true); + AMQException amqe; + if (e instanceof AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); - } + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } } } @@ -637,7 +630,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final AMQProtocolHandler handler = getProtocolHandler(); handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), - TxCommitOkBody.class); + TxCommitOkBody.class); } catch (AMQException e) { @@ -719,12 +712,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkValidDestination(destination); return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, - messageSelector, null, true, true); + messageSelector, null, true, true); } public MessageConsumer createConsumer(Destination destination) throws JMSException @@ -732,7 +725,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkValidDestination(destination); return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null, - false, false); + false, false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -740,20 +733,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkValidDestination(destination); return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, - messageSelector, null, false, false); + messageSelector, null, false, false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkValidDestination(destination); return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, - messageSelector, null, false, false); + messageSelector, null, false, false); } public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, - String selector) throws JMSException + String selector) throws JMSException { checkValidDestination(destination); @@ -761,7 +754,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector) throws JMSException + boolean exclusive, String selector) throws JMSException { checkValidDestination(destination); @@ -769,7 +762,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, - String selector, FieldTable rawSelector) throws JMSException + String selector, FieldTable rawSelector) throws JMSException { checkValidDestination(destination); @@ -777,12 +770,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector, FieldTable rawSelector) throws JMSException + boolean exclusive, String selector, FieldTable rawSelector) throws JMSException { checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false, - false); + false); } public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException @@ -797,7 +790,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (subscriber.getTopic().equals(topic)) { throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); + + name); } else { @@ -825,7 +818,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); + + "for creation durableSubscriber. Requesting queue deletion regardless."); } deleteQueue(dest.getAMQQueueName()); @@ -835,7 +828,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // if the queue is bound to the exchange but NOT for this topic, then the JMS spec // says we must trash the subscription. if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) { deleteQueue(dest.getAMQQueueName()); } @@ -852,7 +845,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Note, currently this does not handle reuse of the same name with different topics correctly. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkNotClosed(); checkValidTopic(topic); @@ -909,13 +902,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) - throws JMSException + throws JMSException { return createProducerImpl(destination, mandatory, immediate); } public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, - boolean waitUntilSent) throws JMSException + boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); } @@ -965,28 +958,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive) throws AMQException + final boolean exclusive) throws AMQException { new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException { - public Object execute() throws AMQException, FailoverException - { - AMQFrame queueDeclare = + AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - autoDelete, // autoDelete - durable, // durable - exclusive, // exclusive - false, // nowait - false, // passive - name, // queue - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); - - return null; - } - }, _connection).execute(); + null, // arguments + autoDelete, // autoDelete + durable, // durable + exclusive, // exclusive + false, // nowait + false, // passive + name, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + + return null; + } + }, _connection).execute(); } /** @@ -1279,8 +1272,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_logger.isDebugEnabled()) { _logger.debug("Message[" - + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody())) - + "] received in session with channel id " + _channelId); + + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody())) + + "] received in session with channel id " + _channelId); } if (message.getDeliverBody() == null) @@ -1354,15 +1347,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue + getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else { _connection.getProtocolHandler().syncWrite( - BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue - , BasicRecoverOkBody.class); + BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue + , BasicRecoverOkBody.class); } if (!isSuspended) @@ -1412,8 +1405,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } AMQFrame basicRejectBody = - BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, - requeue); + BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + requeue); _connection.getProtocolHandler().writeFrame(basicRejectBody); } @@ -1453,7 +1446,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); if (!isSuspended) { @@ -1532,7 +1525,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." - + " Requesting queue deletion regardless."); + + " Requesting queue deletion regardless."); } deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); @@ -1553,8 +1546,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose) throws JMSException + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -1597,9 +1590,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } BasicMessageConsumer consumer = - new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, - _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, - exclusive, _acknowledgeMode, noConsume, autoClose); + new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, + _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, + exclusive, _acknowledgeMode, noConsume, autoClose); if (_messageListener != null) { @@ -1619,7 +1612,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (AMQInvalidRoutingKeyException e) { JMSException ide = - new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); + new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); ide.setLinkedException(e); throw ide; } @@ -1705,26 +1698,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException + throws JMSException { try { AMQMethodEvent response = - new FailoverRetrySupport<AMQMethodEvent, AMQException>( - new FailoverProtectedOperation<AMQMethodEvent, AMQException>() - { - public AMQMethodEvent execute() throws AMQException, FailoverException - { - AMQFrame boundFrame = - ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), - getProtocolMinorVersion(), exchangeName, // exchange - queueName, // queue - routingKey); // routingKey + new FailoverRetrySupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + AMQFrame boundFrame = + ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), + getProtocolMinorVersion(), exchangeName, // exchange + queueName, // queue + routingKey); // routingKey - return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); - } - }, _connection).execute(); + } + }, _connection).execute(); // Extract and return the response code from the query. ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); @@ -1794,9 +1787,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - synchronized void startDistpatcherIfNecessary() + void startDistpatcherIfNecessary() { + //If we are the dispatcher then we don't need to check we are started + if (Thread.currentThread() == _dispatcher) + { + return; + } + // If IMMEDIATE_PREFETCH is not set then we need to start fetching + // This is final per session so will be multi-thread safe. if (!_immediatePrefetch) { // We do this now if this is the first call on a started connection @@ -1933,14 +1933,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this)) { throw new javax.jms.InvalidDestinationException( - "Cannot create a subscription on a temporary topic created in another session"); + "Cannot create a subscription on a temporary topic created in another session"); } if (!(topic instanceof AMQTopic)) { throw new javax.jms.InvalidDestinationException( - "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " - + topic.getClass().getName()); + "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + + topic.getClass().getName()); } return (AMQTopic) topic; @@ -2040,7 +2040,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName */ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { // need to generate a consumer tag on the client so we can exploit the nowait flag AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); @@ -2069,14 +2069,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame jmsConsume = - BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - getTicket()); // ticket + BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + getTicket()); // ticket if (nowait) { @@ -2096,13 +2096,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) - throws JMSException + throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) throws JMSException + final boolean immediate, final boolean waitUntilSent) throws JMSException { return new FailoverRetrySupport<BasicMessageProducer, JMSException>( new FailoverProtectedOperation<BasicMessageProducer, JMSException>() @@ -2112,8 +2112,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotClosed(); long producerId = getNextProducerId(); BasicMessageProducer producer = - new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, - AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, + AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); registerProducer(producerId, producer); return producer; @@ -2141,29 +2141,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException + final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException { new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException { - public Object execute() throws AMQException, FailoverException - { - AMQFrame exchangeDeclare = + AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - nowait, // nowait - false, // passive - getTicket(), // ticket - type); // type - - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); - - return null; - } - }, _connection).execute(); + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + nowait, // nowait + false, // passive + getTicket(), // ticket + type); // type + + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + + return null; + } + }, _connection).execute(); } /** @@ -2188,7 +2188,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) - throws AMQException + throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ return new FailoverNoopSupport<AMQShortString, AMQException>( @@ -2203,15 +2203,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } AMQFrame queueDeclare = - QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - false, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - getTicket()); // ticket + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + false, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + getTicket()); // ticket protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); @@ -2236,22 +2236,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException { - public Object execute() throws AMQException, FailoverException - { - AMQFrame queueDeleteFrame = + AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - getTicket()); // ticket + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + getTicket()); // ticket - getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); - return null; - } - }, _connection).execute(); + return null; + } + }, _connection).execute(); } catch (AMQException e) { @@ -2370,7 +2370,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { suspendChannel(true); _logger.info( - "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); + "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); } catch (AMQException e) { @@ -2419,7 +2419,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_logger.isInfoEnabled()) { _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" - + requeue); + + requeue); if (messages.hasNext()) { @@ -2439,7 +2439,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_logger.isDebugEnabled()) { _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" - + message.getDeliverBody().deliveryTag); + + message.getDeliverBody().deliveryTag); } messages.remove(); @@ -2480,44 +2480,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void returnBouncedMessage(final UnprocessedMessage message) { _connection.performConnectionTask(new Runnable() + { + public void run() { - public void run() + try { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange, - message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies()); + message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); + AMQShortString reason = message.getBounceBody().replyText; + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); } - catch (Exception e) + else if (errorCode == AMQConstant.NO_ROUTE) { - _logger.error( + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } + else + { + _connection.exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + + } + catch (Exception e) + { + _logger.error( "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } } - }); + } + }); } /** @@ -2544,8 +2544,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _suspended = suspend; AMQFrame channelFlowFrame = - ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - !suspend); + ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + !suspend); _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); } @@ -2735,7 +2735,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_dispatcherLogger.isDebugEnabled()) { _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") - + ": Currently " + (currently ? "Stopped" : "Started")); + + ": Currently " + (currently ? "Stopped" : "Started")); } } @@ -2747,7 +2747,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (message.getDeliverBody() != null) { final BasicMessageConsumer consumer = - (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); + (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); if ((consumer == null) || consumer.isClosed()) { @@ -2756,14 +2756,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().deliveryTag + "] from queue " - + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); + + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); } else { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" - + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); + + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" + + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); } } // Don't reject if we're already closing diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index dfac0d45a8..014fd36414 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -240,15 +240,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (messageListener != null) { - // handle case where connection has already been started, and the dispatcher has alreaded started + //todo: handle case where connection has already been started, and the dispatcher has alreaded started // putting values on the _synchronousQueue - synchronized (_session) - { _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDistpatcherIfNecessary(); - } } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 140eeaf2bb..1d0d6a3491 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -149,19 +149,21 @@ public class TransportConnection { int port = details.getPort(); - if (!_inVmPipeAddress.containsKey(port)) + synchronized (_inVmPipeAddress) { - if (AutoCreate) + if (!_inVmPipeAddress.containsKey(port)) { - createVMBroker(port); - } - else - { - throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); + if (AutoCreate) + { + createVMBroker(port); + } + else + { + throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port + + " does not exist. Auto create disabled.", null); + } } } - return new VmPipeTransportConnection(port); } @@ -176,69 +178,71 @@ public class TransportConnection config.setThreadModel(ReadWriteThreadModel.getInstance()); } - if (!_inVmPipeAddress.containsKey(port)) + synchronized (_inVmPipeAddress) { - _logger.info("Creating InVM Qpid.AMQP listening on port " + port); - IoHandlerAdapter provider = null; - try + if (!_inVmPipeAddress.containsKey(port)) { - VmPipeAddress pipe = new VmPipeAddress(port); - - provider = createBrokerInstance(port); - - _acceptor.bind(pipe, provider); - - _inVmPipeAddress.put(port, pipe); - _logger.info("Created InVM Qpid.AMQP listening on port " + port); - } - catch (IOException e) - { - _logger.error("Got IOException.", e); - - // Try and unbind provider + _logger.info("Creating InVM Qpid.AMQP listening on port " + port); + IoHandlerAdapter provider = null; try { VmPipeAddress pipe = new VmPipeAddress(port); - try - { - _acceptor.unbind(pipe); - } - catch (Exception ignore) - { - // ignore - } - - if (provider == null) - { - provider = createBrokerInstance(port); - } + provider = createBrokerInstance(port); _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } - catch (IOException justUseFirstException) + catch (IOException e) { - String because; - if (e.getCause() == null) + _logger.error("Got IOException.", e); + + // Try and unbind provider + try { - because = e.toString(); + VmPipeAddress pipe = new VmPipeAddress(port); + + try + { + _acceptor.unbind(pipe); + } + catch (Exception ignore) + { + // ignore + } + + if (provider == null) + { + provider = createBrokerInstance(port); + } + + _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); + _logger.info("Created InVM Qpid.AMQP listening on port " + port); } - else + catch (IOException justUseFirstException) { - because = e.getCause().toString(); - } + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } - throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); + throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); + } } } + else + { + _logger.info("InVM Qpid.AMQP on port " + port + " already exits."); + } } - else - { - _logger.info("InVM Qpid.AMQP on port " + port + " already exits."); - } - } private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException @@ -285,25 +289,29 @@ public class TransportConnection { _logger.info("Killing all VM Brokers"); _acceptor.unbindAll(); - - Iterator keys = _inVmPipeAddress.keySet().iterator(); - - while (keys.hasNext()) + synchronized (_inVmPipeAddress) { - int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); - } + Iterator keys = _inVmPipeAddress.keySet().iterator(); + while (keys.hasNext()) + { + int id = (Integer) keys.next(); + _inVmPipeAddress.remove(id); + } + } } public static void killVMBroker(int port) { - VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); - if (pipe != null) + synchronized (_inVmPipeAddress) { - _logger.info("Killing VM Broker:" + port); - _inVmPipeAddress.remove(port); - _acceptor.unbind(pipe); + VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); + if (pipe != null) + { + _logger.info("Killing VM Broker:" + port); + _inVmPipeAddress.remove(port); + _acceptor.unbind(pipe); + } } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java index ffc7be82f0..55654fa522 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java @@ -68,6 +68,7 @@ public class LargeMessageTest extends TestCase protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killAllVMBrokers(); } private void init(AMQConnection connection) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java index 3aefc098aa..a3d0cf6dcd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java @@ -48,6 +48,7 @@ public class PubSubTwoConnectionTest extends TestCase protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killAllVMBrokers(); } /** diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index 6753ed48e4..559e9a4741 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -125,6 +125,7 @@ public class ChannelCloseOkTest extends TestCase protected void tearDown() throws Exception { closeConnection(); + TransportConnection.killAllVMBrokers(); super.tearDown(); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java index daa1086561..1d108b9c5c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java @@ -20,8 +20,11 @@ */ package org.apache.qpid.test.unit.client.connection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; import javax.jms.JMSException; import javax.jms.Message; @@ -30,14 +33,20 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -import junit.framework.TestCase; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.transport.TransportConnection; - +/** + * ConnectionStartTest: + * This test verifies that a fresh connection is not started and no messages are delivered until the connection is + * started. + * + * After the connection is started then the message should be there, and the connection started. + * + * This Test verifies that using receive() and a messageListener does not cause message delivery before start is called. + * + */ public class ConnectionStartTest extends TestCase { @@ -54,11 +63,18 @@ public class ConnectionStartTest extends TestCase try { + // Create Consumer Connection + _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); - AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + Queue queue = _consumerSess.createQueue("ConnectionStartTest"); + + _consumer = _consumerSess.createConsumer(queue); - AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest"); + + // Create Producer Connection to send message + AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test"); Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); @@ -66,12 +82,6 @@ public class ConnectionStartTest extends TestCase pub.send(pubSess.createTextMessage("Initial Message")); - _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test"); - - _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); - - _consumer = _consumerSess.createConsumer(queue); - pubCon.close(); } @@ -85,6 +95,7 @@ public class ConnectionStartTest extends TestCase { _connection.close(); TransportConnection.killVMBroker(1); + super.tearDown(); } public void testSimpleReceiveConnection() @@ -94,9 +105,9 @@ public class ConnectionStartTest extends TestCase assertTrue("Connection should not be started", !_connection.started()); //Note that this next line will start the dispatcher in the session // should really not be called before _connection start - assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null); + assertNull("There should not be messages waiting for the consumer", _consumer.receiveNoWait()); _connection.start(); - assertTrue("There should be messages waiting for the consumer", _consumer.receive(1000) == null); + assertNotNull("There should be messages waiting for the consumer", _consumer.receive(1000)); assertTrue("Connection should be started", _connection.started()); } @@ -131,7 +142,11 @@ public class ConnectionStartTest extends TestCase } }); + // Ensure that setting a ML doesn't start the connection assertTrue("Connection should not be started", !_connection.started()); + // Ensure that the message wasn't delivered while the connection was stopped. + assertEquals("Message latch should still be set",1,_gotMessage.getCount()); + _connection.start(); assertTrue("Connection should be started", _connection.started()); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 1a45773907..6db27d6be0 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -434,6 +434,13 @@ public class CommitRollbackTest extends TestCase verifyMessages(_consumer.receive(1000)); } + /** + * This test sends two messages receives on of them but doesn't ack it. + * The consumer is then closed + * the first message should be returned as redelivered. + * the second message should be delivered normally. + * @throws Exception + */ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 3438770450..46b10b5963 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -711,7 +711,10 @@ public class FieldTable if (trace) { _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); - _logger.trace(_properties.toString()); + if (_properties != null) + { + _logger.trace(_properties.toString()); + } } EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize()); diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index bd34fd8f20..1e01f6bc8e 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -885,24 +885,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti synchronized (_sendPauseMonitor)
{
if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
- // && (_sendPauseBarrier.getNumberWaiting() == 1))
{
- // log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be re-established.
- /*try
- {*/
- // _sendPauseBarrier.await();
_sendPauseMonitor.notify();
- /*}
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
}
}
@@ -1159,12 +1143,23 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // If necessary, wait until the max pending message size comes within its limit.
synchronized (_sendPauseMonitor)
{
+ // Used to keep track of the number of times that send has to wait.
+ int numWaits = 0;
+
+ // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
+ // the test timeout.
+ int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
+
while ((_maxPendingSize > 0))
{
// Get the size estimate of sent but not yet received messages.
int unreceived = _unreceived.get();
int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
+ // log.debug("_maxPendingSize = " + _maxPendingSize);
+
if (unreceivedSize > _maxPendingSize)
{
// log.debug("unreceived size estimate over limit = " + unreceivedSize);
@@ -1172,8 +1167,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Wait on the send pause barrier for the limit to be re-established.
try
{
- // _sendPauseBarrier.await();
- _sendPauseMonitor.wait(1000);
+ _sendPauseMonitor.wait(10000);
+ numWaits++;
}
catch (InterruptedException e)
{
@@ -1181,10 +1176,17 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
- /*catch (BrokenBarrierException e)
+
+ // Fail the test if the send has had to wait more than the maximum allowed number of times.
+ if (numWaits >= waitLimit)
{
- throw new RuntimeException(e);
- }*/
+ String errorMessage =
+ "Send has had to wait for the unreceivedSize (" + unreceivedSize
+ + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
+ + " times.";
+ log.warn(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
}
else
{
diff --git a/java/pom.xml b/java/pom.xml index 547b5d12ec..ee02b4a362 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -506,7 +506,7 @@ under the License. <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> - <version>1.0</version> + <version>1.4.3</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java b/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java new file mode 100644 index 0000000000..a25af30008 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java @@ -0,0 +1,211 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.failure; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.Random; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * DeadlockTestCase: + * From a client requirement. + * + * The JMS Spec specifies that a Session has a single thread of control. And as such setting message listeners from a + * second thread is not allowed. + * Section 4.4.6 of the Spec states: + <quote>Another consequence is that a connection must be in stopped mode to set up a +session with more than one message listener. The reason is that when a +connection is actively delivering messages, once the first message listener for a +session has been registered, the session is now controlled by the thread of +control that delivers messages to it. At this point a client thread of control +cannot be used to further configure the session.</quote> + * + * It, however, does not specified what we should do in the case. it only states: + <quote>Once a connection has been started, all its sessions with a registered message +listener are dedicated to the thread of control that delivers messages to them. It +is erroneous for client code to use such a session from another thread of +control. The only exception to this is the use of the session or connection close +method.</quote> + * + * While it may be erroneous the causing a Deadlock is not a very satisfactory solution. This test ensures that we do + * no do this. There is no technical reason we cannot currently allow the setting of a messageListener on a new consumer. + * The only caveate is due to QPID-577 there is likely to be temporary message 'loss'. As they are stuck on the internal + * _synchronousQueue pending a synchronous receive. + * + */ +public class DeadlockTest extends TestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(DeadlockTest.class); + + + public static final String QPID_BROKER_CONNECTION_PROPERTY = "QPIDBROKER"; + + private String topic1 = "TEST.DeadLock1.TMP"; + private String topic2 = "TEST.DeadLock2.TMP"; + + private Session sess; + + private Semaphore s = new Semaphore(0); + private final String LOCAL = "tcp://localhost:5670"; + private final String VM = "vm://:1"; + + private String BROKER = VM; + + String connectionString = System.getProperty(QPID_BROKER_CONNECTION_PROPERTY, + "amqp://guest:guest@/test?brokerlist='" + BROKER + "'"); + + + public void setUp() throws AMQVMBrokerCreationException + { + if (BROKER.equals("vm://:1")) + { + TransportConnection.createVMBroker(1); + } + } + + public void tearDown() throws AMQVMBrokerCreationException + { + if (BROKER.equals("vm://:1")) + { + TransportConnection.killAllVMBrokers(); + } + } + + public class EmptyMessageListener implements javax.jms.MessageListener + { + public void onMessage(Message message) + { + // do nothing + } + } + + public void setSessionListener(String topic, javax.jms.MessageListener listener) + { + try + { + Topic jmsTopic = sess.createTopic(topic); + MessageConsumer subscriber = sess.createConsumer(jmsTopic); + subscriber.setMessageListener(listener); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("Caught JMSException"); + } + } + + public class TestMessageListener implements javax.jms.MessageListener + { + public Random r = new Random(); + + public void onMessage(Message message) + { + if (r.nextBoolean()) + { + setSessionListener(topic2, new EmptyMessageListener()); + } + } + + } + + public void testDeadlock() throws InterruptedException, URLSyntaxException, JMSException + { + // in order to trigger the deadlock we need to + // set a message listener from one thread + // whilst receiving a message on another thread and on that thread also setting (the same?) message listener + AMQConnectionFactory acf = new AMQConnectionFactory(connectionString); + Connection conn = acf.createConnection(); + conn.start(); + sess = conn.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + setSessionListener(topic1, new TestMessageListener()); + + + Thread th = new Thread() + { + public void run() + { + try + { + Topic jmsTopic = sess.createTopic(topic1); + MessageProducer producer = sess.createProducer(jmsTopic); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + Random r = new Random(); + long end = System.currentTimeMillis() + 2000; + while (end - System.currentTimeMillis() > 0) + { + if (r.nextBoolean()) + { + _logger.info("***************** send message"); + Message jmsMessage = sess.createTextMessage(""); + producer.send(jmsMessage); + } + else + { + _logger.info("***************** set session listener"); + setSessionListener(topic2, new EmptyMessageListener()); + } + Thread.yield(); + } + _logger.info("done sends"); + s.release(); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("Caught JMSException"); + } + } + }; + th.setDaemon(true); + th.setName("testDeadlock"); + th.start(); + + boolean success = s.tryAcquire(1, 4, TimeUnit.SECONDS); + + // if we failed, closing the connection will just hang the test case. + if (success) + { + conn.close(); + } + + if (!success) + { + fail("Deadlock ocurred"); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java new file mode 100644 index 0000000000..45bf73bd9d --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java @@ -0,0 +1,315 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + + +/** + * Test Case to ensure that messages are correctly returned. + * This includes checking: + * - The message is returned. + * - The broker doesn't leak memory. + * - The broker's state is correct after test. + */ +public class MessageReturnTest extends TestCase implements ExceptionListener +{ + private static final Logger _logger = Logger.getLogger(MessageReturnTest.class); + + + protected final String BROKER = "vm://:1"; + protected final String VHOST = "test"; + protected final String QUEUE = "MessageReturnTest"; + protected final String BADQUEUE = "MessageReturnTest-bad-to-force-returns"; + + + private Context _context; + + private Connection _producerConnection; + + private MessageProducer _producer; + private Session _clientSession, _producerSession; + private static final int MSG_COUNT = 50; + + private Message[] _messages = new Message[MSG_COUNT]; + + private CountDownLatch _returns = new CountDownLatch(1); + private int _receivedCount = 0; + private int _initialContentBodyMapSize; + private int _initilaMessageMetaDataMapSize; + + protected void setUp() throws Exception + { + if (BROKER.startsWith("vm://")) + { + TransportConnection.createVMBroker(1); + } + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + env.put("queue.badQueue", QUEUE); + + _context = factory.getInitialContext(env); + + getBrokerInitialState(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + if (_producerConnection != null) + { + _producerConnection.close(); + } + + if (BROKER.startsWith("vm://")) + { + TransportConnection.killAllVMBrokers(); + } + } + + public void test() throws Exception + { + init(); + //Send Msgs + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg)); + } + + try + { + // Wait for all returns to arrive any longer than 5secs and something has gone wrong. + _returns.await(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + + } + + //Close the connection.. .giving the broker time to clean up its state. + _producerConnection.close(); + + //Verify we get all the messages. + verifyAllMessagesRecevied(); + //Verify Broker state + verifyBrokerState(); + } + + private void init() throws NamingException, JMSException + { + _receivedCount = 0; + _messages = new Message[MSG_COUNT]; + _returns = new CountDownLatch(1); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.setExceptionListener(this); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer((Queue) _context.lookup("badQueue")); + } + + // todo: collect to a general testing class - duplicated in AMQQueueMBeanTest + private void getBrokerInitialState() + { + IApplicationRegistry registry = ApplicationRegistry.getInstance(); + + VirtualHost testVhost = registry.getVirtualHostRegistry().getVirtualHost(VHOST); + + assertNotNull("Unable to get test Vhost", testVhost.getMessageStore()); + + TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) testVhost.getMessageStore()); + + _initialContentBodyMapSize = store.getContentBodyMap() == null ? 0 : store.getContentBodyMap().size(); + _initilaMessageMetaDataMapSize = store.getMessageMetaDataMap() == null ? 0 : store.getMessageMetaDataMap().size(); + + if (_initialContentBodyMapSize != 0) + { + _logger.warn("Store is dirty: ContentBodyMap has Size:" + _initialContentBodyMapSize); + System.out.println("Store is dirty: ContentBodyMap has Size:" + _initialContentBodyMapSize); + } + + if (_initilaMessageMetaDataMapSize != 0) + { + _logger.warn("Store is dirty: MessageMetaDataMap has Size:" + _initilaMessageMetaDataMapSize); + System.out.println("Store is dirty: MessageMetaDataMap has Size:" + _initilaMessageMetaDataMapSize); + } + + } + + private void verifyBrokerState() + { + IApplicationRegistry registry = ApplicationRegistry.getInstance(); + + VirtualHost testVhost = registry.getVirtualHostRegistry().getVirtualHost(VHOST); + + assertNotNull("Unable to get test Vhost", testVhost.getMessageStore()); + + TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) testVhost.getMessageStore()); + + + assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); + + // If the CBM has content it may be due to the broker not yet purging. + // Closing the producer connection before testing should give the store time to clean up. + // Perform a quick sleep just in case + while (store.getContentBodyMap().size() > _initialContentBodyMapSize) + { + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { + } + } + assertTrue("Expected the store content size not reached at test start it was :" + _initialContentBodyMapSize + " Now it is :" + store.getContentBodyMap().size(), _initialContentBodyMapSize >= store.getContentBodyMap().size()); + assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap()); + assertTrue("Expected the store MessageMetaData size not reached at test start it was :" + _initilaMessageMetaDataMapSize + " Now it is :" + store.getMessageMetaDataMap().size(), _initialContentBodyMapSize >= store.getMessageMetaDataMap().size()); + } + + private void verifyAllMessagesRecevied() + { + + boolean[] msgIdRecevied = new boolean[MSG_COUNT]; + + int msgId = 0; + + //Check received messages + for (Message msg : _messages) + { + assertNotNull("Missing message:" + msgId, msg); + assertFalse("Already received msg id " + msgId, msgIdRecevied[msgId]); + msgIdRecevied[msgId] = true; + msgId++; + } + + //Check all recevied + for (msgId = 0; msgId < MSG_COUNT; msgId++) + { + assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]); + } + } + + /** + * We can't verify messageOrder here as the return threads are not synchronized so we have no way of + * guarranting the order. + */ + private void verifyMessageOrder() + { + int msgId = 0; + for (Message msg : _messages) + { + assertNotNull("Missing message:" + msgId, msg); + try + { + assertEquals("Message not received in correct order", msgId, msg.getIntProperty("ID")); + } + catch (JMSException e) + { + fail("Unable to get messageID for msg:" + msg); + } + + msgId++; + } + } + + /** + * Get the next message putting the given count into the intProperties as ID. + * + * @param msgNo the message count to store as ID. + * @return + * @throws JMSException + */ + + private Message nextMessage(int msgNo) throws JMSException + { + Message send = _producerSession.createTextMessage("MessageReturnTest"); + send.setIntProperty("ID", msgNo); + return send; + } + + + public void onException(JMSException jmsException) + { + // NOTE: + // This method MUST be thread-safe. Mulitple threads can call this at once. + synchronized (this) + { + if (jmsException.getLinkedException() instanceof AMQNoRouteException) + { + AMQNoRouteException amq = (AMQNoRouteException) jmsException.getLinkedException(); + + Message msg = (Message) amq.getUndeliveredMessage(); + + if (_receivedCount < MSG_COUNT) + { + assertNotNull("Reeceived Null message:" + _receivedCount, msg); + _messages[_receivedCount] = msg; + _receivedCount++; + } + else + { + fail("Received to many messages expected :" + MSG_COUNT + " received: " + _receivedCount + 1); + } + + if (_receivedCount == MSG_COUNT) + { + _returns.countDown(); + } + } + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 79d428fee8..48d808142c 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -33,6 +33,14 @@ import java.util.List; */ public class TestableMemoryMessageStore extends MemoryMessageStore { + + MemoryMessageStore _mms = null; + + public TestableMemoryMessageStore(MemoryMessageStore mms) + { + _mms = mms; + } + public TestableMemoryMessageStore() { _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); @@ -41,11 +49,25 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() { - return _metaDataMap; + if (_mms != null) + { + return _mms._metaDataMap; + } + else + { + return _metaDataMap; + } } public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { - return _contentBodyMap; + if (_mms != null) + { + return _mms._contentBodyMap; + } + else + { + return _contentBodyMap; + } } } |
