diff options
Diffstat (limited to 'java')
6 files changed, 49 insertions, 25 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 15a517a6b2..416a9a4d7f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -857,9 +857,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager else { - if (_messages.size() > 0) + if (s.filtersMessages()) { - _log.error("Direct delivery with queued msgs:" + _messages.size()); + if (s.getPreDeliveryQueue().size() > 0) + { + _log.error("Direct delivery from PDQ with queued msgs:" + s.getPreDeliveryQueue().size()); + } + } + else if (_messages.size() > 0) + { + _log.error("Direct delivery from MainQueue queued msgs:" + _messages.size()); } //release lock now diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index bfac194d5d..7c0803a61a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.client; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Hashtable; +import java.util.UUID; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -125,21 +126,25 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF } _defaultPassword = password; } - + /** * Getter for SSLConfiguration + * * @return SSLConfiguration if set, otherwise null */ - public final SSLConfiguration getSSLConfiguration() { - return _sslConfig; + public final SSLConfiguration getSSLConfiguration() + { + return _sslConfig; } - + /** * Setter for SSLConfiguration + * * @param sslConfig config to store */ - public final void setSSLConfiguration(SSLConfiguration sslConfig) { - _sslConfig = sslConfig; + public final void setSSLConfiguration(SSLConfiguration sslConfig) + { + _sslConfig = sslConfig; } /** @@ -243,7 +248,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF } catch (UnknownHostException e) { - return null; + return "UnknownHost" + UUID.randomUUID(); } } @@ -336,7 +341,9 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF * @param name * @param ctx * @param env + * * @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory. + * * @throws Exception */ public Object getObjectInstance(Object obj, Name name, Context ctx, diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index df3b5ed202..fee6690538 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -519,11 +519,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } - synchronized (_messageDeliveryLock) + synchronized (_connection.getFailoverMutex()) { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session. - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { // Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -577,9 +577,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) throws JMSException { - synchronized (_messageDeliveryLock) + synchronized (_connection.getFailoverMutex()) { - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index a9c93d7227..a0a8eb10ed 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -143,17 +143,17 @@ public class DurableSubscriptionTest extends TestCase producer.send(session1.createTextMessage("A")); Message msg; - msg = consumer1.receive(100); + msg = consumer1.receive(500); assertNotNull("Message should be available", msg); assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText()); - msg = consumer1.receive(100); + msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); msg = consumer2.receive(); assertNotNull(msg); assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); - msg = consumer2.receive(100); + msg = consumer2.receive(500); assertNull("There should be no more messages for consumption on consumer2.", msg); consumer2.close(); @@ -164,21 +164,24 @@ public class DurableSubscriptionTest extends TestCase producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); - msg = consumer1.receive(100); + msg = consumer1.receive(500); assertNotNull("Consumer 1 should get message 'B'.", msg); assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); - msg = consumer1.receive(100); + msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); _logger.info("Receive message on consumer 3 :expecting B"); - msg = consumer3.receive(100); + msg = consumer3.receive(500); assertNotNull("Consumer 3 should get message 'B'.", msg); assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); - msg = consumer3.receive(100); + msg = consumer3.receive(500); assertNull("There should be no more messages for consumption on consumer3.", msg); - + + consumer1.close(); + consumer3.close(); + con.close(); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 3d5a1096b4..a4c016916e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -481,9 +481,16 @@ public class CommitRollbackTest extends TestCase assertEquals("2", ((TextMessage) result).getText()); } - result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + Message result2 = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result2); + + // if this is message 1 then it should be marked as redelivered + if("1".equals(((TextMessage) result2).getText())) + { + assertTrue("Messasge is not marked as redelivered" + result2, result2.getJMSRedelivered()); + } + + assertNotSame("Messages should not have the same content",((TextMessage) result2).getText(), ((TextMessage) result).getText() ); result = _consumer.receive(1000); assertNull("test message should be null:" + result, result); diff --git a/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java b/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java index d816065758..38b7758e7c 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java +++ b/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java @@ -342,7 +342,7 @@ public class LocalCircuitImpl implements Circuit }
catch (JMSException e)
{
- throw new RuntimeException("Got JMSException during close.", e);
+ throw new RuntimeException("Got JMSException during close:" + e.getMessage(), e);
}
}
|
