summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-19 15:24:27 +0000
committerKeith Wall <kwall@apache.org>2015-02-19 15:24:27 +0000
commitdb70f1d2908f294fee0ed47cdb478c3ab0f3b252 (patch)
treebf08922f63b255a26182700f386bab4406db631d /qpid/java/broker-plugins
parentc1926054f005af5084e46e6bf8da0c30120c82b4 (diff)
downloadqpid-python-db70f1d2908f294fee0ed47cdb478c3ab0f3b252.tar.gz
Connection close is now performed by i/o thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java69
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java53
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java45
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java8
7 files changed, 146 insertions, 41 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 0d1fcb008a..10bf0e761e 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -291,12 +291,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
- for (AMQSessionModel session : _connection.getSessionModels())
- {
- session.processPendingMessages();
- }
+ _connection.processPending();
+
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index caa8b90485..e9aa57f480 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+import static org.apache.qpid.transport.Connection.State.CLOSING;
import java.net.SocketAddress;
import java.security.Principal;
@@ -30,6 +31,8 @@ import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -81,9 +84,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private boolean _blocking;
private Transport _transport;
- private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+ private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList =
new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+ private final Queue<Action<? super ServerConnection>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
new CopyOnWriteArrayList<SessionModelListener>();
@@ -368,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
}
- public void close(AMQConstant cause, String message)
+ public void closeAsync(final AMQConstant cause, final String message)
{
- closeSubscriptions();
- performDeleteTasks();
- ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
- try
- {
- replyCode = ConnectionCloseCode.get(cause.getCode());
- }
- catch (IllegalArgumentException iae)
+
+ addAsyncTask(new Action<ServerConnection>()
{
- // Ignore
- }
- close(replyCode, message);
+ @Override
+ public void performAction(final ServerConnection object)
+ {
+ closeSubscriptions();
+ performDeleteTasks();
+
+ setState(CLOSING);
+ ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+ try
+ {
+ replyCode = ConnectionCloseCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore
+ }
+ sendConnectionClose(replyCode, message);
+ }
+ });
}
protected void performDeleteTasks()
{
- for(Action<? super ServerConnection> task : _taskList)
+ for(Action<? super ServerConnection> task : _connectionCloseTaskList)
{
task.performAction(this);
}
@@ -659,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
@Override
public void addDeleteTask(final Action<? super ServerConnection> task)
{
- _taskList.add(task);
+ _connectionCloseTaskList.add(task);
+ }
+
+ private void addAsyncTask(final Action<ServerConnection> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
}
@Override
public void removeDeleteTask(final Action<? super ServerConnection> task)
{
- _taskList.remove(task);
+ _connectionCloseTaskList.remove(task);
}
public int getMessageCompressionThreshold()
@@ -698,4 +720,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
return _serverProtocolEngine.isMessageAssignmentSuspended();
}
+
+ public void processPending()
+ {
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPendingMessages();
+ }
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index d33297fbf6..95d54579a7 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -439,7 +439,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (VirtualHostUnavailableException e)
{
- getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage());
+ getServerConnection(serverSession).closeAsync(AMQConstant.CONNECTION_FORCED, e.getMessage());
}
finally
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c0cc7d55b0..8736bbeb3b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1714,7 +1714,7 @@ public class AMQChannel
receivedLock.lock();
try
{
- _connection.close(AMQConstant.RESOURCE_ERROR, reason);
+ _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason);
}
finally
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 88412c3b70..08411b8581 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -36,8 +36,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -164,9 +166,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
- private final List<Action<? super AMQProtocolEngine>> _taskList =
+ private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList =
new CopyOnWriteArrayList<>();
+ private final Queue<Action<? super AMQProtocolEngine>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
private ProtocolOutputConverter _protocolOutputConverter;
private final Subject _authorizedSubject = new Subject();
@@ -849,6 +854,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
finally
{
_receivedLock.unlock();
+
finishClose(connectionDropped);
}
@@ -890,7 +896,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
try
{
- for (Action<? super AMQProtocolEngine> task : _taskList)
+ for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList)
{
task.performAction(this);
}
@@ -975,13 +981,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
try
{
markChannelAwaitingCloseOk(channelId);
- closeSession(false);
+ closeSession(false); // currently performs the delete actions.
}
finally
{
try
{
writeFrame(frame);
+
+ // add an async job and not
}
finally
{
@@ -1133,12 +1141,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
public void addDeleteTask(Action<? super AMQProtocolEngine> task)
{
- _taskList.add(task);
+ _connectionCloseTaskList.add(task);
}
public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
{
- _taskList.remove(task);
+ _connectionCloseTaskList.remove(task);
}
public ProtocolOutputConverter getProtocolOutputConverter()
@@ -1388,11 +1396,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
writeFrame(responseBody.generateFrame(channelId));
}
- public void close(AMQConstant cause, String message)
+ public void closeAsync(final AMQConstant cause, final String message)
{
- closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getMethodRegistry(),
- null));
+ _logger.debug("KWDEBUG About to schedule close");
+
+ Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>()
+ {
+ @Override
+ public void performAction(final AMQProtocolEngine object)
+ {
+ _logger.debug("KWDEBUG About to perform close");
+ closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+ getMethodRegistry(),
+ null));
+
+ }
+ };
+ addAsyncTask(action);
+ }
+
+ private void addAsyncTask(final Action<AMQProtocolEngine> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
}
public void block()
@@ -2080,8 +2106,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
for (AMQSessionModel session : getSessionModels())
{
session.processPendingMessages();
@@ -2100,7 +2132,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_stateChanged.set(true);
final Action<ServerProtocolEngine> listener = _workListener.get();
- _logger.info("Work lister is null? " + (listener == null));
if(listener != null)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 097abe9d8b..cd4f269029 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -30,11 +30,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.security.auth.Subject;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
@@ -58,6 +61,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.Connection;
public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0>
{
@@ -100,6 +104,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private List<Action<? super Connection_1_0>> _closeTasks =
Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>());
+
+ private final Queue<Action<? super Connection_1_0>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
+
private boolean _closedOnOpen;
@@ -213,6 +222,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
_closeTasks.add( task );
}
+ private void addAsyncTask(final Action<Connection_1_0> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
+ }
+
+
public void closeReceived()
{
Collection<Session_1_0> sessions = new ArrayList(_sessions);
@@ -251,9 +267,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
@Override
- public void close(AMQConstant cause, String message)
+ public void closeAsync(AMQConstant cause, String message)
{
- _conn.close();
+ Action<Connection_1_0> action = new Action<Connection_1_0>()
+ {
+ @Override
+ public void performAction(final Connection_1_0 object)
+ {
+ _conn.close();
+
+ }
+ };
+ addAsyncTask(action);
+
}
@Override
@@ -510,4 +536,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
{
return _protocolEngine.isMessageAssignmentSuspended();
}
+
+ public void processPending()
+ {
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPendingMessages();
+ }
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 0078235990..a0f10eee65 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -620,12 +620,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
- for (AMQSessionModel session : _connection.getSessionModels())
- {
- session.processPendingMessages();
- }
+ _connection.processPending();
+
}
@Override