summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java19
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java13
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java2
6 files changed, 49 insertions, 25 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 15a517a6b2..416a9a4d7f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -857,9 +857,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
else
{
- if (_messages.size() > 0)
+ if (s.filtersMessages())
{
- _log.error("Direct delivery with queued msgs:" + _messages.size());
+ if (s.getPreDeliveryQueue().size() > 0)
+ {
+ _log.error("Direct delivery from PDQ with queued msgs:" + s.getPreDeliveryQueue().size());
+ }
+ }
+ else if (_messages.size() > 0)
+ {
+ _log.error("Direct delivery from MainQueue queued msgs:" + _messages.size());
}
//release lock now
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
index bfac194d5d..7c0803a61a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Hashtable;
+import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -125,21 +126,25 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
}
_defaultPassword = password;
}
-
+
/**
* Getter for SSLConfiguration
+ *
* @return SSLConfiguration if set, otherwise null
*/
- public final SSLConfiguration getSSLConfiguration() {
- return _sslConfig;
+ public final SSLConfiguration getSSLConfiguration()
+ {
+ return _sslConfig;
}
-
+
/**
* Setter for SSLConfiguration
+ *
* @param sslConfig config to store
*/
- public final void setSSLConfiguration(SSLConfiguration sslConfig) {
- _sslConfig = sslConfig;
+ public final void setSSLConfiguration(SSLConfiguration sslConfig)
+ {
+ _sslConfig = sslConfig;
}
/**
@@ -243,7 +248,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
}
catch (UnknownHostException e)
{
- return null;
+ return "UnknownHost" + UUID.randomUUID();
}
}
@@ -336,7 +341,9 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
* @param name
* @param ctx
* @param env
+ *
* @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory.
+ *
* @throws Exception
*/
public Object getObjectInstance(Object obj, Name name, Context ctx,
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index df3b5ed202..fee6690538 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -519,11 +519,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
- synchronized (_messageDeliveryLock)
+ synchronized (_connection.getFailoverMutex())
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
- synchronized (_connection.getFailoverMutex())
+ synchronized (_messageDeliveryLock)
{
// Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -577,9 +577,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e) throws JMSException
{
- synchronized (_messageDeliveryLock)
+ synchronized (_connection.getFailoverMutex())
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (_messageDeliveryLock)
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index a9c93d7227..a0a8eb10ed 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -143,17 +143,17 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("A"));
Message msg;
- msg = consumer1.receive(100);
+ msg = consumer1.receive(500);
assertNotNull("Message should be available", msg);
assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
- msg = consumer1.receive(100);
+ msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
msg = consumer2.receive();
assertNotNull(msg);
assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
- msg = consumer2.receive(100);
+ msg = consumer2.receive(500);
assertNull("There should be no more messages for consumption on consumer2.", msg);
consumer2.close();
@@ -164,21 +164,24 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(100);
+ msg = consumer1.receive(500);
assertNotNull("Consumer 1 should get message 'B'.", msg);
assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(100);
+ msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
_logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(100);
+ msg = consumer3.receive(500);
assertNotNull("Consumer 3 should get message 'B'.", msg);
assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(100);
+ msg = consumer3.receive(500);
assertNull("There should be no more messages for consumption on consumer3.", msg);
-
+
+ consumer1.close();
+ consumer3.close();
+
con.close();
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 3d5a1096b4..a4c016916e 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -481,9 +481,16 @@ public class CommitRollbackTest extends TestCase
assertEquals("2", ((TextMessage) result).getText());
}
- result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+ Message result2 = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result2);
+
+ // if this is message 1 then it should be marked as redelivered
+ if("1".equals(((TextMessage) result2).getText()))
+ {
+ assertTrue("Messasge is not marked as redelivered" + result2, result2.getJMSRedelivered());
+ }
+
+ assertNotSame("Messages should not have the same content",((TextMessage) result2).getText(), ((TextMessage) result).getText() );
result = _consumer.receive(1000);
assertNull("test message should be null:" + result, result);
diff --git a/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java b/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java
index d816065758..38b7758e7c 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java
@@ -342,7 +342,7 @@ public class LocalCircuitImpl implements Circuit
}
catch (JMSException e)
{
- throw new RuntimeException("Got JMSException during close.", e);
+ throw new RuntimeException("Got JMSException during close:" + e.getMessage(), e);
}
}