summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java109
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java9
4 files changed, 121 insertions, 10 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index f4e3a10f92..7d17397b2d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -25,7 +25,6 @@ import static org.apache.qpid.transport.Connection.State.CLOSING;
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
-import static org.apache.qpid.transport.Connection.State.RESUMING;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -41,12 +40,13 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.io.IoNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
@@ -242,10 +242,9 @@ public class Connection extends ConnectionInvoker
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
- securityLayer = new SecurityLayer();
- securityLayer.init(this);
+ securityLayer = new SecurityLayer(this);
- OutgoingNetworkTransport transport = new IoNetworkTransport();
+ OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
NetworkConnection network = transport.connect(settings, receiver, null);
sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
index 9371835e89..4610c2351e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.transport.network;
+/**
+ * A network transport is responsible for the establishment of network connections.
+ * NetworkTransport implementations are pluggable via the {@link Transport} class.
+ */
public interface NetworkTransport
{
public void close();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index 742d6575df..2c10a30f10 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -20,7 +20,116 @@
*/
package org.apache.qpid.transport.network;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.TransportException;
+
public class Transport
{
+ public static final String QPID_TRANSPORT_PROPNAME = "qpid.transport";
+ public static final String QPID_TRANSPORT_V0_8_PROPNAME = "qpid.transport.v0_8";
+ public static final String QPID_TRANSPORT_V0_9_PROPNAME = "qpid.transport.v0_9";
+ public static final String QPID_TRANSPORT_V0_9_1_PROPNAME = "qpid.transport.v0_9_1";
+ public static final String QPID_TRANSPORT_V0_10_PROPNAME = "qpid.transport.v0_10";
+ public static final String QPID_BROKER_TRANSPORT_PROPNAME = "qpid.broker.transport";
+
+ // Can't reference the class directly here, as this would preclude the ability to bundle transports separately.
+ private static final String MINA_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.mina.MinaNetworkTransport";
+ private static final String IO_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.io.IoNetworkTransport";
+
public static final String TCP = "tcp";
+
+ private final static Map<ProtocolVersion,String> OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP;
+
+ static
+ {
+ final Map<ProtocolVersion,String> map = new HashMap<ProtocolVersion, String>();
+ map.put(ProtocolVersion.v8_0, MINA_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_9, MINA_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_91, MINA_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_10, IO_TRANSPORT_CLASSNAME);
+
+ OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map);
+ }
+
+ public static IncomingNetworkTransport getIncomingTransportInstance()
+ {
+ return (IncomingNetworkTransport) loadTransportClass(
+ System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, MINA_TRANSPORT_CLASSNAME));
+ }
+
+ public static OutgoingNetworkTransport getOutgoingTransportInstance(
+ final ProtocolVersion protocolVersion)
+ {
+
+ final String overrride = getOverrideClassNameFromSystemProperty(protocolVersion);
+ final String networkTransportClassName;
+ if (overrride != null)
+ {
+ networkTransportClassName = overrride;
+ }
+ else
+ {
+ networkTransportClassName = OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP.get(protocolVersion);
+ }
+
+ return (OutgoingNetworkTransport) loadTransportClass(networkTransportClassName);
+ }
+
+ private static NetworkTransport loadTransportClass(final String networkTransportClassName)
+ {
+ if (networkTransportClassName == null)
+ {
+ throw new IllegalArgumentException("transport class name must not be null");
+ }
+
+ try
+ {
+ final Class<?> clazz = Class.forName(networkTransportClassName);
+ return (NetworkTransport) clazz.newInstance();
+ }
+ catch (InstantiationException e)
+ {
+ throw new TransportException("Unable to instantiate transport class " + networkTransportClassName, e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new TransportException("Access exception " + networkTransportClassName, e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new TransportException("Unable to load transport class " + networkTransportClassName, e);
+ }
+ }
+
+ private static String getOverrideClassNameFromSystemProperty(final ProtocolVersion protocolVersion)
+ {
+ final String protocolSpecificSystemProperty;
+
+ if (ProtocolVersion.v0_10.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_10_PROPNAME;
+ }
+ else if (ProtocolVersion.v0_91.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_1_PROPNAME;
+ }
+ else if (ProtocolVersion.v0_9.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_PROPNAME;
+ }
+ else if (ProtocolVersion.v8_0.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_8_PROPNAME;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown ProtocolVersion " + protocolVersion);
+ }
+
+ return System.getProperty(protocolSpecificSystemProperty, System.getProperty(QPID_TRANSPORT_PROPNAME));
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index 3f0966903d..69e4b52edb 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -43,8 +43,8 @@ public class SecurityLayer
Connection con;
SSLSecurityLayer sslLayer;
SASLSecurityLayer saslLayer;
-
- public void init(Connection con) throws TransportException
+
+ public SecurityLayer(Connection con)
{
this.con = con;
this.settings = con.getConnectionSettings();
@@ -55,10 +55,9 @@ public class SecurityLayer
if (settings.isUseSASLEncryption())
{
saslLayer = new SASLSecurityLayer();
- }
-
+ }
}
-
+
public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
{
Sender<ByteBuffer> sender = delegate;