diff options
Diffstat (limited to 'qpid/java')
8 files changed, 148 insertions, 81 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 0b2f0303b0..34bc57a826 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; @@ -315,7 +316,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel try { _currentMessage.getStoredMessage().flushToStore(); - final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); if(!checkMessageUserId(_currentMessage.getContentHeader())) @@ -324,7 +324,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } else { - if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty()) + if(destinationQueues == null || destinationQueues.isEmpty()) { if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) { @@ -332,7 +332,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } else { - _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage)); + _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index d0231e4d80..d693c6962b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -356,7 +356,7 @@ public abstract class AbstractExchange implements Exchange, Managable _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); final ArrayList<? extends BaseQueue> queues = doRoute(message); - if(queues != null && !queues.isEmpty()) + if(!queues.isEmpty()) { _routedMessageCount.incrementAndGet(); _routedMessageSize.addAndGet(message.getSize()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 356a7f89b9..29a3611709 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -30,15 +30,12 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.ExchangeConfig; import javax.management.JMException; import java.util.ArrayList; -import java.util.List; import java.util.Collection; -import java.util.concurrent.CopyOnWriteArrayList; public interface Exchange extends ExchangeReferrer, ExchangeConfig { @@ -67,7 +64,12 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig void close() throws AMQException; - + /** + * Returns a list of queues to which to route this message. If there are + * no queues the empty list must be returned. + * + * @return list of queues to which to route the message. + */ ArrayList<? extends BaseQueue> route(InboundMessage message); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties index b9890d9f27..b25a6a7301 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties @@ -21,4 +21,5 @@ # 0 - type # 1 - name CREATED = EXH-1001 : Create :[ Durable] Type: {0} Name: {1} -DELETED = EXH-1002 : Deleted
\ No newline at end of file +DELETED = EXH-1002 : Deleted +DISCARDMSG = EXH-1003 : Discarded Message : Name: {0} Routing Key: {1} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java index cf8ae2166c..2297e4200d 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java @@ -34,7 +34,7 @@ import org.apache.qpid.transport.codec.BBDecoder; import java.nio.ByteBuffer; import java.lang.ref.SoftReference; -public class MessageMetaData_0_10 implements StorableMessageMetaData +public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage { private Header _header; private DeliveryProperties _deliveryProps; @@ -194,6 +194,12 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData return _deliveryProps == null ? 0L : _deliveryProps.getExpiration(); } + public boolean isRedelivered() + { + // The *Message* is never redelivered, only queue entries are... + return false; + } + public long getArrivalTime() { return _arrivalTime; @@ -239,4 +245,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData } } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index f89e86e433..22760318b5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.queue.AMQQueue; @@ -167,7 +168,6 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageSubscribe(Session session, MessageSubscribe method) { - //TODO - work around broken Python tests if(!method.hasAcceptMode()) { @@ -284,25 +284,10 @@ public class ServerSessionDelegate extends SessionDelegate } } - @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { - ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn); - Exchange exchange; - if(xfr.hasDestination()) - { - exchange = exchangeRegistry.getExchange(xfr.getDestination()); - if(exchange == null) - { - exchange = exchangeRegistry.getDefaultExchange(); - } - } - else - { - exchange = exchangeRegistry.getDefaultExchange(); - } - + final Exchange exchange = getExchangeForMessage(ssn, xfr); DeliveryProperties delvProps = null; if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) @@ -310,7 +295,7 @@ public class ServerSessionDelegate extends SessionDelegate delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); } - MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); + final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) { @@ -320,66 +305,65 @@ public class ServerSessionDelegate extends SessionDelegate return; } - - final MessageStore store = getVirtualHost(ssn).getMessageStore(); - StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); - ByteBuffer body = xfr.getBody(); - if(body != null) + + final Exchange exchangeInUse; + ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData); + if(queues.isEmpty() && exchange.getAlternateExchange() != null) { - storeMessage.addContent(0, body); + final Exchange alternateExchange = exchange.getAlternateExchange(); + queues = alternateExchange.route(messageMetaData); + if (!queues.isEmpty()) + { + exchangeInUse = alternateExchange; + } + else + { + exchangeInUse = exchange; + } + } + else + { + exchangeInUse = exchange; } - storeMessage.flushToStore(); - MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference()); - - ArrayList<? extends BaseQueue> queues = exchange.route(message); - - - if(queues != null && queues.size() != 0) + if(!queues.isEmpty()) { + final MessageStore store = getVirtualHost(ssn).getMessageStore(); + final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store); + MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference()); ((ServerSession) ssn).enqueue(message, queues); } else { - if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable()) + if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) { - if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) - { - RangeSet rejects = new RangeSet(); - rejects.add(xfr.getId()); - MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); - ssn.invoke(reject); - } - else - { - Exchange alternate = exchange.getAlternateExchange(); - if(alternate != null) - { - queues = alternate.route(message); - if(queues != null && queues.size() != 0) - { - ((ServerSession) ssn).enqueue(message, queues); - } - else - { - //TODO - log the message discard - } - } - else - { - //TODO - log the message discard - } - - - } + RangeSet rejects = new RangeSet(); + rejects.add(xfr.getId()); + MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); + ssn.invoke(reject); + } + else + { + ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); } - - } ssn.processed(xfr); } + private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr, + final MessageMetaData_0_10 messageMetaData, final MessageStore store) + { + final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); + ByteBuffer body = xfr.getBody(); + if(body != null) + { + storeMessage.addContent(0, body); + } + storeMessage.flushToStore(); + return storeMessage; + } + @Override public void messageCancel(Session session, MessageCancel method) { @@ -582,6 +566,25 @@ public class ServerSessionDelegate extends SessionDelegate } + private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr) + { + final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn); + Exchange exchange; + if(xfr.hasDestination()) + { + exchange = exchangeRegistry.getExchange(xfr.getDestination()); + if(exchange == null) + { + exchange = exchangeRegistry.getDefaultExchange(); + } + } + else + { + exchange = exchangeRegistry.getDefaultExchange(); + } + return exchange; + } + private VirtualHost getVirtualHost(Session session) { ServerConnection conn = getServerConnection(session); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java index 728a98e009..4364376000 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java @@ -66,7 +66,6 @@ public class ExchangeMessagesTest extends AbstractTestMessages validateLogMessage(log, "EXH-1001", expected); } - public void testExchangeDeleted() { _logMessage = ExchangeMessages.DELETED(); @@ -77,4 +76,21 @@ public class ExchangeMessagesTest extends AbstractTestMessages validateLogMessage(log, "EXH-1002", expected); } + public void testExchangeDiscardedMessage() + { + // Get the Default Exchange on the Test Vhost for testing + final Exchange exchange = ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHost("test"). + getExchangeRegistry().getDefaultExchange(); + + final String name = exchange.getNameShortString().toString(); + final String routingKey = "routingKey"; + + _logMessage = ExchangeMessages.DISCARDMSG(name, routingKey); + List<Object> log = performLog(); + + String[] expected = {"Discarded Message :","Name:", name, "Routing Key:", routingKey}; + + validateLogMessage(log, "EXH-1003", expected); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java index 1e48f34f99..b3c080b2e3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java @@ -20,6 +20,16 @@ */ package org.apache.qpid.server.logging; +import java.io.IOException; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.framing.AMQFrame; @@ -28,13 +38,6 @@ import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.Session; -import java.io.IOException; -import java.util.List; - /** * Exchange * @@ -214,4 +217,38 @@ public class ExchangeLoggingTest extends AbstractTestLogging } + public void testDiscardedMessage() throws Exception + { + //Ignore broker startup messages + _monitor.reset(); + + if (!isBroker010()) + { + // Default 0-8..-0-9-1 behaviour is for messages to be rejected (returned to client). + setTestClientSystemProperty("qpid.default_mandatory", "false"); + } + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Do not create consumer so queue is not created and message will be discarded. + final MessageProducer producer = _session.createProducer(_queue); + + // Sending message + final TextMessage msg = _session.createTextMessage("msg"); + producer.send(msg); + + final String expectedMessageBody = "Discarded Message : Name: " + _name + " Routing Key: " + _queue.getQueueName(); + + // Ensure we have received the EXH log msg. + waitForMessage("EXH-1003"); + + List<String> results = findMatches(EXH_PREFIX); + assertEquals("Result set larger than expected.", 2, results.size()); + + final String log = getLogMessage(results, 1); + validateMessageID("EXH-1003", log); + + final String message = getMessageString(fromMessage(log)); + assertEquals("Log Message not as expected", expectedMessageBody, message); + } } |
