From 58fb658d72dd0a0d750fdd3f97e607cf3d2e9134 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 15 Jul 2013 23:32:24 +0000 Subject: QPID-4659 : [Java Broker] tidy up amqp 0-8 implementation, reduce unnecessary usage in tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503523 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/ack/AcknowledgeTest.java | 180 ----------------- .../qpid/server/exchange/TopicExchangeTest.java | 221 ++++++--------------- .../logging/actors/AMQPChannelActorTest.java | 4 +- .../actors/BaseConnectionActorTestCase.java | 24 ++- .../server/logging/actors/CurrentActorTest.java | 8 +- .../qpid/server/logging/actors/QueueActorTest.java | 2 +- .../logging/actors/SubscriptionActorTest.java | 2 +- .../logging/subjects/AbstractTestLogSubject.java | 4 +- .../logging/subjects/ChannelLogSubjectTest.java | 13 +- .../logging/subjects/ConnectionLogSubjectTest.java | 37 ++-- .../subjects/SubscriptionLogSubjectTest.java | 117 ----------- .../server/protocol/AMQProtocolEngineTest.java | 73 ------- .../qpid/server/protocol/v0_8/AMQChannelTest.java | 1 - .../protocol/v0_8/AMQProtocolEngineTest.java | 72 +++++++ .../apache/qpid/server/protocol/v0_8/AckTest.java | 4 +- .../qpid/server/protocol/v0_8/AcknowledgeTest.java | 181 +++++++++++++++++ .../server/protocol/v0_8/BrokerTestHelper_0_8.java | 99 +++++++++ .../qpid/server/protocol/v0_8/MaxChannelsTest.java | 4 +- .../protocol/v0_8/QueueBrowserUsesNoAckTest.java | 149 ++++++++++++++ .../protocol/v0_8/SubscriptionFactoryImplTest.java | 96 +++++++++ .../protocol/v0_8/SubscriptionLogSubjectTest.java | 115 +++++++++++ .../qpid/server/queue/SimpleAMQQueueTest.java | 1 + .../subscription/QueueBrowserUsesNoAckTest.java | 147 -------------- .../subscription/SubscriptionFactoryImplTest.java | 98 --------- .../apache/qpid/server/util/BrokerTestHelper.java | 70 ++----- 25 files changed, 846 insertions(+), 876 deletions(-) delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java (limited to 'qpid/java/broker/src/test') diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java deleted file mode 100644 index 0e4dbe87a3..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.ack; - - -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.util.List; - -public class AcknowledgeTest extends QpidTestCase -{ - private AMQChannel _channel; - private SimpleAMQQueue _queue; - private MessageStore _messageStore; - private String _queueName; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); - VirtualHost virtualHost = _channel.getVirtualHost(); - _queueName = getTestName(); - _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); - _messageStore = virtualHost.getMessageStore(); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - private AMQChannel getChannel() - { - return _channel; - } - - private InternalTestProtocolSession getSession() - { - return (InternalTestProtocolSession)_channel.getProtocolSession(); - } - - private SimpleAMQQueue getQueue() - { - return _queue; - } - - public void testTransactionalSingleAck() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(1, 1, 1, false, 0); - } - - public void testTransactionalMultiAck() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(10, 1, 5, true, 5); - } - - public void testTransactionalAckAll() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(10, 1, 0, true, 0); - } - - public void testNonTransactionalSingleAck() throws AMQException - { - runMessageAck(1, 1, 1, false, 0); - } - - public void testNonTransactionalMultiAck() throws AMQException - { - runMessageAck(10, 1, 5, true, 5); - } - - public void testNonTransactionalAckAll() throws AMQException - { - runMessageAck(10, 1, 0, true, 0); - } - - protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException - { - //Check store is empty - checkStoreContents(0); - - //Send required messsages to the queue - BrokerTestHelper.publishMessages(getChannel(), sendMessageCount, _queueName, ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); - - if (getChannel().isTransactional()) - { - getChannel().commit(); - } - - //Ensure they are stored - checkStoreContents(sendMessageCount); - - //Check that there are no unacked messages - assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); - - //Subscribe to the queue - AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true); - - getQueue().deliverAsync(); - - //Wait for the messages to be delivered - getSession().awaitDelivery(sendMessageCount); - - //Check that they are all waiting to be acknoledged - assertEquals("Channel should have unacked msgs", sendMessageCount, getChannel().getUnacknowledgedMessageMap().size()); - - List messages = getSession().getDelivers(getChannel().getChannelId(), subscriber, sendMessageCount); - - //Double check we received the right number of messages - assertEquals(sendMessageCount, messages.size()); - - //Check that the first message has the expected deliveryTag - assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag()); - - //Send required Acknowledgement - getChannel().acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple); - - if (getChannel().isTransactional()) - { - getChannel().commit(); - } - - // Check Remaining Acknowledgements - assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, getChannel().getUnacknowledgedMessageMap().size()); - - //Check store contents are also correct. - checkStoreContents(remainingUnackedMessages); - } - - private void checkStoreContents(int messageCount) - { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index b2d7237737..f2a64381df 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -20,33 +20,31 @@ */ package org.apache.qpid.server.exchange; +import java.util.List; import junit.framework.Assert; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TopicExchangeTest extends QpidTestCase { private TopicExchange _exchange; private VirtualHost _vhost; - private MessageStore _store; @Override @@ -56,7 +54,6 @@ public class TopicExchangeTest extends QpidTestCase BrokerTestHelper.setUp(); _exchange = new TopicExchange(); _vhost = BrokerTestHelper.createVirtualHost(getName()); - _store = new TestMemoryMessageStore(); } @Override @@ -82,8 +79,7 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - routeMessage(message); + routeMessage("a.b", 0l); Assert.assertEquals(0, queue.getMessageCount()); } @@ -94,21 +90,16 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - - routeMessage(message); + routeMessage("a.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c",1l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -121,34 +112,26 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - - routeMessage(message); + routeMessage("a.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c"); - - int queueCount = routeMessage(message); + routeMessage("a.c",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a"); - - - queueCount = routeMessage(message); + int queueCount = routeMessage("a",2l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -160,57 +143,45 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + routeMessage("a.b.c",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.b"); - - queueCount = routeMessage(message); + routeMessage("a.b",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c"); - - queueCount = routeMessage(message); + routeMessage("a.c",2l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 2l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a"); - - queueCount = routeMessage(message); + routeMessage("a",3l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 3l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("b"); - - - queueCount = routeMessage(message); + int queueCount = routeMessage("b", 4l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -222,25 +193,20 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.d.b"); - - routeMessage(message); + routeMessage("a.c.d.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c.b"); - - routeMessage(message); + routeMessage("a.c.b",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -253,39 +219,31 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); - IncomingMessage message = createMessage("a.c.b.b"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.a.b.c"); - - routeMessage(message); + routeMessage("a.a.b.c",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.b.c.b"); - - queueCount = routeMessage(message); + queueCount = routeMessage("a.b.c.b",2l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.b.c.b.c"); - - routeMessage(message); + routeMessage("a.b.c.b.c",3l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 3l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -298,22 +256,16 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.b.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.a.b.c.d"); - - routeMessage(message); + routeMessage("a.a.b.c.d",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -325,21 +277,16 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.b.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.a.b.c.d"); - - routeMessage(message); + routeMessage("a.a.b.c.d",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -351,29 +298,30 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } - private int routeMessage(final IncomingMessage message) - throws AMQException + private int routeMessage(String routingKey, long messageNumber) throws AMQException { - MessageMetaData mmd = message.headersReceived(System.currentTimeMillis()); - message.setStoredMessage(_store.addMessage(mmd)); - - message.enqueue(_exchange.route(message)); - AMQMessage msg = new AMQMessage(message.getStoredMessage()); - for(BaseQueue q : message.getDestinationQueues()) + InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getRoutingKey()).thenReturn(routingKey); + when(inboundMessage.getRoutingKeyShortString()).thenReturn(new AMQShortString(routingKey)); + List queues = _exchange.route(inboundMessage); + ServerMessage message = mock(ServerMessage.class); + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(ref); + when(message.getMessageNumber()).thenReturn(messageNumber); + for(BaseQueue q : queues) { - q.enqueue(msg); + q.enqueue(message); } - return message.getDestinationQueues().size(); + + return queues.size(); } public void testMoreRouting() throws AMQException @@ -382,9 +330,7 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -397,62 +343,11 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } - private IncomingMessage createMessage(String s) throws AMQException - { - MessagePublishInfo info = new PublishInfo(new AMQShortString(s)); - - IncomingMessage message = new IncomingMessage(info); - final ContentHeaderBody chb = new ContentHeaderBody(); - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - chb.setProperties(props); - message.setContentHeaderBody(chb); - - - return message; - } - - - class PublishInfo implements MessagePublishInfo - { - private AMQShortString _routingkey; - - PublishInfo(AMQShortString routingkey) - { - _routingkey = routingkey; - } - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return true; - } - - public AMQShortString getRoutingKey() - { - return _routingkey; - } - } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java index 055197c23e..41b42fac78 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.BrokerTestHelper; import java.util.List; @@ -45,7 +45,7 @@ public class AMQPChannelActorTest extends BaseConnectionActorTestCase private void setUpNow() throws Exception { super.setUp(); - AMQChannel channel = BrokerTestHelper.createChannel(1, getSession()); + AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection()); setAmqpActor(new AMQPChannelActor(channel, getRootLogger())); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java index 09dd48e4d3..1cb6474e41 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java @@ -20,31 +20,43 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; public class BaseConnectionActorTestCase extends BaseActorTestCase { - private AMQProtocolSession _session; + private AMQConnectionModel _session; + private VirtualHost _virtualHost; @Override public void setUp() throws Exception { super.setUp(); BrokerTestHelper.setUp(); - _session = BrokerTestHelper.createSession(); - + _session = BrokerTestHelper.createConnection(); + _virtualHost = BrokerTestHelper.createVirtualHost("test"); setAmqpActor(new AMQPConnectionActor(_session, getRootLogger())); } + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + @Override public void tearDown() throws Exception { try { + if(_virtualHost != null) + { + _virtualHost.close(); + } if (_session != null) { - _session.getVirtualHost().close(); + _session.close(AMQConstant.CONNECTION_FORCED, ""); } } finally @@ -54,7 +66,7 @@ public class BaseConnectionActorTestCase extends BaseActorTestCase } } - public AMQProtocolSession getSession() + public AMQConnectionModel getConnection() { return _session; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java index 0c64ce837e..d413c4d4c9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java @@ -23,9 +23,11 @@ package org.apache.qpid.server.logging.actors; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.AMQException; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.NullRootMessageLogger; +import org.apache.qpid.server.util.BrokerTestHelper; /** * Test : CurrentActorTest @@ -71,7 +73,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase public void testLIFO() throws AMQException, ConfigurationException { assertTrue("Unexpected actor: " + CurrentActor.get(), CurrentActor.get() instanceof TestLogActor); - AMQPConnectionActor connectionActor = new AMQPConnectionActor(getSession(), + AMQPConnectionActor connectionActor = new AMQPConnectionActor(getConnection(), new NullRootMessageLogger()); /* @@ -98,7 +100,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase * */ - AMQChannel channel = new AMQChannel(getSession(), 1, getSession().getVirtualHost().getMessageStore()); + AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection()); AMQPChannelActor channelActor = new AMQPChannelActor(channel, new NullRootMessageLogger()); @@ -214,7 +216,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase { LogActor defaultActor = CurrentActor.get(); - AMQPConnectionActor actor = new AMQPConnectionActor(getSession(), + AMQPConnectionActor actor = new AMQPConnectionActor(getConnection(), new NullRootMessageLogger()); CurrentActor.set(actor); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java index 2dc44c58ce..55153b7389 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java @@ -31,7 +31,7 @@ public class QueueActorTest extends BaseConnectionActorTestCase public void setUp() throws Exception { super.setUp(); - setAmqpActor(new QueueActor(BrokerTestHelper.createQueue(getName(), getSession().getVirtualHost()), getRootLogger())); + setAmqpActor(new QueueActor(BrokerTestHelper.createQueue(getName(), getVirtualHost()), getRootLogger())); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java index 58fca488c4..92915e7092 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java @@ -44,7 +44,7 @@ public class SubscriptionActorTest extends BaseConnectionActorTestCase MockSubscription mockSubscription = new MockSubscription(); - mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getSession().getVirtualHost()), false); + mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getVirtualHost()), false); setAmqpActor(new SubscriptionActor(getRootLogger(), mockSubscription)); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java index 193e8a490d..f779295cd4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java @@ -113,7 +113,7 @@ public abstract class AbstractTestLogSubject extends QpidTestCase // This should return us MockProtocolSessionUser@null/test String connectionSlice = getSlice("con:" + connectionID, message); - assertNotNull("Unable to find connection 'con:" + connectionID + "'", + assertNotNull("Unable to find connection 'con:" + connectionID + "' in '"+message+"'", connectionSlice); // Exract the userName @@ -131,7 +131,7 @@ public abstract class AbstractTestLogSubject extends QpidTestCase // We will have three sections assertEquals("Unable to split IP from rest of Connection:" - + userNameParts[1], 3, ipParts.length); + + userNameParts[1] + " in '"+message+"'", 3, ipParts.length); // We need to skip the first '/' split will be empty so validate 1 as IP assertEquals("IP not as expected", ipString, ipParts[1]); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java index a6701301f9..a3d96c6d12 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQSessionModel; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Validate ChannelLogSubjects are logged as expected @@ -34,10 +37,10 @@ public class ChannelLogSubjectTest extends ConnectionLogSubjectTest { super.setUp(); - - AMQChannel channel = new AMQChannel(getSession(), _channelID, getSession().getVirtualHost().getMessageStore()); - - _subject = new ChannelLogSubject(channel); + AMQSessionModel session = mock(AMQSessionModel.class); + when(session.getConnectionModel()).thenReturn(getConnection()); + when(session.getChannelId()).thenReturn(_channelID); + _subject = new ChannelLogSubject(session); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java index 835bcb3e97..e9a9317102 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java @@ -20,8 +20,10 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.protocol.AMQConnectionModel; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Validate ConnectionLogSubjects are logged as expected @@ -29,25 +31,24 @@ import org.apache.qpid.server.util.BrokerTestHelper; public class ConnectionLogSubjectTest extends AbstractTestLogSubject { - private InternalTestProtocolSession _session; + private static final long CONNECTION_ID = 456l; + private static final String USER = "InternalTestProtocolSession"; + private static final String IP_STRING = "127.0.0.1:1"; + private static final String VHOST = "test"; + + private AMQConnectionModel _connection; @Override public void setUp() throws Exception { super.setUp(); - _session = BrokerTestHelper.createSession("test"); - _subject = new ConnectionLogSubject(_session); - } - - @Override - public void tearDown() throws Exception - { - if (_session != null) - { - _session.getVirtualHost().close(); - } - super.tearDown(); + _connection = mock(AMQConnectionModel.class); + when(_connection.getConnectionId()).thenReturn(CONNECTION_ID); + when(_connection.getPrincipalAsString()).thenReturn(USER); + when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING); + when(_connection.getVirtualHostName()).thenReturn(VHOST); + _subject = new ConnectionLogSubject(_connection); } /** @@ -57,12 +58,12 @@ public class ConnectionLogSubjectTest extends AbstractTestLogSubject */ protected void validateLogStatement(String message) { - verifyConnection(_session.getSessionID(), "InternalTestProtocolSession", "127.0.0.1:1", "test", message); + verifyConnection(CONNECTION_ID, USER, IP_STRING, VHOST, message); } - public InternalTestProtocolSession getSession() + public AMQConnectionModel getConnection() { - return _session; + return _connection; } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java deleted file mode 100644 index e314361a89..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.subjects; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactory; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; - -/** - * Validate SubscriptionLogSubjects are logged as expected - */ -public class SubscriptionLogSubjectTest extends AbstractTestLogSubject -{ - - private AMQQueue _queue; - private VirtualHost _testVhost; - private int _channelID = 1; - private Subscription _subscription; - - @Override - public void setUp() throws Exception - { - super.setUp(); - - InternalTestProtocolSession session = BrokerTestHelper.createSession(); - _testVhost = session.getVirtualHost(); - - _queue = new MockAMQQueue("SubscriptionLogSubjectTest"); - ((MockAMQQueue) _queue).setVirtualHost(_testVhost); - - AMQChannel channel = new AMQChannel(session, _channelID, _testVhost.getMessageStore()); - - session.addChannel(channel); - - SubscriptionFactory factory = new SubscriptionFactoryImpl(); - - _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"), - false, null, false, - new LimitlessCreditManager()); - - _subscription.setQueue(_queue, false); - - _subject = new SubscriptionLogSubject(_subscription); - } - - @Override - public void tearDown() throws Exception - { - if (_testVhost != null) - { - _testVhost.close(); - } - super.tearDown(); - } - - /** - * Validate that the logged Subject message is as expected: - * MESSAGE [Blank][sub:0(vh(/test)/qu(SubscriptionLogSubjectTest))] - * - * @param message the message whos format needs validation - */ - @Override - protected void validateLogStatement(String message) - { - String subscriptionSlice = getSlice("sub:" - + _subscription.getSubscriptionID(), - message); - - assertNotNull("Unable to locate subscription 'sub:" + - _subscription.getSubscriptionID() + "'"); - - - - // Pull out the qu(..) section from the subscription message - // Split it into three parts - // MESSAGE [Blank][sub:0(vh(/ - // test)/ - // qu(SubscriptionLogSubjectTest))] - // Take the last bit and drop off the extra )] - String[] parts = message.split("/"); - assertEquals("Message part count wrong", 3, parts.length); - String subscription = parts[2].substring(0, parts[2].indexOf(")") + 1); - - // Adding the ')' is a bit ugly but SubscriptionLogSubject is the only - // Subject that nests () and so the simple parser of checking for the - // next ')' falls down. - verifyVirtualHost(subscriptionSlice+ ")", _queue.getVirtualHost()); - - verifyQueue(subscription, _queue); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java deleted file mode 100644 index b523979387..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.qpid.server.protocol; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.properties.ConnectionStartProperties; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.NetworkConnection; - -public class AMQProtocolEngineTest extends QpidTestCase -{ - private Broker _broker; - private Port _port; - private NetworkConnection _network; - private Transport _transport; - - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _broker = BrokerTestHelper.createBrokerMock(); - when(_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(true); - - _port = mock(Port.class); - _network = mock(NetworkConnection.class); - _transport = Transport.TCP; - } - - public void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - BrokerTestHelper.tearDown(); - } - } - - public void testSetClientPropertiesForNoRouteProvidedAsString() - { - AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); - assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); - - Map clientProperties = new HashMap(); - clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE.toString()); - engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); - - assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); - } - - public void testSetClientPropertiesForNoRouteProvidedAsBoolean() - { - AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); - assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); - - Map clientProperties = new HashMap(); - clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE); - engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); - - assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java index a5e5529ed5..b358c7c5c5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java new file mode 100644 index 0000000000..f5e58cfd02 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java @@ -0,0 +1,72 @@ +package org.apache.qpid.server.protocol.v0_8; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.network.NetworkConnection; + +public class AMQProtocolEngineTest extends QpidTestCase +{ + private Broker _broker; + private Port _port; + private NetworkConnection _network; + private Transport _transport; + + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _broker = BrokerTestHelper.createBrokerMock(); + when(_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(true); + + _port = mock(Port.class); + _network = mock(NetworkConnection.class); + _transport = Transport.TCP; + } + + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + BrokerTestHelper.tearDown(); + } + } + + public void testSetClientPropertiesForNoRouteProvidedAsString() + { + AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); + assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); + + Map clientProperties = new HashMap(); + clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE.toString()); + engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); + + assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); + } + + public void testSetClientPropertiesForNoRouteProvidedAsBoolean() + { + AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); + assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); + + Map clientProperties = new HashMap(); + clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE); + engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); + + assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 60b6b215c6..4ab64ca100 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -28,9 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -67,7 +65,7 @@ public class AckTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(5); + _channel = BrokerTestHelper_0_8.createChannel(5); _protocolSession = _channel.getProtocolSession(); _virtualHost = _protocolSession.getVirtualHost(); _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java new file mode 100644 index 0000000000..a9eb0ebfe7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + + +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; + +import java.util.List; + +public class AcknowledgeTest extends QpidTestCase +{ + private AMQChannel _channel; + private SimpleAMQQueue _queue; + private MessageStore _messageStore; + private String _queueName; + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _channel = BrokerTestHelper_0_8.createChannel(); + VirtualHost virtualHost = _channel.getVirtualHost(); + _queueName = getTestName(); + _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); + _messageStore = virtualHost.getMessageStore(); + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_channel != null) + { + _channel.getVirtualHost().close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + private AMQChannel getChannel() + { + return _channel; + } + + private InternalTestProtocolSession getSession() + { + return (InternalTestProtocolSession)_channel.getProtocolSession(); + } + + private SimpleAMQQueue getQueue() + { + return _queue; + } + + public void testTransactionalSingleAck() throws AMQException + { + getChannel().setLocalTransactional(); + runMessageAck(1, 1, 1, false, 0); + } + + public void testTransactionalMultiAck() throws AMQException + { + getChannel().setLocalTransactional(); + runMessageAck(10, 1, 5, true, 5); + } + + public void testTransactionalAckAll() throws AMQException + { + getChannel().setLocalTransactional(); + runMessageAck(10, 1, 0, true, 0); + } + + public void testNonTransactionalSingleAck() throws AMQException + { + runMessageAck(1, 1, 1, false, 0); + } + + public void testNonTransactionalMultiAck() throws AMQException + { + runMessageAck(10, 1, 5, true, 5); + } + + public void testNonTransactionalAckAll() throws AMQException + { + runMessageAck(10, 1, 0, true, 0); + } + + protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException + { + //Check store is empty + checkStoreContents(0); + + //Send required messsages to the queue + BrokerTestHelper_0_8.publishMessages(getChannel(), + sendMessageCount, + _queueName, + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); + + if (getChannel().isTransactional()) + { + getChannel().commit(); + } + + //Ensure they are stored + checkStoreContents(sendMessageCount); + + //Check that there are no unacked messages + assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); + + //Subscribe to the queue + AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true); + + getQueue().deliverAsync(); + + //Wait for the messages to be delivered + getSession().awaitDelivery(sendMessageCount); + + //Check that they are all waiting to be acknoledged + assertEquals("Channel should have unacked msgs", sendMessageCount, getChannel().getUnacknowledgedMessageMap().size()); + + List messages = getSession().getDelivers(getChannel().getChannelId(), subscriber, sendMessageCount); + + //Double check we received the right number of messages + assertEquals(sendMessageCount, messages.size()); + + //Check that the first message has the expected deliveryTag + assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag()); + + //Send required Acknowledgement + getChannel().acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple); + + if (getChannel().isTransactional()) + { + getChannel().commit(); + } + + // Check Remaining Acknowledgements + assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, getChannel().getUnacknowledgedMessageMap().size()); + + //Check store contents are also correct. + checkStoreContents(remainingUnackedMessages); + } + + private void checkStoreContents(int messageCount) + { + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java new file mode 100644 index 0000000000..0919607bd7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BrokerTestHelper_0_8 extends BrokerTestHelper +{ + + public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException + { + AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); + session.addChannel(channel); + return channel; + } + + public static AMQChannel createChannel(int channelId) throws Exception + { + InternalTestProtocolSession session = createProtocolSession(); + return createChannel(channelId, session); + } + + public static AMQChannel createChannel() throws Exception + { + return createChannel(1); + } + + public static InternalTestProtocolSession createProtocolSession() throws Exception + { + return createProtocolSession("test"); + } + + public static InternalTestProtocolSession createProtocolSession(String hostName) throws Exception + { + VirtualHost virtualHost = createVirtualHost(hostName); + return new InternalTestProtocolSession(virtualHost, createBrokerMock()); + } + + public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException + { + AMQShortString rouningKey = new AMQShortString(queueName); + AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName); + MessagePublishInfo info = mock(MessagePublishInfo.class); + when(info.getExchange()).thenReturn(exchangeNameAsShortString); + when(info.getRoutingKey()).thenReturn(rouningKey); + + Exchange exchange = channel.getVirtualHost().getExchange(exchangeName); + for (int count = 0; count < numberOfMessages; count++) + { + channel.setPublishFrame(info, exchange); + + // Set the body size + ContentHeaderBody _headerBody = new ContentHeaderBody(); + _headerBody.setBodySize(0); + + // Set Minimum properties + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setExpiration(0L); + properties.setTimestamp(System.currentTimeMillis()); + + // Make Message Persistent + properties.setDeliveryMode((byte) 2); + + _headerBody.setProperties(properties); + + channel.publishContentHeader(_headerBody); + } + channel.sync(); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java index 1cc872e177..a77475c05f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; @@ -37,7 +35,7 @@ public class MaxChannelsTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _session = BrokerTestHelper.createSession(); + _session = BrokerTestHelper_0_8.createProtocolSession(); } public void testChannels() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java new file mode 100644 index 0000000000..21142e7ab6 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; + +import java.util.List; + +public class QueueBrowserUsesNoAckTest extends QpidTestCase +{ + private AMQChannel _channel; + private SimpleAMQQueue _queue; + private MessageStore _messageStore; + private String _queueName; + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _channel = BrokerTestHelper_0_8.createChannel(); + VirtualHost virtualHost = _channel.getVirtualHost(); + _queueName = getTestName(); + _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); + _messageStore = virtualHost.getMessageStore(); + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_channel != null) + { + _channel.getVirtualHost().close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + private AMQChannel getChannel() + { + return _channel; + } + + private InternalTestProtocolSession getSession() + { + return (InternalTestProtocolSession)_channel.getProtocolSession(); + } + + private SimpleAMQQueue getQueue() + { + return _queue; + } + + public void testQueueBrowserUsesNoAck() throws AMQException + { + int sendMessageCount = 2; + int prefetch = 1; + + //Check store is empty + checkStoreContents(0); + + //Send required messsages to the queue + BrokerTestHelper_0_8.publishMessages(getChannel(), + sendMessageCount, + _queueName, + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); + + //Ensure they are stored + checkStoreContents(sendMessageCount); + + //Check that there are no unacked messages + assertEquals("Channel should have no unacked msgs ", 0, + getChannel().getUnacknowledgedMessageMap().size()); + + //Set the prefetch on the session to be less than the sent messages + getChannel().setCredit(0, prefetch); + + //browse the queue + AMQShortString browser = browse(getChannel(), getQueue()); + + getQueue().deliverAsync(); + + //Wait for messages to fill the prefetch + getSession().awaitDelivery(prefetch); + + //Get those messages + List messages = + getSession().getDelivers(getChannel().getChannelId(), browser, + prefetch); + + //Ensure we recevied the prefetched messages + assertEquals(prefetch, messages.size()); + + //Check the process didn't suspend the subscription as this would + // indicate we are using the prefetch credit. i.e. using acks not No-Ack + assertTrue("The subscription has been suspended", + !getChannel().getSubscription(browser).getState() + .equals(Subscription.State.SUSPENDED)); + } + + private void checkStoreContents(int messageCount) + { + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); + } + + private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws AMQException + { + FieldTable filters = new FieldTable(); + filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); + + return channel.subscribeToQueue(null, queue, true, filters, false, true); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java new file mode 100644 index 0000000000..e98dd63450 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.logging.UnitTestMessageLogger; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.test.utils.QpidTestCase; + +public class SubscriptionFactoryImplTest extends QpidTestCase +{ + private AMQChannel _channel; + private AMQProtocolSession _session; + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _channel = BrokerTestHelper_0_8.createChannel(); + _session = _channel.getProtocolSession(); + GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false)); + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_channel != null) + { + _channel.getVirtualHost().close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + /** + * Tests that while creating Subscriptions of various types, the + * ID numbers assigned are allocated from a common sequence + * (in increasing order). + */ + public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception + { + //create a No-Ack subscription, get the first Subscription ID + long previousId = 0; + Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager()); + previousId = noAckSub.getSubscriptionID(); + + //create an ack subscription, verify the next Subscription ID is used + Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); + previousId = ackSub.getSubscriptionID(); + + //create a browser subscription + FieldTable filters = new FieldTable(); + filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); + Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID()); + previousId = browerSub.getSubscriptionID(); + + //create an BasicGet NoAck subscription + Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false, + _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); + previousId = getNoAckSub.getSubscriptionID(); + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java new file mode 100644 index 0000000000..c44fdebc03 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject; +import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; +import org.apache.qpid.server.flow.LimitlessCreditManager; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; + +/** + * Validate SubscriptionLogSubjects are logged as expected + */ +public class SubscriptionLogSubjectTest extends AbstractTestLogSubject +{ + + private AMQQueue _queue; + private VirtualHost _testVhost; + private int _channelID = 1; + private Subscription _subscription; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + InternalTestProtocolSession session = BrokerTestHelper_0_8.createProtocolSession(); + _testVhost = session.getVirtualHost(); + + _queue = new MockAMQQueue("SubscriptionLogSubjectTest"); + ((MockAMQQueue) _queue).setVirtualHost(_testVhost); + + AMQChannel channel = new AMQChannel(session, _channelID, _testVhost.getMessageStore()); + + session.addChannel(channel); + + SubscriptionFactory factory = new SubscriptionFactoryImpl(); + + _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"), + false, null, false, + new LimitlessCreditManager()); + + _subscription.setQueue(_queue, false); + + _subject = new SubscriptionLogSubject(_subscription); + } + + @Override + public void tearDown() throws Exception + { + if (_testVhost != null) + { + _testVhost.close(); + } + super.tearDown(); + } + + /** + * Validate that the logged Subject message is as expected: + * MESSAGE [Blank][sub:0(vh(/test)/qu(SubscriptionLogSubjectTest))] + * + * @param message the message whos format needs validation + */ + @Override + protected void validateLogStatement(String message) + { + String subscriptionSlice = getSlice("sub:" + + _subscription.getSubscriptionID(), + message); + + assertNotNull("Unable to locate subscription 'sub:" + + _subscription.getSubscriptionID() + "'"); + + + + // Pull out the qu(..) section from the subscription message + // Split it into three parts + // MESSAGE [Blank][sub:0(vh(/ + // test)/ + // qu(SubscriptionLogSubjectTest))] + // Take the last bit and drop off the extra )] + String[] parts = message.split("/"); + assertEquals("Message part count wrong", 3, parts.length); + String subscription = parts[2].substring(0, parts[2].indexOf(")") + 1); + + // Adding the ')' is a bit ugly but SubscriptionLogSubject is the only + // Subject that nests () and so the simple parser of checking for the + // next ')' falls down. + verifyVirtualHost(subscriptionSlice+ ")", _queue.getVirtualHost()); + + verifyQueue(subscription, _queue); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 87f0c06c2b..07e72d3535 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -39,6 +39,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v0_8.IncomingMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java deleted file mode 100644 index d9c0b7055f..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.subscription; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.util.List; - -public class QueueBrowserUsesNoAckTest extends QpidTestCase -{ - private AMQChannel _channel; - private SimpleAMQQueue _queue; - private MessageStore _messageStore; - private String _queueName; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); - VirtualHost virtualHost = _channel.getVirtualHost(); - _queueName = getTestName(); - _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); - _messageStore = virtualHost.getMessageStore(); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - private AMQChannel getChannel() - { - return _channel; - } - - private InternalTestProtocolSession getSession() - { - return (InternalTestProtocolSession)_channel.getProtocolSession(); - } - - private SimpleAMQQueue getQueue() - { - return _queue; - } - - public void testQueueBrowserUsesNoAck() throws AMQException - { - int sendMessageCount = 2; - int prefetch = 1; - - //Check store is empty - checkStoreContents(0); - - //Send required messsages to the queue - BrokerTestHelper.publishMessages(getChannel(), sendMessageCount, _queueName, ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); - - //Ensure they are stored - checkStoreContents(sendMessageCount); - - //Check that there are no unacked messages - assertEquals("Channel should have no unacked msgs ", 0, - getChannel().getUnacknowledgedMessageMap().size()); - - //Set the prefetch on the session to be less than the sent messages - getChannel().setCredit(0, prefetch); - - //browse the queue - AMQShortString browser = browse(getChannel(), getQueue()); - - getQueue().deliverAsync(); - - //Wait for messages to fill the prefetch - getSession().awaitDelivery(prefetch); - - //Get those messages - List messages = - getSession().getDelivers(getChannel().getChannelId(), browser, - prefetch); - - //Ensure we recevied the prefetched messages - assertEquals(prefetch, messages.size()); - - //Check the process didn't suspend the subscription as this would - // indicate we are using the prefetch credit. i.e. using acks not No-Ack - assertTrue("The subscription has been suspended", - !getChannel().getSubscription(browser).getState() - .equals(Subscription.State.SUSPENDED)); - } - - private void checkStoreContents(int messageCount) - { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); - } - - private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws AMQException - { - FieldTable filters = new FieldTable(); - filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - - return channel.subscribeToQueue(null, queue, true, filters, false, true); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java deleted file mode 100644 index 731b1aadc4..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.subscription; - -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; - -public class SubscriptionFactoryImplTest extends QpidTestCase -{ - private AMQChannel _channel; - private AMQProtocolSession _session; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); - _session = _channel.getProtocolSession(); - GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false)); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - /** - * Tests that while creating Subscriptions of various types, the - * ID numbers assigned are allocated from a common sequence - * (in increasing order). - */ - public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception - { - //create a No-Ack subscription, get the first Subscription ID - long previousId = 0; - Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager()); - previousId = noAckSub.getSubscriptionID(); - - //create an ack subscription, verify the next Subscription ID is used - Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); - previousId = ackSub.getSubscriptionID(); - - //create a browser subscription - FieldTable filters = new FieldTable(); - filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID()); - previousId = browerSub.getSubscriptionID(); - - //create an BasicGet NoAck subscription - Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false, - _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); - previousId = getNoAckSub.getSubscriptionID(); - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index bc4e67aefe..caf6acb4bb 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -31,11 +31,8 @@ import java.util.UUID; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore; import org.apache.qpid.server.exchange.DefaultExchangeFactory; @@ -47,8 +44,6 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.security.SecurityManager; @@ -143,33 +138,35 @@ public class BrokerTestHelper return vhostConfig; } - public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException + public static AMQSessionModel createSession(int channelId, AMQConnectionModel connection) throws AMQException { - AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); - session.addChannel(channel); - return channel; + AMQSessionModel session = mock(AMQSessionModel.class); + when(session.getConnectionModel()).thenReturn(connection); + when(session.getChannelId()).thenReturn(channelId); + return session; } - public static AMQChannel createChannel(int channelId) throws Exception + public static AMQSessionModel createSession(int channelId) throws Exception { - InternalTestProtocolSession session = createSession(); - return createChannel(channelId, session); + AMQConnectionModel session = createConnection(); + return createSession(channelId, session); } - public static AMQChannel createChannel() throws Exception + public static AMQSessionModel createSession() throws Exception { - return createChannel(1); + return createSession(1); } - public static InternalTestProtocolSession createSession() throws Exception + public static AMQConnectionModel createConnection() throws Exception { - return createSession("test"); + return createConnection("test"); } - public static InternalTestProtocolSession createSession(String hostName) throws Exception + public static AMQConnectionModel createConnection(String hostName) throws Exception { VirtualHost virtualHost = createVirtualHost(hostName); - return new InternalTestProtocolSession(virtualHost, createBrokerMock()); + AMQConnectionModel connection = mock(AMQConnectionModel.class); + return connection; } public static Exchange createExchange(String hostName) throws Exception @@ -182,39 +179,6 @@ public class BrokerTestHelper return factory.createExchange("amp.direct", "direct", false, false); } - public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException - { - AMQShortString rouningKey = new AMQShortString(queueName); - AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName); - MessagePublishInfo info = mock(MessagePublishInfo.class); - when(info.getExchange()).thenReturn(exchangeNameAsShortString); - when(info.getRoutingKey()).thenReturn(rouningKey); - - Exchange exchange = channel.getVirtualHost().getExchange(exchangeName); - for (int count = 0; count < numberOfMessages; count++) - { - channel.setPublishFrame(info, exchange); - - // Set the body size - ContentHeaderBody _headerBody = new ContentHeaderBody(); - _headerBody.setBodySize(0); - - // Set Minimum properties - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - - properties.setExpiration(0L); - properties.setTimestamp(System.currentTimeMillis()); - - // Make Message Persistent - properties.setDeliveryMode((byte) 2); - - _headerBody.setProperties(properties); - - channel.publishContentHeader(_headerBody); - } - channel.sync(); - } - public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException { SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null, -- cgit v1.2.1