diff options
| author | Keith Wall <kwall@apache.org> | 2014-07-24 17:39:15 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-07-24 17:39:15 +0000 |
| commit | 9651210770b5c2ff04877739403687f06e72ba3c (patch) | |
| tree | 3e1f772cfca42acf0b8cb7f8d80255cf49556fed /qpid/java | |
| parent | f87df8d18a1fa100fc862ad2160cadd3d9de9a9f (diff) | |
| download | qpid-python-9651210770b5c2ff04877739403687f06e72ba3c.tar.gz | |
QPID-5915: [Java Broker] Ensure that closing a Connection model object also cause the underlying connection to close too
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613197 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
6 files changed, 127 insertions, 50 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 2355041f67..f17b67d053 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.AbstractConfiguredObject; @@ -43,24 +44,29 @@ import org.apache.qpid.server.util.Action; public final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>, SessionModelListener { - private AMQConnectionModel _connection; + private final Action _underlyingConnectionDeleteTask; + private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false); + private AMQConnectionModel _underlyingConnection; private State _state = State.ACTIVE; public ConnectionAdapter(final AMQConnectionModel conn) { super(parentsMap(conn.getVirtualHost()),createAttributes(conn)); - _connection = conn; + _underlyingConnection = conn; - conn.addDeleteTask(new Action() + // Used to allow the protocol layers to tell the model they have been deleted + _underlyingConnectionDeleteTask = new Action() { @Override public void performAction(final Object object) { conn.removeDeleteTask(this); + _underlyingClosed.set(true); deleted(); } - }); + }; + conn.addDeleteTask(_underlyingConnectionDeleteTask); conn.addSessionListener(this); } @@ -77,13 +83,13 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @Override public String getClientId() { - return _connection.getClientId(); + return _underlyingConnection.getClientId(); } @Override public String getClientVersion() { - return _connection.getClientVersion(); + return _underlyingConnection.getClientVersion(); } @Override @@ -101,14 +107,14 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @Override public String getPrincipal() { - final Principal authorizedPrincipal = _connection.getAuthorizedPrincipal(); + final Principal authorizedPrincipal = _underlyingConnection.getAuthorizedPrincipal(); return authorizedPrincipal == null ? null : authorizedPrincipal.getName(); } @Override public String getRemoteAddress() { - return _connection.getRemoteAddressString(); + return _underlyingConnection.getRemoteAddressString(); } @Override @@ -126,19 +132,19 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @Override public long getSessionCountLimit() { - return _connection.getSessionCountLimit(); + return _underlyingConnection.getSessionCountLimit(); } @Override public Transport getTransport() { - return _connection.getTransport(); + return _underlyingConnection.getTransport(); } @Override public Port getPort() { - return _connection.getPort(); + return _underlyingConnection.getPort(); } public Collection<Session> getSessions() @@ -149,17 +155,22 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) private void doDelete() { - _connection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + closeUnderlyingConnection(); deleted(); _state = State.DELETED; } + @Override + protected void onClose() + { + closeUnderlyingConnection(); + } + public State getState() { return _state; } - @Override public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { @@ -177,37 +188,37 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @Override public long getBytesIn() { - return _connection.getDataReceiptStatistics().getTotal(); + return _underlyingConnection.getDataReceiptStatistics().getTotal(); } @Override public long getBytesOut() { - return _connection.getDataDeliveryStatistics().getTotal(); + return _underlyingConnection.getDataDeliveryStatistics().getTotal(); } @Override public long getMessagesIn() { - return _connection.getMessageReceiptStatistics().getTotal(); + return _underlyingConnection.getMessageReceiptStatistics().getTotal(); } @Override public long getMessagesOut() { - return _connection.getMessageDeliveryStatistics().getTotal(); + return _underlyingConnection.getMessageDeliveryStatistics().getTotal(); } @Override public long getLastIoTime() { - return _connection.getLastIoTime(); + return _underlyingConnection.getLastIoTime(); } @Override public int getSessionCount() { - return _connection.getSessionModels().size(); + return _underlyingConnection.getSessionModels().size(); } @Override @@ -223,4 +234,14 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection { // SessionAdapter installs delete task to cause session model object to delete } + + private void closeUnderlyingConnection() + { + if (_underlyingClosed.compareAndSet(false, true)) + { + _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask); + _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + } + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 1d981beb54..479029093f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1727,13 +1727,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } - private QueueRunner _queueRunner = new QueueRunner(this); + private final QueueRunner _queueRunner = new QueueRunner(this); public void deliverAsync() { _stateChangeCount.incrementAndGet(); - _queueRunner.execute(_asyncDelivery); + _queueRunner.execute(); } @@ -1746,7 +1746,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { SubFlushRunner flusher = sub.getRunner(); - flusher.execute(_asyncDelivery); + flusher.execute(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index b4fb2aab9f..30f6fa66c6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -114,12 +114,12 @@ public class QueueRunner implements Runnable return "QueueRunner-" + _queue.getLogSubject(); } - public void execute(Executor executor) + public void execute() { _stateChange.set(true); if(_scheduled.compareAndSet(IDLE, SCHEDULED)) { - executor.execute(this); + _queue.execute(this); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index b4f0fde1f7..910727ce42 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -112,12 +112,12 @@ class SubFlushRunner implements Runnable return "SubFlushRunner-" + _sub.toLogString(); } - public void execute(Executor executor) + public void execute() { _stateChange.set(true); if(_scheduled.compareAndSet(IDLE,SCHEDULED)) { - executor.execute(this); + getQueue().execute(this); } } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index d12f0bc5f9..70fd95a608 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -31,14 +31,19 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.security.AccessControlException; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.mockito.ArgumentMatcher; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -152,6 +157,57 @@ public class VirtualHostTest extends QpidTestCase verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType())); } + public void testStopVirtualHost_ClosesConnections() + { + String virtualHostName = getName(); + + VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + + AMQConnectionModel connection = createMockProtocolConnection(virtualHost); + + assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getChildren(Connection.class).size()); + + ((RegistryChangeListener)virtualHost).connectionRegistered(connection); + + assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getChildren( + Connection.class).size()); + + virtualHost.stop(); + assertEquals("Unexpected state", State.STOPPED, virtualHost.getState()); + + assertEquals("Unexpected number of connections after virtualhost stopped", + 0, + virtualHost.getChildren(Connection.class).size()); + + verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + } + + public void testDeleteVirtualHost_ClosesConnections() + { + String virtualHostName = getName(); + + VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + + AMQConnectionModel connection = createMockProtocolConnection(virtualHost); + + assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getChildren(Connection.class).size()); + + ((RegistryChangeListener)virtualHost).connectionRegistered(connection); + + assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getChildren(Connection.class).size()); + + virtualHost.delete(); + assertEquals("Unexpected state", State.DELETED, virtualHost.getState()); + + assertEquals("Unexpected number of connections after virtualhost deleted", + 0, + virtualHost.getChildren(Connection.class).size()); + + verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + } + public void testCreateDurableQueue() { String virtualHostName = getName(); @@ -273,6 +329,14 @@ public class VirtualHostTest extends QpidTestCase return host; } + private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost) + { + final AMQConnectionModel connection = mock(AMQConnectionModel.class); + when(connection.getVirtualHost()).thenReturn(virtualHost); + when(connection.getRemoteAddressString()).thenReturn("peer:1234"); + return connection; + } + private static ConfiguredObjectRecord matchesRecord(UUID id, String type) { return argThat(new MinimalConfiguredObjectRecordMatcher(id, type)); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java index b7874ee85e..58f1bfe372 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.test.unit.client.channelclose; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQTopic; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; @@ -34,22 +34,12 @@ public class CloseWithBlockingReceiveTest extends QpidBrokerTestCase { - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - public void testReceiveReturnsNull() throws Exception { - final AMQConnection connection = (AMQConnection) getConnection("guest", "guest"); + final Connection connection = getConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(new AMQTopic(connection, "banana")); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); connection.start(); Runnable r = new Runnable() @@ -68,14 +58,16 @@ public class CloseWithBlockingReceiveTest extends QpidBrokerTestCase } }; long startTime = System.currentTimeMillis(); - new Thread(r).start(); - consumer.receive(10000); - assertTrue(System.currentTimeMillis() - startTime < 10000); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class); + Thread thread = new Thread(r); + thread.start(); + try + { + consumer.receive(10000); + assertTrue(System.currentTimeMillis() - startTime < 10000); + } + finally + { + thread.join(); + } } - } |
