summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-16 10:06:55 +0000
committerAidan Skinner <aidan@apache.org>2009-09-16 10:06:55 +0000
commit9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 (patch)
tree3834f1b7f1fe3fbdd632a9c78f6295e54595abc5 /qpid/java/broker
parentc1ebe66bfab328c5192a35c21ea290b5c45f40f5 (diff)
downloadqpid-python-9c4ecc45da929750ff7f0e0a5d7ada4e674b9105.tar.gz
QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine.
Allow an existing SocketConnector to be passed into a MINANetworkDriver, for use with the ExistingSocket bit of TransportConnection. Move the ExistingSocket stuff to one place, use MINANetworkDriver in TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession. Move fireAsynchEvent to Job Add getLocalAddress to AMQProtocolEngine Move TestNetworkDriver to common Use correct class for logger in AMQProtocolEngine Check the exception is thrown properly in SimpleACLTest, make it a little less prone to obscure race conditions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/build.xml1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java55
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java126
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java24
6 files changed, 34 insertions, 183 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index 3c63c459be..ae133d1a96 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Broker" default="build">
<property name="module.depends" value="management/common common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
<import file="../module.xml"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 19d98161c6..16ebc76185 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -31,8 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
@@ -51,7 +49,6 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
@@ -94,7 +91,7 @@ import org.apache.qpid.transport.Sender;
public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
{
- private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
@@ -180,6 +177,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
+ _poolReference.acquireExecutorService();
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -212,7 +210,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- fireAsynchEvent(_readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
{
@Override
public void run()
@@ -463,7 +461,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- fireAsynchEvent(_writeJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable()
{
@Override
public void run()
@@ -687,6 +685,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
+ if (CurrentActor.get() == null)
+ {
+ CurrentActor.set(_actor);
+ }
if (!_closed)
{
if (_virtualHost != null)
@@ -907,6 +909,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public SocketAddress getRemoteAddress()
{
return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -990,7 +997,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
// Do nothing
}
-
+
@Override
public long getReadBytes()
{
@@ -1017,38 +1024,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- /**
- * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
- *
- * @param job The job.
- * @param event The event to hand off asynchronously.
- */
- void fireAsynchEvent(Job job, Event event)
- {
-
- job.add(event);
-
- final ExecutorService pool = _poolReference .getPool();
-
- if(pool == null)
- {
- return;
- }
-
- // rather than perform additional checks on pool to check that it hasn't shutdown.
- // catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
- }
-
- }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 1162687741..2285f5256e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -35,11 +35,11 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class ServerConfigurationTest extends TestCase
{
@@ -793,12 +793,12 @@ public class ServerConfigurationTest extends TestCase
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
- testDriver.setAddress("127.1.2.3");
+ testDriver.setRemoteAddress("127.1.2.3");
session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -867,7 +867,7 @@ public class ServerConfigurationTest extends TestCase
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
session.setNetworkDriver(testDriver);
@@ -935,7 +935,7 @@ public class ServerConfigurationTest extends TestCase
// Test config
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index c4362f2c60..ec7bf1cb72 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TestNetworkDriver;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
deleted file mode 100644
index 098843d315..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.net.ssl.SSLEngine;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
-import org.apache.qpid.transport.OpenException;
-
-/**
- * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
- * so if this class is being used and some methods are to be used, then please update those.
- */
-public class TestNetworkDriver implements NetworkDriver
-{
- private final ConcurrentMap attributes = new ConcurrentHashMap();
- private String _address = "127.0.0.1";
- private int _port = 1;
-
- public TestNetworkDriver()
- {
- }
-
- public void setAddress(String string)
- {
- this._address = string;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public void setPort(int _port)
- {
- this._port = _port;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
- {
-
- }
-
- public SocketAddress getLocalAddress()
- {
- return new InetSocketAddress(_address, _port);
- }
-
- public SocketAddress getRemoteAddress()
- {
- return new InetSocketAddress(_address, _port);
- }
-
- public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLEngine sslEngine) throws OpenException
- {
-
- }
-
- public void setMaxReadIdle(int idleTime)
- {
-
- }
-
- public void setMaxWriteIdle(int idleTime)
- {
-
- }
-
- public void close()
- {
-
- }
-
- public void flush()
- {
-
- }
-
- public void send(ByteBuffer msg)
- {
-
- }
-
- public void setIdleTimeout(long l)
- {
-
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index bda816f5ab..5d3335c001 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -32,12 +32,12 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class FirewallPluginTest extends TestCase
{
@@ -90,7 +90,7 @@ public class FirewallPluginTest extends TestCase
super.setUp();
_store = new TestableMemoryMessageStore();
_testDriver = new TestNetworkDriver();
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
// Retreive VirtualHost from the Registry
VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
@@ -167,7 +167,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -182,7 +182,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -195,7 +195,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -208,7 +208,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -231,7 +231,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -254,7 +254,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -268,7 +268,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -282,7 +282,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -292,11 +292,11 @@ public class FirewallPluginTest extends TestCase
firstRule.setAccess("allow");
firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
- _testDriver.setAddress("10.0.0.1");
+ _testDriver.setRemoteAddress("10.0.0.1");
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}