summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-07-07 15:10:30 +0000
committerRobert Gemmell <robbie@apache.org>2011-07-07 15:10:30 +0000
commit7c964a16114a1731a384d86d3c26087a3133d9dc (patch)
treec3a15f8d7b32894beeaba7827740df928413b19e /qpid/java/common/src/main
parent093280efbbd98e0e73f2d45da22b3bca993acd0d (diff)
downloadqpid-python-7c964a16114a1731a384d86d3c26087a3133d9dc.tar.gz
QPID-3342: transition TCP based Mina transport for 0-8/0-9/0-9-1 protocols over to new IO interface model
Applied patch by Keith Wall and myself git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1143867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java63
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java)20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java51
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java368
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java81
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java149
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java250
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java79
16 files changed, 706 insertions, 448 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 31953ea6ab..48a3df734a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -22,8 +22,6 @@ package org.apache.qpid.protocol;
import java.net.SocketAddress;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Receiver;
/**
@@ -32,9 +30,6 @@ import org.apache.qpid.transport.Receiver;
*/
public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
{
- // Sets the network driver providing data for this ProtocolEngine
- void setNetworkDriver (NetworkDriver driver);
-
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
index 9df84eef90..4e40b78440 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.NetworkConnection;
public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine(NetworkDriver networkDriver);
+ ProtocolEngine newProtocolEngine(NetworkConnection network);
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
index 08678b213b..2074c77a5b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
@@ -30,6 +30,8 @@ import java.util.Map;
*/
public class ConnectionSettings
{
+ public static final String WILDCARD_ADDRESS = "*";
+
String protocol = "tcp";
String host = "localhost";
String vhost;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
deleted file mode 100644
index 86af97bf7e..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.SocketAddress;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-
-public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
-{
- // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
- // it using the SSLContextFactory if provided
- void open(int port, InetAddress destination, ProtocolEngine engine,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory)
- throws OpenException;
-
- // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
- // processes incoming connections with ProtocolEngines and SSLEngines created from the factories
- // (in the case of an SSLContextFactory, if provided)
- void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
-
- // Returns the remote address of the underlying socket
- SocketAddress getRemoteAddress();
-
- // Returns the local address of the underlying socket
- SocketAddress getLocalAddress();
-
- /**
- * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been
- * read in seconds
- */
- void setMaxReadIdle(int idleTime);
-
- /**
- * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been
- * written in seconds
- */
- void setMaxWriteIdle(int idleTime);
-
-} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
index c38afe5dd5..8d3f7a779a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
@@ -25,20 +25,22 @@ package org.apache.qpid.transport;
* buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned
* from here if the underlying implementation supports them.
*/
-public interface NetworkDriverConfiguration
+public interface NetworkTransportConfiguration
{
// Taken from Socket
- Boolean getKeepAlive();
- Boolean getOOBInline();
- Boolean getReuseAddress();
- Integer getSoLinger(); // null means off
- Integer getSoTimeout();
Boolean getTcpNoDelay();
- Integer getTrafficClass();
// The amount of memory in bytes to allocate to the incoming buffer
Integer getReceiveBufferSize();
// The amount of memory in bytes to allocate to the outgoing buffer
- Integer getSendBufferSize();
-}
+ Integer getSendBufferSize();
+
+ Integer getPort();
+
+ String getHost();
+
+ String getTransport();
+
+ Integer getConnectorProcessors();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java
new file mode 100644
index 0000000000..2c7652abeb
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java
@@ -0,0 +1,8 @@
+package org.apache.qpid.transport;
+
+import org.apache.mina.common.IoConnector;
+
+public interface SocketConnectorFactory
+{
+ IoConnector newConnector();
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
new file mode 100644
index 0000000000..7099916c33
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+
+public interface IncomingNetworkTransport extends NetworkTransport
+{
+ public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContextFactory sslFactory);
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 80b32ea909..1f69973b96 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -40,4 +40,8 @@ public interface NetworkConnection
* Returns the local address of the underlying socket.
*/
SocketAddress getLocalAddress();
+
+ void setMaxWriteIdle(int sec);
+
+ void setMaxReadIdle(int sec);
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
new file mode 100644
index 0000000000..4b8a0baf75
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+public class Transport
+{
+ public static final String TCP = "tcp";
+ public static final String VM = "vm";
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java
new file mode 100644
index 0000000000..acc55c2e2d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
+public class VMBrokerMap
+{
+ private static final Map<Integer, VmPipeAddress> _map = new HashMap<Integer, VmPipeAddress>();
+
+ public static void add(int port, VmPipeAddress pipe)
+ {
+ _map.put(port, pipe);
+ }
+
+ public static VmPipeAddress remove(int port)
+ {
+ return _map.remove(port);
+ }
+
+ public static void clear()
+ {
+ _map.clear();
+ }
+
+ public static boolean contains(int port)
+ {
+ return _map.containsKey(port);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index 3252544fee..cca1fc46c9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -78,5 +78,16 @@ public class IoNetworkConnection implements NetworkConnection
{
return _socket.getLocalSocketAddress();
}
-
+
+ public void setMaxWriteIdle(int sec)
+ {
+ // TODO implement support for setting heartbeating config in this way
+ // Currently a socket timeout is used in IoSender
+ }
+
+ public void setMaxReadIdle(int sec)
+ {
+ // TODO implement support for setting heartbeating config in this way
+ // Currently a socket timeout is used in IoSender
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
deleted file mode 100644
index 2206e0999e..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.transport.network.mina;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExecutorThreadModel;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
-import org.apache.mina.util.SessionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.thread.QpidThreadExecutor;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
-import org.apache.qpid.transport.OpenException;
-
-import java.io.IOException;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
-{
-
- private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
-
- ProtocolEngine _protocolEngine;
- private int _processors = 4;
- private SSLContextFactory _sslFactory = null;
- private IoConnector _socketConnector;
- private IoAcceptor _acceptor;
- private IoSession _ioSession;
- private ProtocolEngineFactory _factory;
- private Throwable _lastException;
- private boolean _acceptingConnections = false;
-
- private WriteFuture _lastWriteFuture;
-
- private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class);
-
- static
- {
- org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
-
- //override the MINA defaults to prevent use of the PooledByteBufferAllocator
- org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- }
-
- public MINANetworkDriver(int processors, ProtocolEngine protocolEngine, IoSession session)
- {
- _processors = processors;
- _protocolEngine = protocolEngine;
- _ioSession = session;
- _ioSession.setAttachment(_protocolEngine);
- }
-
- public MINANetworkDriver()
- {
-
- }
-
- public MINANetworkDriver(IoConnector ioConnector)
- {
- _socketConnector = ioConnector;
- }
-
- public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
- {
- _socketConnector = ioConnector;
- _protocolEngine = engine;
- }
-
- public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
- {
-
- _factory = factory;
-
- _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor());
-
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
- sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)"));
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
- if (config != null)
- {
- sc.setReceiveBufferSize(config.getReceiveBufferSize());
- sc.setSendBufferSize(config.getSendBufferSize());
- sc.setTcpNoDelay(config.getTcpNoDelay());
- }
-
- if (sslFactory != null)
- {
- _sslFactory = sslFactory;
- }
-
- if (addresses != null && addresses.length > 0)
- {
- for (InetAddress addr : addresses)
- {
- try
- {
- _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig);
- }
- catch (IOException e)
- {
- throw new BindException(String.format("Could not bind to %1s:%2s", addr, port));
- }
- }
- }
- else
- {
- try
- {
- _acceptor.bind(new InetSocketAddress(port), this, sconfig);
- }
- catch (IOException e)
- {
- throw new BindException(String.format("Could not bind to *:%1s", port));
- }
- }
- _acceptingConnections = true;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _ioSession.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _ioSession.getLocalAddress();
- }
-
-
- public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLContextFactory sslFactory) throws OpenException
- {
- if (sslFactory != null)
- {
- _sslFactory = sslFactory;
- }
-
- _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
-
- SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
- String s = "";
- StackTraceElement[] trace = Thread.currentThread().getStackTrace();
- for(StackTraceElement elt : trace)
- {
- if(elt.getClassName().contains("Test"))
- {
- s = elt.getClassName();
- break;
- }
- }
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s));
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
- scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE);
- scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE);
-
- // Don't have the connector's worker thread wait around for other
- // connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows
- // short-running
- // clients (like unit tests) to complete quickly.
- if (_socketConnector instanceof SocketConnector)
- {
- ((SocketConnector) _socketConnector).setWorkerTimeout(0);
- }
-
- ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
- future.join();
- if (!future.isConnected())
- {
- throw new OpenException("Could not open connection", _lastException);
- }
- _ioSession = future.getSession();
- _ioSession.setAttachment(engine);
- engine.setNetworkDriver(this);
- _protocolEngine = engine;
- }
-
- public void setMaxReadIdle(int idleTime)
- {
- _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime);
- }
-
- public void setMaxWriteIdle(int idleTime)
- {
- _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime);
- }
-
- public void close()
- {
- if (_lastWriteFuture != null)
- {
- _lastWriteFuture.join();
- }
- if (_acceptor != null)
- {
- _acceptor.unbindAll();
- }
- if (_ioSession != null)
- {
- _ioSession.close();
- }
- }
-
- public void flush()
- {
- if (_lastWriteFuture != null)
- {
- _lastWriteFuture.join();
- }
- }
-
- public void send(ByteBuffer msg)
- {
- org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity());
- minaBuf.put(msg);
- minaBuf.flip();
- _lastWriteFuture = _ioSession.write(minaBuf);
- }
-
- public void setIdleTimeout(int i)
- {
- // MINA doesn't support setting SO_TIMEOUT
- }
-
- public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
- {
- if (_protocolEngine != null)
- {
- _protocolEngine.exception(throwable);
- }
- else
- {
- _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable);
- }
- _lastException = throwable;
- }
-
- /**
- * Invoked when a message is received on a particular protocol session. Note
- * that a protocol session is directly tied to a particular physical
- * connection.
- *
- * @param protocolSession
- * the protocol session that received the message
- * @param message
- * the message itself (i.e. a decoded frame)
- *
- * @throws Exception
- * if the message cannot be processed
- */
- public void messageReceived(IoSession protocolSession, Object message) throws Exception
- {
- if (message instanceof org.apache.mina.common.ByteBuffer)
- {
- ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf());
- }
- else
- {
- throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
- }
- }
-
- public void sessionClosed(IoSession protocolSession) throws Exception
- {
- ((ProtocolEngine) protocolSession.getAttachment()).closed();
- }
-
- public void sessionCreated(IoSession protocolSession) throws Exception
- {
- // Configure the session with SSL if necessary
- SessionUtil.initialize(protocolSession);
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
-
- if (_ioSession == null)
- {
- _ioSession = protocolSession;
- }
-
- if (_acceptingConnections)
- {
- // Set up the protocol engine
- ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
- MINANetworkDriver newDriver = new MINANetworkDriver(_processors, protocolEngine, protocolSession);
- protocolEngine.setNetworkDriver(newDriver);
- }
- }
-
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
- {
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- ((ProtocolEngine) session.getAttachment()).writerIdle();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- ((ProtocolEngine) session.getAttachment()).readerIdle();
- }
- }
-
- private ProtocolEngine getProtocolEngine()
- {
- return _protocolEngine;
- }
-
- public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections)
- {
- _factory = engineFactory;
- _acceptingConnections = acceptingConnections;
- }
-
- public void setProtocolEngine(ProtocolEngine protocolEngine)
- {
- _protocolEngine = protocolEngine;
- if (_ioSession != null)
- {
- _ioSession.setAttachment(protocolEngine);
- }
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java
new file mode 100644
index 0000000000..0f433f6eeb
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java
@@ -0,0 +1,81 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+public class MinaNetworkConnection implements NetworkConnection
+{
+ private IoSession _session;
+ private Sender<ByteBuffer> _sender;
+
+ public MinaNetworkConnection(IoSession session)
+ {
+ _session = session;
+ _sender = new MinaSender(_session);
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return _sender;
+ }
+
+ public void close()
+ {
+ _session.close();
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _session.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _session.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _session.getReadBytes();
+ }
+
+ public long getWrittenBytes()
+ {
+ return _session.getWrittenBytes();
+ }
+
+ public void setMaxWriteIdle(int sec)
+ {
+ _session.setIdleTime(IdleStatus.WRITER_IDLE, sec);
+ }
+
+ public void setMaxReadIdle(int sec)
+ {
+ _session.setIdleTime(IdleStatus.READER_IDLE, sec);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
new file mode 100644
index 0000000000..c00187480c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MinaNetworkHandler extends IoHandlerAdapter
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class);
+
+ private ProtocolEngineFactory _factory;
+ private SSLContextFactory _sslFactory = null;
+
+ static
+ {
+ boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers");
+ LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers");
+ ByteBuffer.setUseDirectBuffers(directBuffers);
+
+ //override the MINA defaults to prevent use of the PooledByteBufferAllocator
+ ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ }
+
+ public MinaNetworkHandler(SSLContextFactory sslFactory, ProtocolEngineFactory factory)
+ {
+ _sslFactory = sslFactory;
+ _factory = factory;
+ }
+
+ public MinaNetworkHandler(SSLContextFactory sslFactory)
+ {
+ this(sslFactory, null);
+ }
+
+ public void messageReceived(IoSession session, Object message)
+ {
+ ProtocolEngine engine = (ProtocolEngine) session.getAttachment();
+ ByteBuffer buf = (ByteBuffer) message;
+ try
+ {
+ engine.received(buf.buf());
+ }
+ catch (RuntimeException re)
+ {
+ engine.exception(re);
+ }
+ }
+
+ public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception
+ {
+ ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
+ if(engine != null)
+ {
+ LOGGER.error("Exception caught by Mina", throwable);
+ engine.exception(throwable);
+ }
+ else
+ {
+ LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable);
+ }
+ }
+
+ public void sessionCreated(IoSession ioSession) throws Exception
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Created session: " + ioSession.getRemoteAddress());
+ }
+
+ SessionUtil.initialize(ioSession);
+
+ if (_sslFactory != null)
+ {
+ ioSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+
+ if (_factory != null)
+ {
+ NetworkConnection netConn = new MinaNetworkConnection(ioSession);
+
+ ProtocolEngine engine = _factory.newProtocolEngine(netConn);
+ ioSession.setAttachment(engine);
+ }
+ }
+
+ public void sessionClosed(IoSession ioSession) throws Exception
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("closed: " + ioSession.getRemoteAddress());
+ }
+
+ ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
+ if(engine != null)
+ {
+ engine.closed();
+ }
+ else
+ {
+ LOGGER.error("Unable to close ProtocolEngine as none was present");
+ }
+ }
+
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ {
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).writerIdle();
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).readerIdle();
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
new file mode 100644
index 0000000000..62f9429f30
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
@@ -0,0 +1,250 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExecutorThreadModel;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SocketConnectorFactory;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.VMBrokerMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+{
+ private static final int UNKNOWN = -1;
+ private static final int TCP = 0;
+ private static final int VM = 1;
+
+ public NetworkConnection _connection;
+ private SocketAcceptor _acceptor;
+ private InetSocketAddress _address;
+
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<java.nio.ByteBuffer> delegate, SSLContextFactory sslFactory)
+ {
+ int transport = getTransport(settings.getProtocol());
+
+ IoConnectorCreator stc;
+ switch(transport)
+ {
+ case TCP:
+ stc = new IoConnectorCreator(new SocketConnectorFactory()
+ {
+ public IoConnector newConnector()
+ {
+ return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
+ }
+ });
+ _connection = stc.connect(delegate, settings, sslFactory);
+ break;
+ case VM:
+ stc = new IoConnectorCreator(new SocketConnectorFactory()
+ {
+ public IoConnector newConnector()
+ {
+ return new QpidVmPipeConnector();
+ }
+ });
+ _connection = stc.connect(delegate, settings, sslFactory);
+ break;
+ case UNKNOWN:
+ default:
+ throw new TransportException("Unknown protocol: " + settings.getProtocol());
+ }
+
+ return _connection;
+ }
+
+ private static int getTransport(String transport)
+ {
+ if (transport.equals(Transport.TCP))
+ {
+ return TCP;
+ }
+
+ if (transport.equals(Transport.VM))
+ {
+ return VM;
+ }
+
+ return -1;
+ }
+
+ public void close()
+ {
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if (_acceptor != null)
+ {
+ _acceptor.unbindAll();
+ }
+ }
+
+ public NetworkConnection getConnection()
+ {
+ return _connection;
+ }
+
+ public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory,
+ final SSLContextFactory sslFactory)
+ {
+ int processors = config.getConnectorProcessors();
+
+ if (Transport.TCP.equalsIgnoreCase(config.getTransport()))
+ {
+ _acceptor = new SocketAcceptor(processors, new NewThreadExecutor());
+
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+ sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)"));
+ SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+ sc.setTcpNoDelay(config.getTcpNoDelay());
+ sc.setSendBufferSize(config.getSendBufferSize());
+ sc.setReceiveBufferSize(config.getReceiveBufferSize());
+
+ if (config.getHost().equals(WILDCARD_ADDRESS))
+ {
+ _address = new InetSocketAddress(config.getPort());
+ }
+ else
+ {
+ _address = new InetSocketAddress(config.getHost(), config.getPort());
+ }
+ }
+ else
+ {
+ throw new TransportException("Unknown transport: " + config.getTransport());
+ }
+
+ try
+ {
+ _acceptor.bind(_address, new MinaNetworkHandler(sslFactory, factory));
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Could not bind to " + _address, e);
+ }
+ }
+
+
+ private static class IoConnectorCreator
+ {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class);
+
+ private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ private SocketConnectorFactory _ioConnectorFactory;
+
+ public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory)
+ {
+ _ioConnectorFactory = socketConnectorFactory;
+ }
+
+ public NetworkConnection connect(Receiver<java.nio.ByteBuffer> receiver, ConnectionSettings settings, SSLContextFactory sslFactory)
+ {
+ final IoConnector ioConnector = _ioConnectorFactory.newConnector();
+ final SocketAddress address;
+ final String protocol = settings.getProtocol();
+ final int port = settings.getPort();
+
+ if (Transport.TCP.equalsIgnoreCase(protocol))
+ {
+ address = new InetSocketAddress(settings.getHost(), port);
+ }
+ else if(Transport.VM.equalsIgnoreCase(protocol))
+ {
+ synchronized (VMBrokerMap.class)
+ {
+ if(!VMBrokerMap.contains(port))
+ {
+ throw new TransportException("VM broker on port " + port + " does not exist.");
+ }
+ }
+
+ address = new VmPipeAddress(port);
+ }
+ else
+ {
+ throw new TransportException("Unknown transport: " + protocol);
+ }
+
+ LOGGER.info("Attempting connection to " + address);
+
+ if (ioConnector instanceof SocketConnector)
+ {
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+ cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)"));
+
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay(true);
+ scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
+ scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
+
+ // Don't have the connector's worker thread wait around for other
+ // connections (we only use one SocketConnector per connection
+ // at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ ((SocketConnector) ioConnector).setWorkerTimeout(0);
+ }
+
+ ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslFactory), ioConnector.getDefaultConfig());
+ future.join();
+ if (!future.isConnected())
+ {
+ throw new TransportException("Could not open connection");
+ }
+
+ IoSession session = future.getSession();
+ session.setAttachment(receiver);
+
+ return new MinaNetworkConnection(session);
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
new file mode 100644
index 0000000000..be114e2fa1
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.qpid.transport.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MinaSender
+ */
+public class MinaSender implements Sender<java.nio.ByteBuffer>
+{
+ private static final Logger _log = LoggerFactory.getLogger(MinaSender.class);
+
+ private final IoSession _session;
+ private WriteFuture _lastWrite;
+
+ public MinaSender(IoSession session)
+ {
+ _session = session;
+ }
+
+ public synchronized void send(java.nio.ByteBuffer msg)
+ {
+ _log.debug("sending data:");
+ ByteBuffer mina = ByteBuffer.allocate(msg.limit());
+ mina.put(msg);
+ mina.flip();
+ _lastWrite = _session.write(mina);
+ _log.debug("sent data:");
+ }
+
+ public synchronized void flush()
+ {
+ if (_lastWrite != null)
+ {
+ _lastWrite.join();
+ }
+ }
+
+ public void close()
+ {
+ // MINA will sometimes throw away in-progress writes when you ask it to close
+ flush();
+ CloseFuture closed = _session.close();
+ closed.join();
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ //TODO:
+ //We are instead using the setMax[Read|Write]IdleTime methods in
+ //MinaNetworkConnection for this. Should remove this method from
+ //sender interface, but currently being used by IoSender for 0-10.
+ }
+}