diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-19 15:24:27 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-19 15:24:27 +0000 |
| commit | db70f1d2908f294fee0ed47cdb478c3ab0f3b252 (patch) | |
| tree | bf08922f63b255a26182700f386bab4406db631d /qpid/java/broker-plugins | |
| parent | c1926054f005af5084e46e6bf8da0c30120c82b4 (diff) | |
| download | qpid-python-db70f1d2908f294fee0ed47cdb478c3ab0f3b252.tar.gz | |
Connection close is now performed by i/o thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
7 files changed, 146 insertions, 41 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 0d1fcb008a..10bf0e761e 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -291,12 +291,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } @Override - public void processPendingMessages() + public void processPending() { - for (AMQSessionModel session : _connection.getSessionModels()) - { - session.processPendingMessages(); - } + _connection.processPending(); + } @Override diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index caa8b90485..e9aa57f480 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; +import static org.apache.qpid.transport.Connection.State.CLOSING; import java.net.SocketAddress; import java.security.Principal; @@ -30,6 +31,8 @@ import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -81,9 +84,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private boolean _blocking; private Transport _transport; - private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = + private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList = new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private final Queue<Action<? super ServerConnection>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = new CopyOnWriteArrayList<SessionModelListener>(); @@ -368,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeSubscriptions(); - performDeleteTasks(); - ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; - try - { - replyCode = ConnectionCloseCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + + addAsyncTask(new Action<ServerConnection>() { - // Ignore - } - close(replyCode, message); + @Override + public void performAction(final ServerConnection object) + { + closeSubscriptions(); + performDeleteTasks(); + + setState(CLOSING); + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + sendConnectionClose(replyCode, message); + } + }); } protected void performDeleteTasks() { - for(Action<? super ServerConnection> task : _taskList) + for(Action<? super ServerConnection> task : _connectionCloseTaskList) { task.performAction(this); } @@ -659,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S @Override public void addDeleteTask(final Action<? super ServerConnection> task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); + } + + private void addAsyncTask(final Action<ServerConnection> action) + { + _asyncTaskList.add(action); + notifyWork(); } @Override public void removeDeleteTask(final Action<? super ServerConnection> task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public int getMessageCompressionThreshold() @@ -698,4 +720,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { return _serverProtocolEngine.isMessageAssignmentSuspended(); } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super ServerConnection> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d33297fbf6..95d54579a7 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -439,7 +439,7 @@ public class ServerSessionDelegate extends SessionDelegate } catch (VirtualHostUnavailableException e) { - getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage()); + getServerConnection(serverSession).closeAsync(AMQConstant.CONNECTION_FORCED, e.getMessage()); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index c0cc7d55b0..8736bbeb3b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1714,7 +1714,7 @@ public class AMQChannel receivedLock.lock(); try { - _connection.close(AMQConstant.RESOURCE_ERROR, reason); + _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 88412c3b70..08411b8581 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -36,8 +36,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -164,9 +166,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); - private final List<Action<? super AMQProtocolEngine>> _taskList = + private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList = new CopyOnWriteArrayList<>(); + private final Queue<Action<? super AMQProtocolEngine>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); @@ -849,6 +854,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, finally { _receivedLock.unlock(); + finishClose(connectionDropped); } @@ -890,7 +896,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - for (Action<? super AMQProtocolEngine> task : _taskList) + for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList) { task.performAction(this); } @@ -975,13 +981,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { markChannelAwaitingCloseOk(channelId); - closeSession(false); + closeSession(false); // currently performs the delete actions. } finally { try { writeFrame(frame); + + // add an async job and not } finally { @@ -1133,12 +1141,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public void addDeleteTask(Action<? super AMQProtocolEngine> task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); } public void removeDeleteTask(Action<? super AMQProtocolEngine> task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public ProtocolOutputConverter getProtocolOutputConverter() @@ -1388,11 +1396,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, writeFrame(responseBody.generateFrame(channelId)); } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getMethodRegistry(), - null)); + _logger.debug("KWDEBUG About to schedule close"); + + Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>() + { + @Override + public void performAction(final AMQProtocolEngine object) + { + _logger.debug("KWDEBUG About to perform close"); + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getMethodRegistry(), + null)); + + } + }; + addAsyncTask(action); + } + + private void addAsyncTask(final Action<AMQProtocolEngine> action) + { + _asyncTaskList.add(action); + notifyWork(); } public void block() @@ -2080,8 +2106,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } @Override - public void processPendingMessages() + public void processPending() { + while(_asyncTaskList.peek() != null) + { + Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + for (AMQSessionModel session : getSessionModels()) { session.processPendingMessages(); @@ -2100,7 +2132,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _stateChanged.set(true); final Action<ServerProtocolEngine> listener = _workListener.get(); - _logger.info("Work lister is null? " + (listener == null)); if(listener != null) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 097abe9d8b..cd4f269029 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -30,11 +30,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -58,6 +61,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.transport.Connection; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { @@ -100,6 +104,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private List<Action<? super Connection_1_0>> _closeTasks = Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>()); + + private final Queue<Action<? super Connection_1_0>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + + private boolean _closedOnOpen; @@ -213,6 +222,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _closeTasks.add( task ); } + private void addAsyncTask(final Action<Connection_1_0> action) + { + _asyncTaskList.add(action); + notifyWork(); + } + + public void closeReceived() { Collection<Session_1_0> sessions = new ArrayList(_sessions); @@ -251,9 +267,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { - _conn.close(); + Action<Connection_1_0> action = new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + _conn.close(); + + } + }; + addAsyncTask(action); + } @Override @@ -510,4 +536,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { return _protocolEngine.isMessageAssignmentSuspended(); } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 0078235990..a0f10eee65 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -620,12 +620,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } @Override - public void processPendingMessages() + public void processPending() { - for (AMQSessionModel session : _connection.getSessionModels()) - { - session.processPendingMessages(); - } + _connection.processPending(); + } @Override |
