diff options
| author | Aidan Skinner <aidan@apache.org> | 2009-09-16 10:06:55 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2009-09-16 10:06:55 +0000 |
| commit | 9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 (patch) | |
| tree | 3834f1b7f1fe3fbdd632a9c78f6295e54595abc5 /qpid/java/broker | |
| parent | c1ebe66bfab328c5192a35c21ea290b5c45f40f5 (diff) | |
| download | qpid-python-9c4ecc45da929750ff7f0e0a5d7ada4e674b9105.tar.gz | |
QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine.
Allow an existing SocketConnector to be passed into a MINANetworkDriver, for
use with the ExistingSocket bit of TransportConnection.
Move the ExistingSocket stuff to one place, use MINANetworkDriver in
TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession.
Move fireAsynchEvent to Job
Add getLocalAddress to AMQProtocolEngine
Move TestNetworkDriver to common
Use correct class for logger in AMQProtocolEngine
Check the exception is thrown properly in SimpleACLTest, make it a little less
prone to obscure race conditions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
6 files changed, 34 insertions, 183 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index 3c63c459be..ae133d1a96 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -21,6 +21,7 @@ <project name="AMQ Broker" default="build"> <property name="module.depends" value="management/common common"/> + <property name="module.test.depends" value="common/test" /> <property name="module.main" value="org.apache.qpid.server.Main"/> <import file="../module.xml"/> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 19d98161c6..16ebc76185 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -31,8 +31,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; @@ -51,7 +49,6 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQProtocolHeaderException; -import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ConnectionCloseBody; @@ -94,7 +91,7 @@ import org.apache.qpid.transport.Sender; public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession { - private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); + private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); @@ -180,6 +177,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); + _poolReference.acquireExecutorService(); } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -212,7 +210,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - fireAsynchEvent(_readJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() { @Override public void run() @@ -463,7 +461,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol final ByteBuffer buf = frame.toNioByteBuffer(); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); - fireAsynchEvent(_writeJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable() { @Override public void run() @@ -687,6 +685,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol /** This must be called when the session is _closed in order to free up any resources managed by the session. */ public void closeSession() throws AMQException { + if (CurrentActor.get() == null) + { + CurrentActor.set(_actor); + } if (!_closed) { if (_virtualHost != null) @@ -907,6 +909,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public SocketAddress getRemoteAddress() { return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); } public MethodRegistry getMethodRegistry() @@ -990,7 +997,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { // Do nothing } - + @Override public long getReadBytes() { @@ -1017,38 +1024,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return (_clientVersion == null) ? null : _clientVersion.toString(); } - /** - * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. - * - * @param job The job. - * @param event The event to hand off asynchronously. - */ - void fireAsynchEvent(Job job, Event event) - { - - job.add(event); - - final ExecutorService pool = _poolReference .getPool(); - - if(pool == null) - { - return; - } - - // rather than perform additional checks on pool to check that it hasn't shutdown. - // catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - } - - } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 1162687741..2285f5256e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -35,11 +35,11 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.TestNetworkDriver; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.TestNetworkDriver; public class ServerConfigurationTest extends TestCase { @@ -793,12 +793,12 @@ public class ServerConfigurationTest extends TestCase VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); TestNetworkDriver testDriver = new TestNetworkDriver(); - testDriver.setAddress("127.0.0.1"); + testDriver.setRemoteAddress("127.0.0.1"); AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver); assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost)); - testDriver.setAddress("127.1.2.3"); + testDriver.setRemoteAddress("127.1.2.3"); session = new AMQProtocolEngine(virtualHostRegistry, testDriver); assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost)); } @@ -867,7 +867,7 @@ public class ServerConfigurationTest extends TestCase VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); TestNetworkDriver testDriver = new TestNetworkDriver(); - testDriver.setAddress("127.0.0.1"); + testDriver.setRemoteAddress("127.0.0.1"); AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver); session.setNetworkDriver(testDriver); @@ -935,7 +935,7 @@ public class ServerConfigurationTest extends TestCase // Test config TestNetworkDriver testDriver = new TestNetworkDriver(); - testDriver.setAddress("127.0.0.1"); + testDriver.setRemoteAddress("127.0.0.1"); VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry(); VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index c4362f2c60..ec7bf1cb72 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.TestNetworkDriver; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java deleted file mode 100644 index 098843d315..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.net.ssl.SSLEngine; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.NetworkDriverConfiguration; -import org.apache.qpid.transport.OpenException; - -/** - * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, - * so if this class is being used and some methods are to be used, then please update those. - */ -public class TestNetworkDriver implements NetworkDriver -{ - private final ConcurrentMap attributes = new ConcurrentHashMap(); - private String _address = "127.0.0.1"; - private int _port = 1; - - public TestNetworkDriver() - { - } - - public void setAddress(String string) - { - this._address = string; - } - - public String getAddress() - { - return _address; - } - - public void setPort(int _port) - { - this._port = _port; - } - - public int getPort() - { - return _port; - } - - public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException - { - - } - - public SocketAddress getLocalAddress() - { - return new InetSocketAddress(_address, _port); - } - - public SocketAddress getRemoteAddress() - { - return new InetSocketAddress(_address, _port); - } - - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLEngine sslEngine) throws OpenException - { - - } - - public void setMaxReadIdle(int idleTime) - { - - } - - public void setMaxWriteIdle(int idleTime) - { - - } - - public void close() - { - - } - - public void flush() - { - - } - - public void send(ByteBuffer msg) - { - - } - - public void setIdleTimeout(long l) - { - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java index bda816f5ab..5d3335c001 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java @@ -32,12 +32,12 @@ import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.protocol.AMQProtocolEngine; -import org.apache.qpid.server.protocol.TestNetworkDriver; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.TestNetworkDriver; public class FirewallPluginTest extends TestCase { @@ -90,7 +90,7 @@ public class FirewallPluginTest extends TestCase super.setUp(); _store = new TestableMemoryMessageStore(); _testDriver = new TestNetworkDriver(); - _testDriver.setAddress("127.0.0.1"); + _testDriver.setRemoteAddress("127.0.0.1"); // Retreive VirtualHost from the Registry VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry(); @@ -167,7 +167,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -182,7 +182,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -195,7 +195,7 @@ public class FirewallPluginTest extends TestCase FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule}); // Set session IP so that we're connected from the right address - _testDriver.setAddress("127.0.0.1"); + _testDriver.setRemoteAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -208,7 +208,7 @@ public class FirewallPluginTest extends TestCase FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule}); // Set session IP so that we're connected from the right address - _testDriver.setAddress("127.0.0.1"); + _testDriver.setRemoteAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -231,7 +231,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -254,7 +254,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -268,7 +268,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -282,7 +282,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -292,11 +292,11 @@ public class FirewallPluginTest extends TestCase firstRule.setAccess("allow"); firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName()); FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule}); - _testDriver.setAddress("10.0.0.1"); + _testDriver.setRemoteAddress("10.0.0.1"); assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("127.0.0.1"); + _testDriver.setRemoteAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } |
