diff options
Diffstat (limited to 'qpid/java')
16 files changed, 902 insertions, 164 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 41d7f6c067..8ad2ace1b2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -34,7 +34,6 @@ import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.common.FixedSizeByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; @@ -421,7 +420,8 @@ public class Main bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port); } - acceptor.bind(bindAddress, handler, sconfig); + bind(acceptor, bindAddress, handler, sconfig); + //fixme qpid.AMQP should be using qpidproperties to get value _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); } @@ -432,7 +432,8 @@ public class Main try { - acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + //fixme qpid.AMQP should be using qpidproperties to get value _brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); @@ -455,6 +456,23 @@ public class Main } } + /** + * Ensure that any bound Acceptors are recorded in the registry so they can be closed later. + * + * @param acceptor + * @param bindAddress + * @param handler + * @param sconfig + * + * @throws IOException from the acceptor.bind command + */ + private void bind(IoAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException + { + acceptor.bind(bindAddress, handler, sconfig); + + ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor); + } + public static void main(String[] args) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java new file mode 100644 index 0000000000..d287595e2d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.connection; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.List; + +public class ConnectionRegistry implements IConnectionRegistry +{ + private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>(); + + private VirtualHost _virtualHost; + + public ConnectionRegistry(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + public void initialise() + { + + } + + /** Close all of the currently open connections. */ + public void close() throws AMQException + { + while (!_registry.isEmpty()) + { + AMQProtocolSession connection = _registry.get(0); + + connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down", + 0, 0, + connection.getProtocolOutputConverter().getProtocolMajorVersion(), + connection.getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + } + + public void registerConnection(AMQProtocolSession connnection) + { + _registry.add(connnection); + } + + public void deregisterConnection(AMQProtocolSession connnection) + { + _registry.remove(connnection); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java new file mode 100644 index 0000000000..d64fde1c20 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.connection; + +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.AMQException; + +public interface IConnectionRegistry +{ + + public void initialise(); + + public void close() throws AMQException; + + public void registerConnection(AMQProtocolSession connnection); + + public void deregisterConnection(AMQProtocolSession connnection); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index bdb16d0fcb..b4075b81ac 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -25,6 +25,7 @@ import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; +import org.apache.mina.common.CloseFuture; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; @@ -99,7 +100,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private Object _lastSent; - private boolean _closed; + protected boolean _closed; // maximum number of channels this session should have private long _maxNoOfChannels = 1000; @@ -115,13 +116,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private MethodDispatcher _dispatcher; private ProtocolSessionIdentifier _sessionIdentifier; + private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L; + private org.apache.mina.common.WriteFuture _lastWriteFuture; + public ManagedObject getManagedObject() { return _managedObject; } public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) - throws AMQException + throws AMQException { _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; @@ -145,7 +149,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, - AMQStateManager stateManager) throws AMQException + AMQStateManager stateManager) throws AMQException { _stateManager = stateManager; _minaProtocolSession = session; @@ -199,7 +203,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } private void frameReceived(AMQFrame frame) throws AMQException - { + { int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); @@ -222,15 +226,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { if (_logger.isInfoEnabled()) { - _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); + _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); } + closeProtocolSession(); return; } } - - try { body.handle(channelId, this); @@ -258,7 +261,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable String locales = "en_US"; - AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) getProtocolMinorVersion(), null, @@ -266,7 +268,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable locales.getBytes()); _minaProtocolSession.write(responseBody.generateFrame(0)); - } catch (AMQException e) { @@ -332,27 +333,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _logger.info("Closing connection due to: " + e.getMessage()); } - closeSession(); - AMQConnectionException ce = - evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(ce.getCloseFrame(channelId)); + closeConnection(channelId, ce, false); } } catch (AMQConnectionException e) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - markChannelAwaitingCloseOk(channelId); - closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + closeConnection(channelId, e, false); } } catch (Exception e) @@ -365,7 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _logger.error("Unexpected exception while processing frame. Closing connection.", e); - _minaProtocolSession.close(); + closeProtocolSession(); } } @@ -399,7 +389,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void writeFrame(AMQDataBlock frame) { _lastSent = frame; - _minaProtocolSession.write(frame); + + _lastWriteFuture = _minaProtocolSession.write(frame); } public AMQShortString getContextKey() @@ -431,7 +422,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public AMQChannel getChannel(int channelId) throws AMQException { final AMQChannel channel = - ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { return null; @@ -464,8 +455,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (_channelMap.size() == _maxNoOfChannels) { String errorMessage = - toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels - + "); can't create channel"; + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; _logger.error(errorMessage); throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); } @@ -619,6 +610,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (!_closed) { _closed = true; + + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + closeAllChannels(); if (_managedObject != null) { @@ -632,9 +629,54 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } + public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + + markChannelAwaitingCloseOk(channelId); + closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + + if (closeProtocolSession) + { + closeProtocolSession(); + } + } + + public void closeProtocolSession() + { + closeProtocolSession(true); + } + + public void closeProtocolSession(boolean waitLast) + { + _logger.debug("Waiting for last write to join."); + if (waitLast && (_lastWriteFuture != null)) + { + _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + } + + _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession); + final CloseFuture future = _minaProtocolSession.close(); + future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + + try + { + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + catch (AMQException e) + { + _logger.info(e.getMessage()); + } + } + public String toString() { - return _minaProtocolSession.getRemoteAddress() + "("+(getAuthorizedID() == null ? "?" : getAuthorizedID().getName()+")"); + return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); } public String dump() @@ -752,6 +794,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void setVirtualHost(VirtualHost virtualHost) throws AMQException { _virtualHost = virtualHost; + + _virtualHost.getConnectionRegistry().registerConnection(this); + _managedObject = createMBean(); _managedObject.register(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index c3400029da..1bac601225 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -25,6 +25,7 @@ import javax.security.sasl.SaslServer; import org.apache.qpid.AMQException; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -150,6 +151,10 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession /** This must be called when the session is _closed in order to free up any resources managed by the session. */ void closeSession() throws AMQException; + /** This must be called to close the session in order to free up any resources managed by the session. */ + void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException; + + /** @return a key that uniquely identifies this session */ Object getKey(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index d0dcd051f0..863f837462 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -24,9 +24,17 @@ import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.mina.common.IoAcceptor; import java.util.HashMap; import java.util.Map; +import java.net.InetSocketAddress; /** * An abstract application registry that provides access to configuration information and handles the @@ -48,6 +56,21 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry"; public static String _APPLICATION_REGISTRY = DEFAULT_APPLICATION_REGISTRY; + protected final Map<InetSocketAddress, IoAcceptor> _acceptors = new HashMap<InetSocketAddress, IoAcceptor>(); + + protected ManagedObjectRegistry _managedObjectRegistry; + + protected AuthenticationManager _authenticationManager; + + protected VirtualHostRegistry _virtualHostRegistry; + + protected ACLPlugin _accessManager; + + protected PrincipalDatabaseManager _databaseManager; + + protected PluginManager _pluginManager; + + static { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); @@ -57,7 +80,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { public void run() { - _logger.info("Shutting down application registries..."); removeAll(); } } @@ -90,6 +112,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + /** + * Method to cleanly shutdown specified registry running in this JVM + * + * @param instanceID the instance to shutdown + */ + public static void remove(int instanceID) { try @@ -111,8 +139,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + /** Method to cleanly shutdown all registries currently running in this JVM */ public static void removeAll() - { + { Object[] keys = _instanceMap.keySet().toArray(); for (Object k : keys) { @@ -162,6 +191,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void close() throws Exception { + //Stop incomming connections + unbind(); + + //Shutdown virtualhosts for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) { virtualHost.close(); @@ -174,11 +207,31 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + private void unbind() + { + synchronized (_acceptors) + { + for (InetSocketAddress bindAddress : _acceptors.keySet()) + { + IoAcceptor acceptor = _acceptors.get(bindAddress); + acceptor.unbind(bindAddress); + } + } + } + public Configuration getConfiguration() { return _configuration; } + public void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor) + { + synchronized (_acceptors) + { + _acceptors.put(bindAddress, acceptor); + } + } + public <T> T getConfiguredObject(Class<T> instanceType) { T instance = (T) _configuredObjects.get(instanceType); @@ -204,4 +257,35 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _APPLICATION_REGISTRY = clazz; } + + public VirtualHostRegistry getVirtualHostRegistry() + { + return _virtualHostRegistry; + } + + public ACLPlugin getAccessManager() + { + return _accessManager; + } + + public ManagedObjectRegistry getManagedObjectRegistry() + { + return _managedObjectRegistry; + } + + public PrincipalDatabaseManager getDatabaseManager() + { + return _databaseManager; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public PluginManager getPluginManager() + { + return _pluginManager; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index fef958000a..baab56a4a4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -48,18 +48,6 @@ import org.apache.qpid.AMQException; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { - private ManagedObjectRegistry _managedObjectRegistry; - - private AuthenticationManager _authenticationManager; - - private ACLPlugin _accessManager; - - private PrincipalDatabaseManager _databaseManager; - - private VirtualHostRegistry _virtualHostRegistry; - - private PluginManager _pluginManager; - public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { @@ -145,39 +133,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry } } - - public VirtualHostRegistry getVirtualHostRegistry() - { - return _virtualHostRegistry; - } - - public ACLPlugin getAccessManager() - { - return _accessManager; - } - - public ManagedObjectRegistry getManagedObjectRegistry() - { - return _managedObjectRegistry; - } - - public PrincipalDatabaseManager getDatabaseManager() - { - return _databaseManager; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - public Collection<String> getVirtualHostNames() { return getConfiguration().getList("virtualhosts.virtualhost.name"); } - public PluginManager getPluginManager() - { - return _pluginManager; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index ca10fbdba2..597ef042f9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.registry; import java.util.Collection; +import java.net.InetSocketAddress; import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.management.ManagedObjectRegistry; @@ -29,6 +30,7 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.mina.common.IoAcceptor; public interface IApplicationRegistry { @@ -39,6 +41,10 @@ public interface IApplicationRegistry */ void initialise() throws Exception; + /** + * Shutdown this Registry + * @throws Exception - //fixme needs to be made more specific + */ void close() throws Exception; /** @@ -71,5 +77,12 @@ public interface IApplicationRegistry ACLPlugin getAccessManager(); PluginManager getPluginManager(); - + + /** + * Register any acceptors for this registry + * @param bindAddress The address that the acceptor has been bound with + * @param acceptor The acceptor in use + */ + void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 0acfa84f31..87aa15be84 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -42,19 +42,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class NullApplicationRegistry extends ApplicationRegistry { - private ManagedObjectRegistry _managedObjectRegistry; - - private AuthenticationManager _authenticationManager; - - private VirtualHostRegistry _virtualHostRegistry; - - private ACLPlugin _accessManager; - - private PrincipalDatabaseManager _databaseManager; - - private PluginManager _pluginManager; - - public NullApplicationRegistry() { super(new MapConfiguration(new HashMap())); @@ -84,47 +71,11 @@ public class NullApplicationRegistry extends ApplicationRegistry } - public Configuration getConfiguration() - { - return _configuration; - } - - - public ManagedObjectRegistry getManagedObjectRegistry() - { - return _managedObjectRegistry; - } - - public PrincipalDatabaseManager getDatabaseManager() - { - return _databaseManager; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - public Collection<String> getVirtualHostNames() { String[] hosts = {"test"}; return Arrays.asList(hosts); } - - public VirtualHostRegistry getVirtualHostRegistry() - { - return _virtualHostRegistry; - } - - public ACLPlugin getAccessManager() - { - return _accessManager; - } - - public PluginManager getPluginManager() - { - return _pluginManager; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index f69751b708..977bd84491 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -26,6 +26,8 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.connection.ConnectionRegistry; +import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.Accessable; @@ -55,6 +57,8 @@ public class VirtualHost implements Accessable private final String _name; + private ConnectionRegistry _connectionRegistry; + private QueueRegistry _queueRegistry; private ExchangeRegistry _exchangeRegistry; @@ -74,7 +78,8 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; - + + public void setAccessableName(String name) { _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" @@ -86,6 +91,10 @@ public class VirtualHost implements Accessable return _name; } + public IConnectionRegistry getConnectionRegistry() + { + return _connectionRegistry; + } /** * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any @@ -143,8 +152,8 @@ public class VirtualHost implements Accessable _name = name; _virtualHostMBean = new VirtualHostMBean(); - // This isn't needed to be registered - //_virtualHostMBean.register(); + + _connectionRegistry = new ConnectionRegistry(this); _houseKeepingTimer = new Timer("Queue-housekeeping-"+name, true); _queueRegistry = new DefaultQueueRegistry(this); @@ -283,14 +292,20 @@ public class VirtualHost implements Accessable public ACLPlugin getAccessManager() { return _accessManager; - } + } public void close() throws Exception { + //Stop Housekeeping if (_houseKeepingTimer != null) { _houseKeepingTimer.cancel(); } + + //Stop Connections + _connectionRegistry.close(); + + //Close MessageStore if (_messageStore != null) { _messageStore.close(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java new file mode 100644 index 0000000000..51012bc776 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.AMQChannel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter +{ + // ChannelID(LIST) -> LinkedList<Pair> + final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers; + private AtomicInteger _deliveryCount = new AtomicInteger(0); + + public InternalTestProtocolSession() throws AMQException + { + super(new TestIoSession(), + ApplicationRegistry.getInstance().getVirtualHostRegistry(), + new AMQCodecFactory(true)); + + _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); + + } + + public ProtocolOutputConverter getProtocolOutputConverter() + { + return this; + } + + public byte getProtocolMajorVersion() + { + return (byte) 8; + } + + public byte getProtocolMinorVersion() + { + return (byte) 0; + } + + // *** + + public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count) + { + synchronized (_channelDelivers) + { + List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count); + + List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs); + + //Remove the msgs from the receivedList. + msgs.clear(); + + return response; + } + } + + // *** ProtocolOutputConverter Implementation + public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException + { + } + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + } + + public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException + { + _deliveryCount.incrementAndGet(); + + synchronized (_channelDelivers) + { + Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); + + if (consumers == null) + { + consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + _channelDelivers.put(channelId, consumers); + } + + LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag); + + if (consumerDelivers == null) + { + consumerDelivers = new LinkedList<DeliveryPair>(); + consumers.put(consumerTag, consumerDelivers); + } + + consumerDelivers.add(new DeliveryPair(deliveryTag, message)); + } + } + + public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + { + } + + public void awaitDelivery(int msgs) + { + while (msgs > _deliveryCount.get()) + { + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + } + + public class DeliveryPair + { + private long _deliveryTag; + private AMQMessage _message; + + public DeliveryPair(long deliveryTag, AMQMessage message) + { + _deliveryTag = deliveryTag; + _message = message; + } + + public AMQMessage getMessage() + { + return _message; + } + + public long getDeliveryTag() + { + return _deliveryTag; + } + } + + public boolean isClosed() + { + return _closed; + } + + public void closeProtocolSession(boolean waitLast) + { + // Override as we don't have a real IOSession to close. + // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); + // Then the AMQMinaProtocolSession can join on the returning future without a NPE. + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java new file mode 100644 index 0000000000..a695a67eea --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; + +import java.util.List; + +public class MessageStoreShutdownTest extends InternalBrokerBaseCase +{ + + public void test() + { + subscribe(_session, _channel, _queue); + + try + { + publishMessages(_session, _channel, 1); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + fail(e.getMessage()); + } + + try + { + _registry.close(); + } + catch (Exception e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + fail(e.getMessage()); + } + + assertTrue("Session should now be closed", _session.isClosed()); + + + //Test attempting to modify the broker state after session has been closed. + + //The Message should have been removed from the unacked list. + + //Ack Messages + List<InternalTestProtocolSession.DeliveryPair> list = _session.getDelivers(_channel.getChannelId(), new AMQShortString("sgen_1"), 1); + + InternalTestProtocolSession.DeliveryPair pair = list.get(0); + + try + { + // The message should now be requeued and so unable to ack it. + _channel.acknowledgeMessage(pair.getDeliveryTag(), false); + } + catch (AMQException e) + { + assertEquals("Incorrect exception thrown", "Single ack on delivery tag 1 not known for channel:1", e.getMessage()); + } + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java new file mode 100644 index 0000000000..c6cd5da01d --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import junit.framework.TestCase; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ConsumerTagNotUniqueException; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; + +public class InternalBrokerBaseCase extends TestCase +{ + protected IApplicationRegistry _registry; + protected MessageStore _messageStore; + protected AMQChannel _channel; + protected InternalTestProtocolSession _session; + protected VirtualHost _virtualHost; + protected StoreContext _storeContext = new StoreContext(); + protected AMQQueue _queue; + protected AMQShortString QUEUE_NAME; + + public void setUp() throws Exception + { + super.setUp(); + _registry = new TestApplicationRegistry(); + ApplicationRegistry.initialise(_registry); + _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = _virtualHost.getMessageStore(); + + QUEUE_NAME = new AMQShortString("test"); + _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"), + false, _virtualHost, null); + + _virtualHost.getQueueRegistry().registerQueue(_queue); + + Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); + + _queue.bind(defaultExchange, QUEUE_NAME, null); + + _session = new InternalTestProtocolSession(); + + _session.setVirtualHost(_virtualHost); + + _channel = new AMQChannel(_session, 1, _messageStore); + + _session.addChannel(_channel); + } + + public void tearDown() throws Exception + { + ApplicationRegistry.removeAll(); + super.tearDown(); + } + + protected void checkStoreContents(int messageCount) + { + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size()); + + //The above publish message is sufficiently small not to fit in the header so no Body is required. + //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size()); + } + + protected AMQShortString subscribe(InternalTestProtocolSession session, AMQChannel channel, AMQQueue queue) + { + try + { + return channel.subscribeToQueue(null, queue, true, null, false, true); + } + catch (AMQException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + catch (ConsumerTagNotUniqueException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + //Keep the compiler happy + return null; + } + + public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException + { + MessagePublishInfo info = new MessagePublishInfo() + { + public AMQShortString getExchange() + { + return ExchangeDefaults.DEFAULT_EXCHANGE_NAME; + } + + public void setExchange(AMQShortString exchange) + { + + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return QUEUE_NAME; + } + }; + + for (int count = 0; count < messages; count++) + { + channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange())); + + //Set the body size + ContentHeaderBody _headerBody = new ContentHeaderBody(); + _headerBody.bodySize = 0; + + //Set Minimum properties + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setExpiration(0L); + properties.setTimestamp(System.currentTimeMillis()); + + //Make Message Persistent + properties.setDeliveryMode((byte) 2); + + _headerBody.properties = properties; + + channel.publishContentHeader(_headerBody); + } + + } + + public void acknowledge(AMQChannel channel, long deliveryTag) + { + try + { + channel.acknowledgeMessage(deliveryTag, false); + } + catch (AMQException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java new file mode 100644 index 0000000000..471912c85a --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.management.NoopManagedObjectRegistry; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.plugins.AllowAll; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Properties; +import java.util.Arrays; + +public class TestApplicationRegistry extends ApplicationRegistry +{ + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private MessageStore _messageStore; + + private VirtualHost _vHost; + + + public TestApplicationRegistry() + { + super(new MapConfiguration(new HashMap())); + } + + public void initialise() throws Exception + { + Properties users = new Properties(); + + users.put("guest", "guest"); + + _databaseManager = new PropertiesPrincipalDatabaseManager("default", users); + + _accessManager = new AllowAll(); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); + + _managedObjectRegistry = new NoopManagedObjectRegistry(); + + _messageStore = new TestableMemoryMessageStore(); + + _virtualHostRegistry = new VirtualHostRegistry(); + + _vHost = new VirtualHost("test", _messageStore); + + _virtualHostRegistry.registerVirtualHost(_vHost); + + _queueRegistry = _vHost.getQueueRegistry(); + _exchangeFactory = _vHost.getExchangeFactory(); + _exchangeRegistry = _vHost.getExchangeRegistry(); + + _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public Collection<String> getVirtualHostNames() + { + String[] hosts = {"test"}; + return Arrays.asList(hosts); + } + + public void setAccessManager(ACLPlugin newManager) + { + _accessManager = newManager; + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + +} + + diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index a1a405c313..14020299f6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -117,6 +118,10 @@ public class MockProtocolSession implements AMQProtocolSession { } + public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException + { + } + public Object getKey() { return null; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 83b4665be6..6b5ab632b0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -52,15 +52,8 @@ public class TestApplicationRegistry extends ApplicationRegistry private ExchangeFactory _exchangeFactory; - private ManagedObjectRegistry _managedObjectRegistry; - - private ACLPlugin _accessManager; - - private PrincipalDatabaseManager _databaseManager; - - private AuthenticationManager _authenticationManager; - private MessageStore _messageStore; + private VirtualHost _vHost; public TestApplicationRegistry() @@ -92,11 +85,6 @@ public class TestApplicationRegistry extends ApplicationRegistry _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes } - public Configuration getConfiguration() - { - return _configuration; - } - public QueueRegistry getQueueRegistry() { return _queueRegistry; @@ -112,21 +100,6 @@ public class TestApplicationRegistry extends ApplicationRegistry return _exchangeFactory; } - public ManagedObjectRegistry getManagedObjectRegistry() - { - return _managedObjectRegistry; - } - - public PrincipalDatabaseManager getDatabaseManager() - { - return _databaseManager; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - public Collection<String> getVirtualHostNames() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -137,11 +110,6 @@ public class TestApplicationRegistry extends ApplicationRegistry return null; //To change body of implemented methods use File | Settings | File Templates. } - public ACLPlugin getAccessManager() - { - return _accessManager; - } - public void setAccessManager(ACLPlugin newManager) { _accessManager = newManager; |
