summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java73
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java105
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java88
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java49
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java23
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java173
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java81
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java183
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java128
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java34
16 files changed, 902 insertions, 164 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 41d7f6c067..8ad2ace1b2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -34,7 +34,6 @@ import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.common.FixedSizeByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
@@ -421,7 +420,8 @@ public class Main
bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
}
- acceptor.bind(bindAddress, handler, sconfig);
+ bind(acceptor, bindAddress, handler, sconfig);
+
//fixme qpid.AMQP should be using qpidproperties to get value
_brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
}
@@ -432,7 +432,8 @@ public class Main
try
{
- acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+ bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+
//fixme qpid.AMQP should be using qpidproperties to get value
_brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
@@ -455,6 +456,23 @@ public class Main
}
}
+ /**
+ * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
+ *
+ * @param acceptor
+ * @param bindAddress
+ * @param handler
+ * @param sconfig
+ *
+ * @throws IOException from the acceptor.bind command
+ */
+ private void bind(IoAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
+ {
+ acceptor.bind(bindAddress, handler, sconfig);
+
+ ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
+ }
+
public static void main(String[] args)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
new file mode 100644
index 0000000000..d287595e2d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.connection;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
+
+public class ConnectionRegistry implements IConnectionRegistry
+{
+ private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>();
+
+ private VirtualHost _virtualHost;
+
+ public ConnectionRegistry(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public void initialise()
+ {
+
+ }
+
+ /** Close all of the currently open connections. */
+ public void close() throws AMQException
+ {
+ while (!_registry.isEmpty())
+ {
+ AMQProtocolSession connection = _registry.get(0);
+
+ connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down",
+ 0, 0,
+ connection.getProtocolOutputConverter().getProtocolMajorVersion(),
+ connection.getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null), true);
+ }
+ }
+
+ public void registerConnection(AMQProtocolSession connnection)
+ {
+ _registry.add(connnection);
+ }
+
+ public void deregisterConnection(AMQProtocolSession connnection)
+ {
+ _registry.remove(connnection);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
new file mode 100644
index 0000000000..d64fde1c20
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.connection;
+
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+public interface IConnectionRegistry
+{
+
+ public void initialise();
+
+ public void close() throws AMQException;
+
+ public void registerConnection(AMQProtocolSession connnection);
+
+ public void deregisterConnection(AMQProtocolSession connnection);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index bdb16d0fcb..b4075b81ac 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -25,6 +25,7 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.CloseFuture;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
@@ -99,7 +100,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private Object _lastSent;
- private boolean _closed;
+ protected boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
@@ -115,13 +116,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
+ private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
+ private org.apache.mina.common.WriteFuture _lastWriteFuture;
+
public ManagedObject getManagedObject()
{
return _managedObject;
}
public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
- throws AMQException
+ throws AMQException
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
@@ -145,7 +149,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
- AMQStateManager stateManager) throws AMQException
+ AMQStateManager stateManager) throws AMQException
{
_stateManager = stateManager;
_minaProtocolSession = session;
@@ -199,7 +203,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
private void frameReceived(AMQFrame frame) throws AMQException
- {
+ {
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
@@ -222,15 +226,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
if (_logger.isInfoEnabled())
{
- _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+ _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
}
+ closeProtocolSession();
return;
}
}
-
-
try
{
body.handle(channelId, this);
@@ -258,7 +261,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
String locales = "en_US";
-
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) getProtocolMinorVersion(),
null,
@@ -266,7 +268,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
locales.getBytes());
_minaProtocolSession.write(responseBody.generateFrame(0));
-
}
catch (AMQException e)
{
@@ -332,27 +333,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_logger.info("Closing connection due to: " + e.getMessage());
}
- closeSession();
-
AMQConnectionException ce =
- evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
+ evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(ce.getCloseFrame(channelId));
+ closeConnection(channelId, ce, false);
}
}
catch (AMQConnectionException e)
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " + e.getMessage());
- }
-
- markChannelAwaitingCloseOk(channelId);
- closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ closeConnection(channelId, e, false);
}
}
catch (Exception e)
@@ -365,7 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_logger.error("Unexpected exception while processing frame. Closing connection.", e);
- _minaProtocolSession.close();
+ closeProtocolSession();
}
}
@@ -399,7 +389,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
- _minaProtocolSession.write(frame);
+
+ _lastWriteFuture = _minaProtocolSession.write(frame);
}
public AMQShortString getContextKey()
@@ -431,7 +422,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public AMQChannel getChannel(int channelId) throws AMQException
{
final AMQChannel channel =
- ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+ ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
return null;
@@ -464,8 +455,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
if (_channelMap.size() == _maxNoOfChannels)
{
String errorMessage =
- toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
- + "); can't create channel";
+ toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+ + "); can't create channel";
_logger.error(errorMessage);
throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
}
@@ -619,6 +610,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
if (!_closed)
{
_closed = true;
+
+ if (_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
+
closeAllChannels();
if (_managedObject != null)
{
@@ -632,9 +629,54 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
+ public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e.getMessage());
+ }
+
+ markChannelAwaitingCloseOk(channelId);
+ closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+
+ if (closeProtocolSession)
+ {
+ closeProtocolSession();
+ }
+ }
+
+ public void closeProtocolSession()
+ {
+ closeProtocolSession(true);
+ }
+
+ public void closeProtocolSession(boolean waitLast)
+ {
+ _logger.debug("Waiting for last write to join.");
+ if (waitLast && (_lastWriteFuture != null))
+ {
+ _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ }
+
+ _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
+ final CloseFuture future = _minaProtocolSession.close();
+ future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+
+ try
+ {
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ catch (AMQException e)
+ {
+ _logger.info(e.getMessage());
+ }
+ }
+
public String toString()
{
- return _minaProtocolSession.getRemoteAddress() + "("+(getAuthorizedID() == null ? "?" : getAuthorizedID().getName()+")");
+ return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
}
public String dump()
@@ -752,6 +794,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void setVirtualHost(VirtualHost virtualHost) throws AMQException
{
_virtualHost = virtualHost;
+
+ _virtualHost.getConnectionRegistry().registerConnection(this);
+
_managedObject = createMBean();
_managedObject.register();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index c3400029da..1bac601225 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -25,6 +25,7 @@ import javax.security.sasl.SaslServer;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -150,6 +151,10 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
void closeSession() throws AMQException;
+ /** This must be called to close the session in order to free up any resources managed by the session. */
+ void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+
+
/** @return a key that uniquely identifies this session */
Object getKey();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index d0dcd051f0..863f837462 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -24,9 +24,17 @@ import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.mina.common.IoAcceptor;
import java.util.HashMap;
import java.util.Map;
+import java.net.InetSocketAddress;
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -48,6 +56,21 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry";
public static String _APPLICATION_REGISTRY = DEFAULT_APPLICATION_REGISTRY;
+ protected final Map<InetSocketAddress, IoAcceptor> _acceptors = new HashMap<InetSocketAddress, IoAcceptor>();
+
+ protected ManagedObjectRegistry _managedObjectRegistry;
+
+ protected AuthenticationManager _authenticationManager;
+
+ protected VirtualHostRegistry _virtualHostRegistry;
+
+ protected ACLPlugin _accessManager;
+
+ protected PrincipalDatabaseManager _databaseManager;
+
+ protected PluginManager _pluginManager;
+
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -57,7 +80,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
public void run()
{
- _logger.info("Shutting down application registries...");
removeAll();
}
}
@@ -90,6 +112,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
+ /**
+ * Method to cleanly shutdown specified registry running in this JVM
+ *
+ * @param instanceID the instance to shutdown
+ */
+
public static void remove(int instanceID)
{
try
@@ -111,8 +139,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
+ /** Method to cleanly shutdown all registries currently running in this JVM */
public static void removeAll()
- {
+ {
Object[] keys = _instanceMap.keySet().toArray();
for (Object k : keys)
{
@@ -162,6 +191,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public void close() throws Exception
{
+ //Stop incomming connections
+ unbind();
+
+ //Shutdown virtualhosts
for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
{
virtualHost.close();
@@ -174,11 +207,31 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
+ private void unbind()
+ {
+ synchronized (_acceptors)
+ {
+ for (InetSocketAddress bindAddress : _acceptors.keySet())
+ {
+ IoAcceptor acceptor = _acceptors.get(bindAddress);
+ acceptor.unbind(bindAddress);
+ }
+ }
+ }
+
public Configuration getConfiguration()
{
return _configuration;
}
+ public void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor)
+ {
+ synchronized (_acceptors)
+ {
+ _acceptors.put(bindAddress, acceptor);
+ }
+ }
+
public <T> T getConfiguredObject(Class<T> instanceType)
{
T instance = (T) _configuredObjects.get(instanceType);
@@ -204,4 +257,35 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_APPLICATION_REGISTRY = clazz;
}
+
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return _virtualHostRegistry;
+ }
+
+ public ACLPlugin getAccessManager()
+ {
+ return _accessManager;
+ }
+
+ public ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return _managedObjectRegistry;
+ }
+
+ public PrincipalDatabaseManager getDatabaseManager()
+ {
+ return _databaseManager;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public PluginManager getPluginManager()
+ {
+ return _pluginManager;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index fef958000a..baab56a4a4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -48,18 +48,6 @@ import org.apache.qpid.AMQException;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
- private ManagedObjectRegistry _managedObjectRegistry;
-
- private AuthenticationManager _authenticationManager;
-
- private ACLPlugin _accessManager;
-
- private PrincipalDatabaseManager _databaseManager;
-
- private VirtualHostRegistry _virtualHostRegistry;
-
- private PluginManager _pluginManager;
-
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
@@ -145,39 +133,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
}
}
-
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return _virtualHostRegistry;
- }
-
- public ACLPlugin getAccessManager()
- {
- return _accessManager;
- }
-
- public ManagedObjectRegistry getManagedObjectRegistry()
- {
- return _managedObjectRegistry;
- }
-
- public PrincipalDatabaseManager getDatabaseManager()
- {
- return _databaseManager;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
public Collection<String> getVirtualHostNames()
{
return getConfiguration().getList("virtualhosts.virtualhost.name");
}
- public PluginManager getPluginManager()
- {
- return _pluginManager;
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index ca10fbdba2..597ef042f9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.registry;
import java.util.Collection;
+import java.net.InetSocketAddress;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.ManagedObjectRegistry;
@@ -29,6 +30,7 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.mina.common.IoAcceptor;
public interface IApplicationRegistry
{
@@ -39,6 +41,10 @@ public interface IApplicationRegistry
*/
void initialise() throws Exception;
+ /**
+ * Shutdown this Registry
+ * @throws Exception - //fixme needs to be made more specific
+ */
void close() throws Exception;
/**
@@ -71,5 +77,12 @@ public interface IApplicationRegistry
ACLPlugin getAccessManager();
PluginManager getPluginManager();
-
+
+ /**
+ * Register any acceptors for this registry
+ * @param bindAddress The address that the acceptor has been bound with
+ * @param acceptor The acceptor in use
+ */
+ void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor);
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 0acfa84f31..87aa15be84 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -42,19 +42,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class NullApplicationRegistry extends ApplicationRegistry
{
- private ManagedObjectRegistry _managedObjectRegistry;
-
- private AuthenticationManager _authenticationManager;
-
- private VirtualHostRegistry _virtualHostRegistry;
-
- private ACLPlugin _accessManager;
-
- private PrincipalDatabaseManager _databaseManager;
-
- private PluginManager _pluginManager;
-
-
public NullApplicationRegistry()
{
super(new MapConfiguration(new HashMap()));
@@ -84,47 +71,11 @@ public class NullApplicationRegistry extends ApplicationRegistry
}
- public Configuration getConfiguration()
- {
- return _configuration;
- }
-
-
- public ManagedObjectRegistry getManagedObjectRegistry()
- {
- return _managedObjectRegistry;
- }
-
- public PrincipalDatabaseManager getDatabaseManager()
- {
- return _databaseManager;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
public Collection<String> getVirtualHostNames()
{
String[] hosts = {"test"};
return Arrays.asList(hosts);
}
-
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return _virtualHostRegistry;
- }
-
- public ACLPlugin getAccessManager()
- {
- return _accessManager;
- }
-
- public PluginManager getPluginManager()
- {
- return _pluginManager;
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index f69751b708..977bd84491 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -26,6 +26,8 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.connection.ConnectionRegistry;
+import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
@@ -55,6 +57,8 @@ public class VirtualHost implements Accessable
private final String _name;
+ private ConnectionRegistry _connectionRegistry;
+
private QueueRegistry _queueRegistry;
private ExchangeRegistry _exchangeRegistry;
@@ -74,7 +78,8 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
-
+
+
public void setAccessableName(String name)
{
_logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -86,6 +91,10 @@ public class VirtualHost implements Accessable
return _name;
}
+ public IConnectionRegistry getConnectionRegistry()
+ {
+ return _connectionRegistry;
+ }
/**
* Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
@@ -143,8 +152,8 @@ public class VirtualHost implements Accessable
_name = name;
_virtualHostMBean = new VirtualHostMBean();
- // This isn't needed to be registered
- //_virtualHostMBean.register();
+
+ _connectionRegistry = new ConnectionRegistry(this);
_houseKeepingTimer = new Timer("Queue-housekeeping-"+name, true);
_queueRegistry = new DefaultQueueRegistry(this);
@@ -283,14 +292,20 @@ public class VirtualHost implements Accessable
public ACLPlugin getAccessManager()
{
return _accessManager;
- }
+ }
public void close() throws Exception
{
+ //Stop Housekeeping
if (_houseKeepingTimer != null)
{
_houseKeepingTimer.cancel();
}
+
+ //Stop Connections
+ _connectionRegistry.close();
+
+ //Close MessageStore
if (_messageStore != null)
{
_messageStore.close();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
new file mode 100644
index 0000000000..51012bc776
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.AMQChannel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+{
+ // ChannelID(LIST) -> LinkedList<Pair>
+ final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+ private AtomicInteger _deliveryCount = new AtomicInteger(0);
+
+ public InternalTestProtocolSession() throws AMQException
+ {
+ super(new TestIoSession(),
+ ApplicationRegistry.getInstance().getVirtualHostRegistry(),
+ new AMQCodecFactory(true));
+
+ _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+
+ }
+
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return this;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return (byte) 8;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return (byte) 0;
+ }
+
+ // ***
+
+ public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count)
+ {
+ synchronized (_channelDelivers)
+ {
+ List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+
+ List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
+
+ //Remove the msgs from the receivedList.
+ msgs.clear();
+
+ return response;
+ }
+ }
+
+ // *** ProtocolOutputConverter Implementation
+ public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+ }
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+ {
+ _deliveryCount.incrementAndGet();
+
+ synchronized (_channelDelivers)
+ {
+ Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+
+ if (consumers == null)
+ {
+ consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ _channelDelivers.put(channelId, consumers);
+ }
+
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+
+ if (consumerDelivers == null)
+ {
+ consumerDelivers = new LinkedList<DeliveryPair>();
+ consumers.put(consumerTag, consumerDelivers);
+ }
+
+ consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+ }
+ }
+
+ public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ }
+
+ public void awaitDelivery(int msgs)
+ {
+ while (msgs > _deliveryCount.get())
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public class DeliveryPair
+ {
+ private long _deliveryTag;
+ private AMQMessage _message;
+
+ public DeliveryPair(long deliveryTag, AMQMessage message)
+ {
+ _deliveryTag = deliveryTag;
+ _message = message;
+ }
+
+ public AMQMessage getMessage()
+ {
+ return _message;
+ }
+
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+ }
+
+ public boolean isClosed()
+ {
+ return _closed;
+ }
+
+ public void closeProtocolSession(boolean waitLast)
+ {
+ // Override as we don't have a real IOSession to close.
+ // The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
+ // Then the AMQMinaProtocolSession can join on the returning future without a NPE.
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java
new file mode 100644
index 0000000000..a695a67eea
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+import java.util.List;
+
+public class MessageStoreShutdownTest extends InternalBrokerBaseCase
+{
+
+ public void test()
+ {
+ subscribe(_session, _channel, _queue);
+
+ try
+ {
+ publishMessages(_session, _channel, 1);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ fail(e.getMessage());
+ }
+
+ try
+ {
+ _registry.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ fail(e.getMessage());
+ }
+
+ assertTrue("Session should now be closed", _session.isClosed());
+
+
+ //Test attempting to modify the broker state after session has been closed.
+
+ //The Message should have been removed from the unacked list.
+
+ //Ack Messages
+ List<InternalTestProtocolSession.DeliveryPair> list = _session.getDelivers(_channel.getChannelId(), new AMQShortString("sgen_1"), 1);
+
+ InternalTestProtocolSession.DeliveryPair pair = list.get(0);
+
+ try
+ {
+ // The message should now be requeued and so unable to ack it.
+ _channel.acknowledgeMessage(pair.getDeliveryTag(), false);
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Incorrect exception thrown", "Single ack on delivery tag 1 not known for channel:1", e.getMessage());
+ }
+
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
new file mode 100644
index 0000000000..c6cd5da01d
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+public class InternalBrokerBaseCase extends TestCase
+{
+ protected IApplicationRegistry _registry;
+ protected MessageStore _messageStore;
+ protected AMQChannel _channel;
+ protected InternalTestProtocolSession _session;
+ protected VirtualHost _virtualHost;
+ protected StoreContext _storeContext = new StoreContext();
+ protected AMQQueue _queue;
+ protected AMQShortString QUEUE_NAME;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _registry = new TestApplicationRegistry();
+ ApplicationRegistry.initialise(_registry);
+ _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
+
+ QUEUE_NAME = new AMQShortString("test");
+ _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"),
+ false, _virtualHost, null);
+
+ _virtualHost.getQueueRegistry().registerQueue(_queue);
+
+ Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
+ _queue.bind(defaultExchange, QUEUE_NAME, null);
+
+ _session = new InternalTestProtocolSession();
+
+ _session.setVirtualHost(_virtualHost);
+
+ _channel = new AMQChannel(_session, 1, _messageStore);
+
+ _session.addChannel(_channel);
+ }
+
+ public void tearDown() throws Exception
+ {
+ ApplicationRegistry.removeAll();
+ super.tearDown();
+ }
+
+ protected void checkStoreContents(int messageCount)
+ {
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size());
+
+ //The above publish message is sufficiently small not to fit in the header so no Body is required.
+ //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
+ }
+
+ protected AMQShortString subscribe(InternalTestProtocolSession session, AMQChannel channel, AMQQueue queue)
+ {
+ try
+ {
+ return channel.subscribeToQueue(null, queue, true, null, false, true);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ //Keep the compiler happy
+ return null;
+ }
+
+ public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException
+ {
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+ public AMQShortString getExchange()
+ {
+ return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return QUEUE_NAME;
+ }
+ };
+
+ for (int count = 0; count < messages; count++)
+ {
+ channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
+
+ //Set the body size
+ ContentHeaderBody _headerBody = new ContentHeaderBody();
+ _headerBody.bodySize = 0;
+
+ //Set Minimum properties
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+
+ properties.setExpiration(0L);
+ properties.setTimestamp(System.currentTimeMillis());
+
+ //Make Message Persistent
+ properties.setDeliveryMode((byte) 2);
+
+ _headerBody.properties = properties;
+
+ channel.publishContentHeader(_headerBody);
+ }
+
+ }
+
+ public void acknowledge(AMQChannel channel, long deliveryTag)
+ {
+ try
+ {
+ channel.acknowledgeMessage(deliveryTag, false);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
new file mode 100644
index 0000000000..471912c85a
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.Arrays;
+
+public class TestApplicationRegistry extends ApplicationRegistry
+{
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ private VirtualHost _vHost;
+
+
+ public TestApplicationRegistry()
+ {
+ super(new MapConfiguration(new HashMap()));
+ }
+
+ public void initialise() throws Exception
+ {
+ Properties users = new Properties();
+
+ users.put("guest", "guest");
+
+ _databaseManager = new PropertiesPrincipalDatabaseManager("default", users);
+
+ _accessManager = new AllowAll();
+
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
+
+ _managedObjectRegistry = new NoopManagedObjectRegistry();
+
+ _messageStore = new TestableMemoryMessageStore();
+
+ _virtualHostRegistry = new VirtualHostRegistry();
+
+ _vHost = new VirtualHost("test", _messageStore);
+
+ _virtualHostRegistry.registerVirtualHost(_vHost);
+
+ _queueRegistry = _vHost.getQueueRegistry();
+ _exchangeFactory = _vHost.getExchangeFactory();
+ _exchangeRegistry = _vHost.getExchangeRegistry();
+
+ _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public Collection<String> getVirtualHostNames()
+ {
+ String[] hosts = {"test"};
+ return Arrays.asList(hosts);
+ }
+
+ public void setAccessManager(ACLPlugin newManager)
+ {
+ _accessManager = newManager;
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+}
+
+
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index a1a405c313..14020299f6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -117,6 +118,10 @@ public class MockProtocolSession implements AMQProtocolSession
{
}
+ public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException
+ {
+ }
+
public Object getKey()
{
return null;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 83b4665be6..6b5ab632b0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -52,15 +52,8 @@ public class TestApplicationRegistry extends ApplicationRegistry
private ExchangeFactory _exchangeFactory;
- private ManagedObjectRegistry _managedObjectRegistry;
-
- private ACLPlugin _accessManager;
-
- private PrincipalDatabaseManager _databaseManager;
-
- private AuthenticationManager _authenticationManager;
-
private MessageStore _messageStore;
+
private VirtualHost _vHost;
public TestApplicationRegistry()
@@ -92,11 +85,6 @@ public class TestApplicationRegistry extends ApplicationRegistry
_configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
}
- public Configuration getConfiguration()
- {
- return _configuration;
- }
-
public QueueRegistry getQueueRegistry()
{
return _queueRegistry;
@@ -112,21 +100,6 @@ public class TestApplicationRegistry extends ApplicationRegistry
return _exchangeFactory;
}
- public ManagedObjectRegistry getManagedObjectRegistry()
- {
- return _managedObjectRegistry;
- }
-
- public PrincipalDatabaseManager getDatabaseManager()
- {
- return _databaseManager;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
public Collection<String> getVirtualHostNames()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -137,11 +110,6 @@ public class TestApplicationRegistry extends ApplicationRegistry
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public ACLPlugin getAccessManager()
- {
- return _accessManager;
- }
-
public void setAccessManager(ACLPlugin newManager)
{
_accessManager = newManager;