From db70f1d2908f294fee0ed47cdb478c3ab0f3b252 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 19 Feb 2015 15:24:27 +0000 Subject: Connection close is now performed by i/o thread git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660909 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/protocol/v0_10/ProtocolEngine_0_10.java | 8 +-- .../server/protocol/v0_10/ServerConnection.java | 69 +++++++++++++++++----- .../protocol/v0_10/ServerSessionDelegate.java | 2 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 2 +- .../server/protocol/v0_8/AMQProtocolEngine.java | 53 +++++++++++++---- .../qpid/server/protocol/v1_0/Connection_1_0.java | 45 +++++++++++++- .../protocol/v1_0/ProtocolEngine_1_0_0_SASL.java | 8 +-- 7 files changed, 146 insertions(+), 41 deletions(-) (limited to 'qpid/java/broker-plugins') diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 0d1fcb008a..10bf0e761e 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -291,12 +291,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } @Override - public void processPendingMessages() + public void processPending() { - for (AMQSessionModel session : _connection.getSessionModels()) - { - session.processPendingMessages(); - } + _connection.processPending(); + } @Override diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index caa8b90485..e9aa57f480 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; +import static org.apache.qpid.transport.Connection.State.CLOSING; import java.net.SocketAddress; import java.security.Principal; @@ -30,6 +31,8 @@ import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -81,9 +84,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel> _taskList = + private final CopyOnWriteArrayList> _connectionCloseTaskList = new CopyOnWriteArrayList>(); + private final Queue> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private final CopyOnWriteArrayList _sessionListeners = new CopyOnWriteArrayList(); @@ -368,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel() { - // Ignore - } - close(replyCode, message); + @Override + public void performAction(final ServerConnection object) + { + closeSubscriptions(); + performDeleteTasks(); + + setState(CLOSING); + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + sendConnectionClose(replyCode, message); + } + }); } protected void performDeleteTasks() { - for(Action task : _taskList) + for(Action task : _connectionCloseTaskList) { task.performAction(this); } @@ -659,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); + } + + private void addAsyncTask(final Action action) + { + _asyncTaskList.add(action); + notifyWork(); } @Override public void removeDeleteTask(final Action task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public int getMessageCompressionThreshold() @@ -698,4 +720,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d33297fbf6..95d54579a7 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -439,7 +439,7 @@ public class ServerSessionDelegate extends SessionDelegate } catch (VirtualHostUnavailableException e) { - getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage()); + getServerConnection(serverSession).closeAsync(AMQConstant.CONNECTION_FORCED, e.getMessage()); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index c0cc7d55b0..8736bbeb3b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1714,7 +1714,7 @@ public class AMQChannel receivedLock.lock(); try { - _connection.close(AMQConstant.RESOURCE_ERROR, reason); + _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 88412c3b70..08411b8581 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -36,8 +36,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -164,9 +166,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); - private final List> _taskList = + private final List> _connectionCloseTaskList = new CopyOnWriteArrayList<>(); + private final Queue> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private Map _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); @@ -849,6 +854,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, finally { _receivedLock.unlock(); + finishClose(connectionDropped); } @@ -890,7 +896,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - for (Action task : _taskList) + for (Action task : _connectionCloseTaskList) { task.performAction(this); } @@ -975,13 +981,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { markChannelAwaitingCloseOk(channelId); - closeSession(false); + closeSession(false); // currently performs the delete actions. } finally { try { writeFrame(frame); + + // add an async job and not } finally { @@ -1133,12 +1141,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public void addDeleteTask(Action task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); } public void removeDeleteTask(Action task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public ProtocolOutputConverter getProtocolOutputConverter() @@ -1388,11 +1396,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, writeFrame(responseBody.generateFrame(channelId)); } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getMethodRegistry(), - null)); + _logger.debug("KWDEBUG About to schedule close"); + + Action action = new Action() + { + @Override + public void performAction(final AMQProtocolEngine object) + { + _logger.debug("KWDEBUG About to perform close"); + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getMethodRegistry(), + null)); + + } + }; + addAsyncTask(action); + } + + private void addAsyncTask(final Action action) + { + _asyncTaskList.add(action); + notifyWork(); } public void block() @@ -2080,8 +2106,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } @Override - public void processPendingMessages() + public void processPending() { + while(_asyncTaskList.peek() != null) + { + Action asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + for (AMQSessionModel session : getSessionModels()) { session.processPendingMessages(); @@ -2100,7 +2132,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _stateChanged.set(true); final Action listener = _workListener.get(); - _logger.info("Work lister is null? " + (listener == null)); if(listener != null) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 097abe9d8b..cd4f269029 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -30,11 +30,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -58,6 +61,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.transport.Connection; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel { @@ -100,6 +104,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private List> _closeTasks = Collections.synchronizedList(new ArrayList>()); + + private final Queue> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + + private boolean _closedOnOpen; @@ -213,6 +222,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _closeTasks.add( task ); } + private void addAsyncTask(final Action action) + { + _asyncTaskList.add(action); + notifyWork(); + } + + public void closeReceived() { Collection sessions = new ArrayList(_sessions); @@ -251,9 +267,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { - _conn.close(); + Action action = new Action() + { + @Override + public void performAction(final Connection_1_0 object) + { + _conn.close(); + + } + }; + addAsyncTask(action); + } @Override @@ -510,4 +536,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { return _protocolEngine.isMessageAssignmentSuspended(); } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 0078235990..a0f10eee65 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -620,12 +620,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } @Override - public void processPendingMessages() + public void processPending() { - for (AMQSessionModel session : _connection.getSessionModels()) - { - session.processPendingMessages(); - } + _connection.processPending(); + } @Override -- cgit v1.2.1