summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java82
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java63
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java4
4 files changed, 92 insertions, 64 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 9acc6ada0e..07b8d8d9b7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -70,11 +70,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -92,6 +88,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private final Object _failoverMutex = new Object();
+ private final Object _sessionCreationLock = new Object();
+
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
* and we must prevent the client from opening too many. Zero means unlimited.
@@ -509,6 +507,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
+ synchronized(_sessionCreationLock)
+ {
checkNotClosed();
if (channelLimitReached())
@@ -572,6 +572,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return session;
}
}, this).execute();
+ }
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
@@ -760,44 +761,63 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close(long timeout) throws JMSException
{
- synchronized (getFailoverMutex())
+ close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+ }
+
+ public void close(List<AMQSession> sessions, long timeout) throws JMSException
+ {
+ synchronized(_sessionCreationLock)
{
- if (!_closed.getAndSet(true))
+ if(!sessions.isEmpty())
{
- try
+ AMQSession session = sessions.remove(0);
+ synchronized(session.getMessageDeliveryLock())
+ {
+ close(sessions, timeout);
+ }
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
+ {
+ if (!_closed.getAndSet(true))
{
- long startCloseTime = System.currentTimeMillis();
+ try
+ {
+ long startCloseTime = System.currentTimeMillis();
- _taskPool.shutdown();
- closeAllSessions(null, timeout, startCloseTime);
+ _taskPool.shutdown();
+ closeAllSessions(null, timeout, startCloseTime);
- if (!_taskPool.isTerminated())
- {
- try
+ if (!_taskPool.isTerminated())
{
- // adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+ try
+ {
+ // adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
+ _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
}
- }
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
+ // adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
- _protocolHandler.closeConnection(timeout);
+ _protocolHandler.closeConnection(timeout);
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
}
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
+ }
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 74b4cc0618..f873801dc9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -106,6 +106,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -519,7 +520,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized (_messageDeliveryLock)
{
-
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
synchronized (_connection.getFailoverMutex())
@@ -666,11 +666,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
else
{
_logger.info("Dispatcher is null so created stopped dispatcher");
-
startDistpatcherIfNecessary(true);
}
- _dispatcher.rejectPending(consumer);
+ _dispatcher.rejectPending(consumer);
}
else
{
@@ -1954,11 +1953,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private void closeConsumers(Throwable error) throws JMSException
{
- if (_dispatcher != null)
- {
- _dispatcher.close();
- _dispatcher = null;
- }
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
@@ -1973,10 +1967,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- con.close();
+ con.close(false);
}
}
// at this point the _consumers map will be empty
+ if (_dispatcher != null)
+ {
+ _dispatcher.close();
+ _dispatcher = null;
+ }
}
/**
@@ -2557,12 +2556,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ Object getMessageDeliveryLock()
+ {
+ return _messageDeliveryLock;
+ }
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
/** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false);
private final Object _lock = new Object();
private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -2578,7 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void close()
{
- _closed.set(true);
+ _dispatcherClosed.set(true);
interrupt();
// fixme awaitTermination
@@ -2673,30 +2677,33 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
+ while (!_dispatcherClosed.get())
{
- synchronized (_lock)
+ message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS);
+ if (message != null)
{
-
- while (connectionStopped())
+ synchronized (_lock)
{
- _lock.wait(2000);
- }
- if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
- {
- rejectMessage(message, true);
- }
- else
- {
- synchronized (_messageDeliveryLock)
+ while (connectionStopped())
{
- dispatchMessage(message);
+ _lock.wait(2000);
}
- }
- }
+ if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
+ {
+ rejectMessage(message, true);
+ }
+ else
+ {
+ synchronized (_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
+ }
+ }
+ }
}
}
catch (InterruptedException e)
@@ -2760,7 +2767,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
// Don't reject if we're already closing
- if (!_closed.get())
+ if (!_dispatcherClosed.get())
{
rejectMessage(message, true);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 0fc39a9318..579b0d9e90 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.util;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
@@ -69,10 +70,10 @@ public class FlowControllingBlockingQueue
_listener = listener;
}
- public Object take() throws InterruptedException
+ public Object poll(long time, TimeUnit unit) throws InterruptedException
{
- Object o = _queue.take();
- if (_listener != null)
+ Object o = _queue.poll(time, unit);
+ if (o != null && _listener != null)
{
synchronized (_listener)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 588c82221e..56394fee27 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -47,12 +47,12 @@ public class ConnectionTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
-// TransportConnection.createVMBroker(1);
+ TransportConnection.createVMBroker(1);
}
protected void tearDown() throws Exception
{
-// TransportConnection.killVMBroker(1);
+ TransportConnection.killVMBroker(1);
}
public void testSimpleConnection()