summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-11-26 14:16:01 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-11-26 14:16:01 +0000
commit6ed85130d3560f7f39352fb164735721b11aa0be (patch)
tree7b363e9e6a5efe2839b142c555b05ddf48de0769
parent2e2693126d0b56d182221232ffdf5a75ddda01d1 (diff)
downloadqpid-python-6ed85130d3560f7f39352fb164735721b11aa0be.tar.gz
QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@598285 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/config.xml2
-rw-r--r--java/broker/pom.xml20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java74
-rw-r--r--java/common/pom.xml10
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java48
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java272
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java197
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java547
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java487
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java67
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java1034
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java240
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java488
17 files changed, 3550 insertions, 79 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 2257a612b3..b3b6a2877f 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -33,6 +33,7 @@
<keystorePassword>keystorepass</keystorePassword>
</ssl>-->
<qpidnio>true</qpidnio>
+ <protectio>true</protectio>
<transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
@@ -174,3 +175,4 @@
</broker>
+
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index 9e38751d62..dbc3b2aea9 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,16 +49,16 @@
<artifactId>log4j</artifactId>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.4.0</version>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.4.0</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.4.0</version>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.0</version>
</dependency>
<dependency>
@@ -208,7 +208,7 @@
<isset property="skip.python.tests"/>
</condition>
- <property name="command"
+ <property name="command"
value="python run-tests -v -I java_failing.txt -b localhost:2100"/>
<!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>-->
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index a87c704cf8..ab9f40b31d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -436,7 +436,7 @@ public class Main
}
}
- //fixme qpid.AMQP should be using qpidproperties to get value
+ //fixme qpid.AMQP should be using qpidproperties to get value
_brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 476e608b01..c5ec28e626 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,10 +23,15 @@ package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
@@ -51,7 +56,6 @@ import java.net.InetSocketAddress;
*
* We delegate all frame (message) processing to the AMQProtocolSession which wraps
* the state for the connection.
- *
*/
public class AMQPFastProtocolHandler extends IoHandlerAdapter
{
@@ -115,11 +119,41 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
}
+
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false))
+ {
+ try
+ {
+// //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
+
+ int buf_size = 32768;
+ if (protocolSession.getConfig() instanceof SocketSessionConfig)
+ {
+ buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize();
+ }
+
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.attach(chain);
+
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ _logger.info("Using IO Read/Write Filter Protection");
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+ }
+ }
}
- /**
- * Separated into its own, protected, method to allow easier reuse
- */
+ /** Separated into its own, protected, method to allow easier reuse */
protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
{
new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
@@ -193,8 +227,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
/**
* Invoked when a message is received on a particular protocol session. Note that a
* protocol session is directly tied to a particular physical connection.
+ *
* @param protocolSession the protocol session that received the message
- * @param message the message itself (i.e. a decoded frame)
+ * @param message the message itself (i.e. a decoded frame)
+ *
* @throws Exception if the message cannot be processed
*/
public void messageReceived(IoSession protocolSession, Object message) throws Exception
@@ -204,7 +240,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
if (message instanceof AMQDataBlock)
{
amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
-
+
}
else if (message instanceof ByteBuffer)
{
@@ -218,9 +254,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
/**
* Called after a message has been sent out on a particular protocol session
+ *
* @param protocolSession the protocol session (i.e. connection) on which this
- * message was sent
- * @param object the message (frame) that was encoded and sent
+ * message was sent
+ * @param object the message (frame) that was encoded and sent
+ *
* @throws Exception if we want to indicate an error
*/
public void messageSent(IoSession protocolSession, Object object) throws Exception
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
index a4ed859fa7..1dcbb02d5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,9 +23,12 @@ package org.apache.qpid.server.transport;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.configuration.Configured;
+import org.apache.log4j.Logger;
public class ConnectorConfiguration
{
+ private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class);
+
public static final String DEFAULT_PORT = "5672";
public static final String SSL_PORT = "8672";
@@ -41,7 +44,7 @@ public class ConnectorConfiguration
@Configured(path = "connector.bind",
defaultValue = "wildcard")
public String bindAddress;
-
+
@Configured(path = "connector.socketReceiveBuffer",
defaultValue = "32767")
public int socketReceiveBufferSize;
@@ -69,29 +72,43 @@ public class ConnectorConfiguration
@Configured(path = "connector.ssl.enabled",
defaultValue = "false")
public boolean enableSSL;
-
+
@Configured(path = "connector.ssl.sslOnly",
- defaultValue = "true")
+ defaultValue = "true")
public boolean sslOnly;
-
+
@Configured(path = "connector.ssl.port",
- defaultValue = SSL_PORT)
- public int sslPort;
-
+ defaultValue = SSL_PORT)
+ public int sslPort;
+
@Configured(path = "connector.ssl.keystorePath",
- defaultValue = "none")
+ defaultValue = "none")
public String keystorePath;
-
+
@Configured(path = "connector.ssl.keystorePassword",
- defaultValue = "none")
+ defaultValue = "none")
public String keystorePassword;
-
+
@Configured(path = "connector.ssl.certType",
- defaultValue = "SunX509")
+ defaultValue = "SunX509")
public String certType;
+ @Configured(path = "connector.qpidnio",
+ defaultValue = "true")
+ public boolean _multiThreadNIO;
+
+
public IoAcceptor createAcceptor()
{
- return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
+ if (_multiThreadNIO)
+ {
+ _logger.warn("Using Qpid Multithreaded IO Processing");
+ return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor());
+ }
+ else
+ {
+ _logger.warn("Using Mina IO Processing");
+ return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 19142067cb..dd27f07a9a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -21,12 +21,16 @@
package org.apache.qpid.client.protocol;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -216,6 +220,36 @@ public class AMQProtocolHandler extends IoHandlerAdapter
e.printStackTrace();
}
+ if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio"))
+ {
+ try
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = session.getFilterChain();
+
+ int buf_size = 32768;
+ if (session.getConfig() instanceof SocketSessionConfig)
+ {
+ buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
+ }
+ session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.attach(chain);
+ session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+
+ _logger.info("Using IO Read/Write Filter Protection");
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+ }
+ }
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 1d0d6a3491..040b5f7b68 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -23,14 +23,13 @@ package org.apache.qpid.client.transport;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,40 +89,41 @@ public class TransportConnection
switch (transport)
{
- case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ case TCP:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
{
- public IoConnector newSocketConnector()
+ SocketConnector result;
+ // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
+ if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
{
- SocketConnector result;
- // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
- {
- _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set.");
- // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
- }
- // else
-
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(); // non-blocking connector
- }
-
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0);
-
- return result;
+ _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
+ ? "Qpid NIO is new default"
+ : "Sysproperty 'qpidnio' is set"));
+ result = new MultiThreadSocketConnector();
+ }
+ else
+ {
+ _logger.info("Using Mina NIO");
+ result = new SocketConnector(); // non-blocking connector
}
- });
- break;
- case VM:
- {
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
- }
+ // Don't have the connector's worker thread wait around for other connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ result.setWorkerTimeout(0);
+
+ return result;
+ }
+ });
+ break;
+
+ case VM:
+ {
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ break;
+ }
}
return _instance;
@@ -145,7 +145,7 @@ public class TransportConnection
}
private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
- throws AMQVMBrokerCreationException
+ throws AMQVMBrokerCreationException
{
int port = details.getPort();
@@ -160,7 +160,7 @@ public class TransportConnection
else
{
throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
- + " does not exist. Auto create disabled.", null);
+ + " does not exist. Auto create disabled.", null);
}
}
}
@@ -257,8 +257,8 @@ public class TransportConnection
IoHandlerAdapter provider;
try
{
- Class[] cnstr = { Integer.class };
- Object[] params = { port };
+ Class[] cnstr = {Integer.class};
+ Object[] params = {port};
provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
// Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
@@ -277,7 +277,7 @@ public class TransportConnection
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
amqbce.initCause(e);
throw amqbce;
}
diff --git a/java/common/pom.xml b/java/common/pom.xml
index c4352a2a8c..4c94e3d1b5 100644
--- a/java/common/pom.xml
+++ b/java/common/pom.xml
@@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,7 +45,7 @@
</properties>
<build>
- <plugins>
+ <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
@@ -114,14 +114,14 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.4.0</version>
+ <version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.4.0</version>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
diff --git a/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java b/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
new file mode 100644
index 0000000000..47f19aa76d
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
@@ -0,0 +1,48 @@
+package org.apache.mina.filter;
+
+import org.apache.mina.common.IoFilter;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class WriteBufferFullExeception extends RuntimeException
+{
+ private IoFilter.WriteRequest _writeRequest;
+
+ public WriteBufferFullExeception()
+ {
+ this(null);
+ }
+
+ public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest)
+ {
+ _writeRequest = writeRequest;
+ }
+
+
+ public void setWriteRequest(IoFilter.WriteRequest writeRequest)
+ {
+ _writeRequest = writeRequest;
+ }
+
+ public IoFilter.WriteRequest getWriteRequest()
+ {
+ return _writeRequest;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java b/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
new file mode 100644
index 0000000000..d063200cf6
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than
+ * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the
+ * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to
+ * cause an Out of Memory exception due to a back log of unprocessed messages.
+ *
+ * This is should only be viewed as a temporary work around for DIRMINA-302.
+ *
+ * A true solution should not be implemented as a filter as this issue will always occur. On a machine
+ * where the network is slower than the local producer.
+ *
+ * Suggested improvement is to allow implementation of policices on what to do when buffer is full.
+ *
+ * They could be:
+ * Block - As this does
+ * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks
+ * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state
+ *
+ * <p/>
+ * <p>Usage:
+ * <p/>
+ * <pre><code>
+ * DefaultFilterChainBuilder builder = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( builder );
+ * </code></pre>
+ * <p/>
+ * or
+ * <p/>
+ * <pre><code>
+ * IoFilterChain chain = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( chain );
+ * </code></pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class WriteBufferLimitFilterBuilder
+{
+ public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize";
+
+ private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000;
+
+ private volatile boolean throwNotBlock = false;
+
+ private volatile int maximumConnectionBufferCount;
+ private volatile long maximumConnectionBufferSize;
+
+ private final Object _blockLock = new Object();
+
+ private int _blockWaiters = 0;
+
+
+ public WriteBufferLimitFilterBuilder()
+ {
+ this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT);
+ }
+
+ public WriteBufferLimitFilterBuilder(int maxWriteBufferSize)
+ {
+ setMaximumConnectionBufferCount(maxWriteBufferSize);
+ }
+
+
+ /**
+ * Set the maximum amount pending items in the writeQueue for a given session.
+ * Changing the value will only take effect when new data is received for a
+ * connection, including existing connections. Default value is 5000 msgs.
+ *
+ * @param maximumConnectionBufferCount New buffer size. Must be > 0
+ */
+ public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount)
+ {
+ this.maximumConnectionBufferCount = maximumConnectionBufferCount;
+ this.maximumConnectionBufferSize = 0;
+ }
+
+ public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize)
+ {
+ this.maximumConnectionBufferSize = maximumConnectionBufferSize;
+ this.maximumConnectionBufferCount = 0;
+ }
+
+ /**
+ * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself
+ * before and after that filter.
+ *
+ * @param chain {@link IoFilterChain} to attach self to.
+ */
+ public void attach(IoFilterChain chain)
+ {
+ String name = getThreadPoolFilterEntryName(chain.getAll());
+
+ chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+ }
+
+ /**
+ * Attach this filter to the specified builder. It will search for the
+ * {@link ExecutorFilter}, and attach itself before and after that filter.
+ *
+ * @param builder {@link DefaultIoFilterChainBuilder} to attach self to.
+ */
+ public void attach(DefaultIoFilterChainBuilder builder)
+ {
+ String name = getThreadPoolFilterEntryName(builder.getAll());
+
+ builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+ }
+
+ private String getThreadPoolFilterEntryName(List entries)
+ {
+ Iterator i = entries.iterator();
+
+ while (i.hasNext())
+ {
+ IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next();
+
+ if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class))
+ {
+ return entry.getName();
+ }
+ }
+
+ throw new IllegalStateException("Chain does not contain a ExecutorFilter");
+ }
+
+
+ public class SendLimit extends IoFilterAdapter
+ {
+ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+ {
+ try
+ {
+ waitTillSendAllowed(session);
+ }
+ catch (WriteBufferFullExeception wbfe)
+ {
+ nextFilter.exceptionCaught(session, wbfe);
+ }
+
+ if (writeRequest.getMessage() instanceof ByteBuffer)
+ {
+ increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage());
+ }
+
+ nextFilter.filterWrite(session, writeRequest);
+ }
+
+ private void increasePendingWriteSize(IoSession session, ByteBuffer message)
+ {
+ synchronized (session)
+ {
+ Long pendingSize = getScheduledWriteBytes(session) + message.remaining();
+ session.setAttribute(PENDING_SIZE, pendingSize);
+ }
+ }
+
+ private boolean sendAllowed(IoSession session)
+ {
+ if (session.isClosing())
+ {
+ return true;
+ }
+
+ int lmswm = maximumConnectionBufferCount;
+ long lmswb = maximumConnectionBufferSize;
+
+ return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm)
+ && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb);
+ }
+
+ private long getScheduledWriteBytes(IoSession session)
+ {
+ synchronized (session)
+ {
+ Long i = (Long) session.getAttribute(PENDING_SIZE);
+ return null == i ? 0 : i;
+ }
+ }
+
+ private void waitTillSendAllowed(IoSession session)
+ {
+ synchronized (_blockLock)
+ {
+ if (throwNotBlock)
+ {
+ throw new WriteBufferFullExeception();
+ }
+
+ _blockWaiters++;
+
+ while (!sendAllowed(session))
+ {
+ try
+ {
+ _blockLock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore.
+ }
+ }
+ _blockWaiters--;
+ }
+ }
+
+ public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
+ {
+ if (message instanceof ByteBuffer)
+ {
+ decrementPendingWriteSize(session, (ByteBuffer) message);
+ }
+ notifyWaitingWriters();
+ nextFilter.messageSent(session, message);
+ }
+
+ private void decrementPendingWriteSize(IoSession session, ByteBuffer message)
+ {
+ synchronized (session)
+ {
+ session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining());
+ }
+ }
+
+ private void notifyWaitingWriters()
+ {
+ synchronized (_blockLock)
+ {
+ if (_blockWaiters != 0)
+ {
+ _blockLock.notifyAll();
+ }
+ }
+
+ }
+
+ }//SentLimit
+
+
+}
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
new file mode 100644
index 0000000000..2ab3ddb50e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message.
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes CRLF terminated lines into
+ * <code>Command</code> objects:
+ * <pre>
+ * public class CRLFTerminatedCommandLineDecoder
+ * extends CumulativeProtocolDecoder {
+ *
+ * private Command parseCommand(ByteBuffer in) {
+ * // Convert the bytes in the specified buffer to a
+ * // Command object.
+ * ...
+ * }
+ *
+ * protected boolean doDecode(IoSession session, ByteBuffer in,
+ * ProtocolDecoderOutput out)
+ * throws Exception {
+ *
+ * // Remember the initial position.
+ * int start = in.position();
+ *
+ * // Now find the first CRLF in the buffer.
+ * byte previous = 0;
+ * while (in.hasRemaining()) {
+ * byte current = in.get();
+ *
+ * if (previous == '\r' && current == '\n') {
+ * // Remember the current position and limit.
+ * int position = in.position();
+ * int limit = in.limit();
+ * try {
+ * in.position(start);
+ * in.limit(position);
+ * // The bytes between in.position() and in.limit()
+ * // now contain a full CRLF terminated line.
+ * out.write(parseCommand(in.slice()));
+ * } finally {
+ * // Set the position to point right after the
+ * // detected line and set the limit to the old
+ * // one.
+ * in.position(position);
+ * in.limit(limit);
+ * }
+ * // Decoded one line; CumulativeProtocolDecoder will
+ * // call me again until I return false. So just
+ * // return true until there are no more lines in the
+ * // buffer.
+ * return true;
+ * }
+ *
+ * previous = current;
+ * }
+ *
+ * // Could not find CRLF in the buffer. Reset the initial
+ * // position to the one we recorded above.
+ * in.position(start);
+ *
+ * return false;
+ * }
+ * }
+ * </pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+
+ private static final String BUFFER = OurCumulativeProtocolDecoder.class
+ .getName()
+ + ".Buffer";
+
+ /**
+ * Creates a new instance.
+ */
+ protected OurCumulativeProtocolDecoder() {
+ }
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is NOT compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception {
+ boolean usingSessionBuffer = true;
+ ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER);
+ // If we have a session buffer, append data to that; otherwise
+ // use the buffer read from the network directly.
+ if (buf != null) {
+ buf.put(in);
+ buf.flip();
+ } else {
+ buf = in;
+ usingSessionBuffer = false;
+ }
+
+ for (;;) {
+ int oldPos = buf.position();
+ boolean decoded = doDecode(session, buf, out);
+ if (decoded) {
+ if (buf.position() == oldPos) {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed.");
+ }
+
+ if (!buf.hasRemaining()) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if (buf.hasRemaining()) {
+ storeRemainingInSession(buf, session);
+ } else {
+ if (usingSessionBuffer)
+ removeSessionBuffer(session);
+ }
+ }
+
+ /**
+ * Implement this method to consume the specified cumulative buffer and
+ * decode its content into message(s).
+ *
+ * @param in the cumulative buffer
+ * @return <tt>true</tt> if and only if there's more to decode in the buffer
+ * and you want to have <tt>doDecode</tt> method invoked again.
+ * Return <tt>false</tt> if remaining data is not enough to decode,
+ * then this method will be invoked again when more data is cumulated.
+ * @throws Exception if cannot decode <tt>in</tt>.
+ */
+ protected abstract boolean doDecode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception;
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose(IoSession session) throws Exception {
+ removeSessionBuffer(session);
+ }
+
+ private void removeSessionBuffer(IoSession session) {
+ ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER);
+ if (buf != null) {
+ buf.release();
+ }
+ }
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
+ ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity());
+ remainingBuf.setAutoExpand(true);
+ remainingBuf.order(buf.order());
+ remainingBuf.put(buf);
+ session.setAttribute(BUFFER, remainingBuf);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
new file mode 100644
index 0000000000..b56a649baa
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.NamePreservingRunnable;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class MultiThreadSocketAcceptor extends SocketAcceptor
+{
+ /**
+ * @noinspection StaticNonFinalField
+ */
+ private static volatile int nextId = 0;
+
+ private final Executor executor;
+ private final Object lock = new Object();
+ private final int id = nextId ++;
+ private final String threadName = "SocketAcceptor-" + id;
+ private final Map channels = new HashMap();
+
+ private final Queue registerQueue = new Queue();
+ private final Queue cancelQueue = new Queue();
+
+ private final MultiThreadSocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+
+ /**
+ * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+ */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+
+ /**
+ * Create an acceptor with a single processing thread using a NewThreadExecutor
+ */
+ public MultiThreadSocketAcceptor()
+ {
+ this( 1, new NewThreadExecutor() );
+ }
+
+ /**
+ * Create an acceptor with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public MultiThreadSocketAcceptor( int processorCount, Executor executor )
+ {
+ if( processorCount < 1 )
+ {
+ throw new IllegalArgumentException( "Must have at least one processor" );
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+ for( int i = 0; i < processorCount; i++ )
+ {
+ ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor );
+ }
+ }
+
+
+ /**
+ * Binds to the specified <code>address</code> and handles incoming connections with the specified
+ * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
+ *
+ * @throws IOException if failed to bind
+ */
+ public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
+ {
+ if( handler == null )
+ {
+ throw new NullPointerException( "handler" );
+ }
+
+ if( address != null && !( address instanceof InetSocketAddress ) )
+ {
+ throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+ }
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ RegistrationRequest request = new RegistrationRequest( address, handler, config );
+
+ synchronized( registerQueue )
+ {
+ registerQueue.push( request );
+ }
+
+ startupWorker();
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ throw request.exception;
+ }
+ }
+
+
+ private synchronized void startupWorker() throws IOException
+ {
+ synchronized( lock )
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+
+ executor.execute( new NamePreservingRunnable( worker ) );
+ }
+ }
+ }
+
+ public void unbind( SocketAddress address )
+ {
+ if( address == null )
+ {
+ throw new NullPointerException( "address" );
+ }
+
+ CancellationRequest request = new CancellationRequest( address );
+
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply throw
+ // IllegalArgumentException here because we can simply
+ // conclude that nothing is bound to the selector.
+ throw new IllegalArgumentException( "Address not bound: " + address );
+ }
+
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( request );
+ }
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ request.exception.fillInStackTrace();
+
+ throw request.exception;
+ }
+ }
+
+
+ private class Worker implements Runnable
+ {
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName );
+
+ for( ; ; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ cancelKeys();
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( lock )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ }
+ }
+ }
+ }
+
+ private void processSessions( Set keys ) throws IOException
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ it.remove();
+
+ if( !key.isAcceptable() )
+ {
+ continue;
+ }
+
+ ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+ SocketChannel ch = ssc.accept();
+
+ if( ch == null )
+ {
+ continue;
+ }
+
+ boolean success = false;
+ try
+ {
+
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+
+ MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl(
+ MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(),
+ req.config, ch, req.handler, req.address );
+
+ // New Interface
+// SocketSessionImpl session = new SocketSessionImpl(
+// SocketAcceptor.this, nextProcessor(), getListeners(),
+// req.config, ch, req.handler, req.address );
+
+
+ getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
+ session.getIoProcessor().addNew( session );
+ success = true;
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ ch.close();
+ }
+ }
+ }
+ }
+ }
+
+ private MultiThreadSocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ RegistrationRequest req;
+
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc = null;
+
+ try
+ {
+ ssc = ServerSocketChannel.open();
+ ssc.configureBlocking( false );
+
+ // Configure the server socket,
+ SocketAcceptorConfig cfg;
+ if( req.config instanceof SocketAcceptorConfig )
+ {
+ cfg = ( SocketAcceptorConfig ) req.config;
+ }
+ else
+ {
+ cfg = ( SocketAcceptorConfig ) getDefaultConfig();
+ }
+
+ ssc.socket().setReuseAddress( cfg.isReuseAddress() );
+ ssc.socket().setReceiveBufferSize(
+ ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
+
+ // and bind.
+ ssc.socket().bind( req.address, cfg.getBacklog() );
+ if( req.address == null || req.address.getPort() == 0 )
+ {
+ req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress();
+ }
+ ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+ synchronized( channels )
+ {
+ channels.put( req.address, ssc );
+ }
+
+ getListeners().fireServiceActivated(
+ this, req.address, req.handler, req.config );
+ }
+ catch( IOException e )
+ {
+ req.exception = e;
+ }
+ finally
+ {
+ synchronized( req )
+ {
+ req.done = true;
+
+ req.notifyAll();
+ }
+
+ if( ssc != null && req.exception != null )
+ {
+ try
+ {
+ ssc.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+ }
+ }
+
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ CancellationRequest request;
+
+ synchronized( cancelQueue )
+ {
+ request = ( CancellationRequest ) cancelQueue.pop();
+ }
+
+ if( request == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc;
+ synchronized( channels )
+ {
+ ssc = ( ServerSocketChannel ) channels.remove( request.address );
+ }
+
+ // close the channel
+ try
+ {
+ if( ssc == null )
+ {
+ request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+ }
+ else
+ {
+ SelectionKey key = ssc.keyFor( selector );
+ request.registrationRequest = ( RegistrationRequest ) key.attachment();
+ key.cancel();
+
+ selector.wakeup(); // wake up again to trigger thread death
+
+ ssc.close();
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ synchronized( request )
+ {
+ request.done = true;
+ request.notifyAll();
+ }
+
+ if( request.exception == null )
+ {
+ getListeners().fireServiceDeactivated(
+ this, request.address,
+ request.registrationRequest.handler,
+ request.registrationRequest.config );
+ }
+ }
+ }
+ }
+
+ private static class RegistrationRequest
+ {
+ private InetSocketAddress address;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+ private IOException exception;
+ private boolean done;
+
+ private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ this.address = ( InetSocketAddress ) address;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+
+
+ private static class CancellationRequest
+ {
+ private final SocketAddress address;
+ private boolean done;
+ private RegistrationRequest registrationRequest;
+ private RuntimeException exception;
+
+ private CancellationRequest( SocketAddress address )
+ {
+ this.address = address;
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
new file mode 100644
index 0000000000..202ac1a530
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class MultiThreadSocketConnector extends SocketConnector
+{
+ /** @noinspection StaticNonFinalField */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+ private final int id = nextId++;
+ private final String threadName = "SocketConnector-" + id;
+ private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
+ private final Queue connectQueue = new Queue();
+ private final MultiThreadSocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+ private final Executor executor;
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+ private int workerTimeout = 60; // 1 min.
+
+ /** Create a connector with a single processing thread using a NewThreadExecutor */
+ public MultiThreadSocketConnector()
+ {
+ this(1, new NewThreadExecutor());
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public MultiThreadSocketConnector(int processorCount, Executor executor)
+ {
+ if (processorCount < 1)
+ {
+ throw new IllegalArgumentException("Must have at least one processor");
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+ for (int i = 0; i < processorCount; i++)
+ {
+ ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+ }
+ }
+
+ /**
+ * How many seconds to keep the connection thread alive between connection requests
+ *
+ * @return Number of seconds to keep connection thread alive
+ */
+ public int getWorkerTimeout()
+ {
+ return workerTimeout;
+ }
+
+ /**
+ * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
+ *
+ * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
+ */
+ public void setWorkerTimeout(int workerTimeout)
+ {
+ if (workerTimeout < 0)
+ {
+ throw new IllegalArgumentException("Must be >= 0");
+ }
+ this.workerTimeout = workerTimeout;
+ }
+
+ public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+ {
+ return connect(address, null, handler, config);
+ }
+
+ public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+ IoHandler handler, IoServiceConfig config)
+ {
+ if (address == null)
+ {
+ throw new NullPointerException("address");
+ }
+ if (handler == null)
+ {
+ throw new NullPointerException("handler");
+ }
+
+ if (!(address instanceof InetSocketAddress))
+ {
+ throw new IllegalArgumentException("Unexpected address type: "
+ + address.getClass());
+ }
+
+ if (localAddress != null && !(localAddress instanceof InetSocketAddress))
+ {
+ throw new IllegalArgumentException("Unexpected local address type: "
+ + localAddress.getClass());
+ }
+
+ if (config == null)
+ {
+ config = getDefaultConfig();
+ }
+
+ SocketChannel ch = null;
+ boolean success = false;
+ try
+ {
+ ch = SocketChannel.open();
+ ch.socket().setReuseAddress(true);
+ if (localAddress != null)
+ {
+ ch.socket().bind(localAddress);
+ }
+
+ ch.configureBlocking(false);
+
+ if (ch.connect(address))
+ {
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ newSession(ch, handler, config, future);
+ success = true;
+ return future;
+ }
+
+ success = true;
+ }
+ catch (IOException e)
+ {
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ finally
+ {
+ if (!success && ch != null)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+
+ ConnectionRequest request = new ConnectionRequest(ch, handler, config);
+ synchronized (lock)
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e2)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e2);
+ }
+
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ }
+
+ synchronized (connectQueue)
+ {
+ connectQueue.push(request);
+ }
+ selector.wakeup();
+
+ return request;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if (worker == null)
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ executor.execute(new NamePreservingRunnable(worker));
+ }
+ }
+
+ private void registerNew()
+ {
+ if (connectQueue.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ ConnectionRequest req;
+ synchronized (connectQueue)
+ {
+ req = (ConnectionRequest) connectQueue.pop();
+ }
+
+ if (req == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = req.channel;
+ try
+ {
+ ch.register(selector, SelectionKey.OP_CONNECT, req);
+ }
+ catch (IOException e)
+ {
+ req.setException(e);
+ }
+ }
+ }
+
+ private void processSessions(Set keys)
+ {
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isConnectable())
+ {
+ continue;
+ }
+
+ SocketChannel ch = (SocketChannel) key.channel();
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ boolean success = false;
+ try
+ {
+ ch.finishConnect();
+ newSession(ch, entry.handler, entry.config, entry);
+ success = true;
+ }
+ catch (Throwable e)
+ {
+ entry.setException(e);
+ }
+ finally
+ {
+ key.cancel();
+ if (!success)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ keys.clear();
+ }
+
+ private void processTimedOutSessions(Set keys)
+ {
+ long currentTime = System.currentTimeMillis();
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ if (currentTime >= entry.deadline)
+ {
+ entry.setException(new ConnectException());
+ try
+ {
+ key.channel().close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ key.cancel();
+ }
+ }
+ }
+ }
+
+ private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ MultiThreadSocketSessionImpl session =
+ new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(),
+ config, ch, handler, ch.socket().getRemoteSocketAddress());
+
+ //new interface
+// SocketSessionImpl session = new SocketSessionImpl(
+// this, nextProcessor(), getListeners(),
+// config, ch, handler, ch.socket().getRemoteSocketAddress() );
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getThreadModel().buildFilterChain(session.getFilterChain());
+ }
+ catch (Throwable e)
+ {
+ throw (IOException) new IOException("Failed to create a session.").initCause(e);
+ }
+
+ // Set the ConnectFuture of the specified session, which will be
+ // removed and notified by AbstractIoFilterChain eventually.
+// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
+ session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture);
+
+ // Forward the remaining process to the SocketIoProcessor.
+ session.getIoProcessor().addNew(session);
+ }
+
+ private MultiThreadSocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+ private class Worker implements Runnable
+ {
+ private long lastActive = System.currentTimeMillis();
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName);
+
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(1000);
+
+ registerNew();
+
+ if (nKeys > 0)
+ {
+ processSessions(selector.selectedKeys());
+ }
+
+ processTimedOutSessions(selector.keys());
+
+ if (selector.keys().isEmpty())
+ {
+ if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
+ {
+ synchronized (lock)
+ {
+ if (selector.keys().isEmpty() &&
+ connectQueue.isEmpty())
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ lastActive = System.currentTimeMillis();
+ }
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ }
+ }
+
+ private class ConnectionRequest extends DefaultConnectFuture
+ {
+ private final SocketChannel channel;
+ private final long deadline;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+
+ private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
+ {
+ this.channel = channel;
+ long timeout;
+ if (config instanceof IoConnectorConfig)
+ {
+ timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
+ }
+ else
+ {
+ timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
+ }
+ this.deadline = System.currentTimeMillis() + timeout;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
new file mode 100644
index 0000000000..67b8c8d820
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ */
+class MultiThreadSocketFilterChain extends AbstractIoFilterChain {
+
+ MultiThreadSocketFilterChain( IoSession parent )
+ {
+ super( parent );
+ }
+
+ protected void doWrite( IoSession session, WriteRequest writeRequest )
+ {
+ MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
+ Queue writeRequestQueue = s.getWriteRequestQueue();
+
+ // SocketIoProcessor.doFlush() will reset it after write is finished
+ // because the buffer will be passed with messageSent event.
+ ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.push( writeRequest );
+ if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
+ {
+ // Notify SocketIoProcessor only when writeRequestQueue was empty.
+ s.getIoProcessor().flush( s );
+ }
+ }
+ }
+
+ protected void doClose( IoSession session ) throws IOException
+ {
+ MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
+ s.getIoProcessor().remove( s );
+ }
+}
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
new file mode 100644
index 0000000000..11c54bb248
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
@@ -0,0 +1,1034 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.Queue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$,
+ */
+class MultiThreadSocketIoProcessor extends SocketIoProcessor
+{
+ Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
+ Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader");
+ Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer");
+
+ private static final long SELECTOR_TIMEOUT = 1000L;
+
+ private int MAX_READ_BYTES_PER_SESSION = 524288; //512K
+ private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K
+
+ private final Object readLock = new Object();
+ private final Object writeLock = new Object();
+
+ private final String threadName;
+ private final Executor executor;
+
+ private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
+
+ /**
+ * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+ */
+ private volatile Selector selector, writeSelector;
+
+ private final Queue newSessions = new Queue();
+ private final Queue removingSessions = new Queue();
+ private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
+ private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();
+
+ private final Queue trafficControllingSessions = new Queue();
+
+ private ReadWorker readWorker;
+ private WriteWorker writeWorker;
+ private long lastIdleReadCheckTime = System.currentTimeMillis();
+ private long lastIdleWriteCheckTime = System.currentTimeMillis();
+
+ MultiThreadSocketIoProcessor(String threadName, Executor executor)
+ {
+ super(threadName, executor);
+ this.threadName = threadName;
+ this.executor = executor;
+ }
+
+ void addNew( SocketSessionImpl session ) throws IOException
+ {
+ synchronized( newSessions )
+ {
+ newSessions.push( session );
+ }
+
+ startupWorker();
+
+ selector.wakeup();
+ writeSelector.wakeup();
+ }
+
+ void remove( SocketSessionImpl session ) throws IOException
+ {
+ scheduleRemove( session );
+ startupWorker();
+ selector.wakeup();
+ }
+
+ private void startupWorker() throws IOException
+ {
+ synchronized(readLock)
+ {
+ if (readWorker == null)
+ {
+ selector = Selector.open();
+ readWorker = new ReadWorker();
+ executor.execute(new NamePreservingRunnable(readWorker));
+ }
+ }
+
+ synchronized(writeLock)
+ {
+ if (writeWorker == null)
+ {
+ writeSelector = Selector.open();
+ writeWorker = new WriteWorker();
+ executor.execute(new NamePreservingRunnable(writeWorker));
+ }
+ }
+
+ }
+
+ void flush( SocketSessionImpl session )
+ {
+ scheduleFlush( session );
+ Selector selector = this.writeSelector;
+
+ if( selector != null )
+ {
+ selector.wakeup();
+ }
+ }
+
+ void updateTrafficMask( SocketSessionImpl session )
+ {
+ scheduleTrafficControl( session );
+ Selector selector = this.selector;
+ if( selector != null )
+ {
+ selector.wakeup();
+ }
+ }
+
+ private void scheduleRemove( SocketSessionImpl session )
+ {
+ synchronized( removingSessions )
+ {
+ removingSessions.push( session );
+ }
+ }
+
+ private void scheduleFlush( SocketSessionImpl session )
+ {
+ synchronized(flushingSessionsSet)
+ {
+ //if flushingSessions grows to contain Integer.MAX_VALUE sessions
+ // then this will fail.
+ if (flushingSessionsSet.add(session))
+ {
+ flushingSessions.offer(session);
+ }
+ }
+ }
+
+ private void scheduleTrafficControl( SocketSessionImpl session )
+ {
+ synchronized( trafficControllingSessions )
+ {
+ trafficControllingSessions.push( session );
+ }
+ }
+
+ private void doAddNewReader() throws InterruptedException
+ {
+ if( newSessions.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized( newSessions )
+ {
+ session = (MultiThreadSocketSessionImpl) newSessions.peek();
+ }
+
+ if( session == null )
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+
+
+ try
+ {
+
+ ch.configureBlocking( false );
+ session.setSelectionKey( ch.register( selector,
+ SelectionKey.OP_READ,
+ session ) );
+
+
+ //System.out.println("ReadDebug:"+"Awaiting Registration");
+ session.awaitRegistration();
+ sessionCreated(session);
+ }
+ catch( IOException e )
+ {
+ // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // and call ConnectFuture.setException().
+ session.getFilterChain().fireExceptionCaught( session, e );
+ }
+ }
+ }
+
+
+ private void doAddNewWrite() throws InterruptedException
+ {
+ if (newSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized(newSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) newSessions.peek();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ ch.configureBlocking(false);
+ synchronized(flushingSessionsSet)
+ {
+ flushingSessionsSet.add(session);
+ }
+
+ session.setWriteSelectionKey(ch.register(writeSelector,
+ SelectionKey.OP_WRITE,
+ session));
+
+ //System.out.println("WriteDebug:"+"Awaiting Registration");
+ session.awaitRegistration();
+ sessionCreated(session);
+ }
+ catch (IOException e)
+ {
+
+ // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // and call ConnectFuture.setException().
+ session.getFilterChain().fireExceptionCaught( session, e );
+ }
+ }
+ }
+
+
+
+ private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
+ {
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+ synchronized(newSessions)
+ {
+ if (!session.created())
+ {
+ _logger.debug("Popping new session");
+ newSessions.pop();
+
+ // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
+ // in AbstractIoFilterChain.fireSessionOpened().
+ session.getServiceListeners().fireSessionCreated( session );
+
+ session.doneCreation();
+ }
+ }
+ }
+
+ private void doRemove()
+ {
+ if( removingSessions.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized( removingSessions )
+ {
+ session = (MultiThreadSocketSessionImpl) removingSessions.pop();
+ }
+
+ if( session == null )
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+ SelectionKey key = session.getReadSelectionKey();
+ SelectionKey writeKey = session.getWriteSelectionKey();
+
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.close() is called before addSession() is processed)
+ if (key == null || writeKey == null)
+ {
+ scheduleRemove( session );
+ break;
+ }
+ // skip if channel is already closed
+ if (!key.isValid() || !writeKey.isValid())
+ {
+ continue;
+ }
+
+ try
+ {
+ //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
+ synchronized(readLock)
+ {
+ key.cancel();
+ }
+ synchronized(writeLock)
+ {
+ writeKey.cancel();
+ }
+ ch.close();
+ }
+ catch( IOException e )
+ {
+ session.getFilterChain().fireExceptionCaught( session, e );
+ }
+ finally
+ {
+ releaseWriteBuffers( session );
+ session.getServiceListeners().fireSessionDestroyed( session );
+ }
+ }
+ }
+
+ private void processRead(Set selectedKeys)
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
+
+ synchronized(readLock)
+ {
+ if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
+ {
+ read( session );
+ }
+ }
+
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void processWrite(Set selectedKeys)
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+
+ synchronized(writeLock)
+ {
+ if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
+ {
+
+ // Clear OP_WRITE
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ synchronized(flushingSessionsSet)
+ {
+ flushingSessions.offer(session);
+ }
+ }
+ }
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void read(SocketSessionImpl session)
+ {
+
+ //if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session));
+ }
+
+ int totalReadBytes = 0;
+
+ for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ buf.clear();
+
+ int readBytes = 0;
+ int ret;
+
+ try
+ {
+ while ((ret = ch.read(buf.buf())) > 0)
+ {
+ readBytes += ret;
+ totalReadBytes += ret;
+ }
+ }
+ finally
+ {
+ buf.flip();
+ }
+
+
+ if (readBytes > 0)
+ {
+ session.increaseReadBytes(readBytes);
+
+ session.getFilterChain().fireMessageReceived(session, buf);
+ buf = null;
+ }
+
+ if (ret <= 0)
+ {
+ if (ret == 0)
+ {
+ if (readBytes == session.getReadBufferSize())
+ {
+ continue;
+ }
+ }
+ else
+ {
+ scheduleRemove(session);
+ }
+
+ break;
+ }
+ }
+ catch (Throwable e)
+ {
+ if (e instanceof IOException)
+ {
+ scheduleRemove(session);
+ }
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ finally
+ {
+ if (buf != null)
+ {
+ buf.release();
+ }
+ }
+ }//for
+
+ // if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes);
+ }
+ }
+
+
+ private void notifyReadIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastIdleReadCheckTime) >= 1000)
+ {
+ lastIdleReadCheckTime = currentTime;
+ Set keys = selector.keys();
+ if( keys != null )
+ {
+ for( Iterator it = keys.iterator(); it.hasNext(); )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+ SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+ notifyReadIdleness(session, currentTime);
+ }
+ }
+ }
+ }
+
+ private void notifyWriteIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastIdleWriteCheckTime) >= 1000)
+ {
+ lastIdleWriteCheckTime = currentTime;
+ Set keys = writeSelector.keys();
+ if (keys != null)
+ {
+ for (Iterator it = keys.iterator(); it.hasNext();)
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+ notifyWriteIdleness(session, currentTime);
+ }
+ }
+ }
+ }
+
+ private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
+ IdleStatus.BOTH_IDLE,
+ Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
+ IdleStatus.READER_IDLE,
+ Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
+ }
+
+ private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
+ IdleStatus.WRITER_IDLE,
+ Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+
+ notifyWriteTimeout( session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+ }
+
+ private void notifyIdleness0( SocketSessionImpl session, long currentTime,
+ long idleTime, IdleStatus status,
+ long lastIoTime )
+ {
+ if( idleTime > 0 && lastIoTime != 0
+ && ( currentTime - lastIoTime ) >= idleTime )
+ {
+ session.increaseIdleCount( status );
+ session.getFilterChain().fireSessionIdle( session, status );
+ }
+ }
+
+ private void notifyWriteTimeout( SocketSessionImpl session,
+ long currentTime,
+ long writeTimeout, long lastIoTime )
+ {
+
+ MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
+ SelectionKey key = sesh.getWriteSelectionKey();
+
+ synchronized(writeLock)
+ {
+ if( writeTimeout > 0
+ && ( currentTime - lastIoTime ) >= writeTimeout
+ && key != null && key.isValid()
+ && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
+ {
+ session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() );
+ }
+ }
+ }
+
+ private SocketSessionImpl getNextFlushingSession()
+ {
+ return (SocketSessionImpl) flushingSessions.poll();
+ }
+
+ private void releaseSession(SocketSessionImpl session)
+ {
+ synchronized(session.getWriteRequestQueue())
+ {
+ synchronized(flushingSessionsSet)
+ {
+ if (session.getScheduledWriteRequests() > 0)
+ {
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session));
+ }
+ flushingSessions.offer(session);
+ }
+ else
+ {
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session));
+ }
+ flushingSessionsSet.remove(session);
+ }
+ }
+ }
+ }
+
+ private void releaseWriteBuffers(SocketSessionImpl session)
+ {
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ WriteRequest req;
+
+ //Should this be synchronized?
+ synchronized(writeRequestQueue)
+ {
+ while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
+ {
+ try
+ {
+ ((ByteBuffer) req.getMessage()).release();
+ }
+ catch (IllegalStateException e)
+ {
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ finally
+ {
+ req.getFuture().setWritten(false);
+ }
+ }
+ }
+ }
+
+ private void doFlush()
+ {
+ MultiThreadSocketSessionImpl session;
+
+ while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
+ {
+ if( !session.isConnected() )
+ {
+ releaseWriteBuffers( session );
+ releaseSession(session);
+ continue;
+ }
+
+ SelectionKey key = session.getWriteSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.write() is called before addSession() is processed)
+ if( key == null )
+ {
+ scheduleFlush( session );
+ releaseSession(session);
+ continue;
+ }
+ // skip if channel is already closed
+ if( !key.isValid() )
+ {
+ releaseSession(session);
+ continue;
+ }
+
+ try
+ {
+ if (doFlush(session))
+ {
+ releaseSession(session);
+ }
+ }
+ catch( IOException e )
+ {
+ releaseSession(session);
+ scheduleRemove( session );
+ session.getFilterChain().fireExceptionCaught( session, e );
+ }
+
+ }
+
+ }
+
+ private boolean doFlush(SocketSessionImpl sessionParam) throws IOException
+ {
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+ // Clear OP_WRITE
+ SelectionKey key = session.getWriteSelectionKey();
+ synchronized(writeLock)
+ {
+ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+ }
+ SocketChannel ch = session.getChannel();
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+
+ long totalFlushedBytes = 0;
+ for( ; ; )
+ {
+ WriteRequest req;
+
+ synchronized( writeRequestQueue )
+ {
+ req = ( WriteRequest ) writeRequestQueue.first();
+ }
+
+ if( req == null )
+ {
+ break;
+ }
+
+ ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+ if( buf.remaining() == 0 )
+ {
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+
+ buf.reset();
+ session.getFilterChain().fireMessageSent( session, req );
+ continue;
+ }
+
+
+ int writtenBytes = 0;
+
+ // Reported as DIRMINA-362
+ //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
+// if (key.isWritable())
+ {
+ try
+ {
+ writtenBytes = ch.write(buf.buf());
+ totalFlushedBytes += writtenBytes;
+ }
+ catch (IOException ioe)
+ {
+ throw ioe;
+ }
+ }
+
+ if( writtenBytes > 0 )
+ {
+ session.increaseWrittenBytes( writtenBytes );
+ }
+
+ if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
+ {
+ // Kernel buffer is full
+ synchronized (writeLock)
+ {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ }
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
+ }
+ return false;
+ }
+ }
+
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
+ }
+ return true;
+ }
+
+ private void doUpdateTrafficMask()
+ {
+ if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked())
+ {
+ return;
+ }
+
+ // Synchronize over entire operation as this method should be called
+ // from both read and write thread and we don't want the order of the
+ // updates to get changed.
+ trafficMaskUpdateLock.lock();
+ try
+ {
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop();
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SelectionKey key = session.getReadSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ if (key == null)
+ {
+ scheduleTrafficControl(session);
+ break;
+ }
+ // skip if channel is already closed
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ // The normal is OP_READ and, if there are write requests in the
+ // session's write queue, set OP_WRITE to trigger flushing.
+
+ //Sset to Read and Write if there is nothing then the cost
+ // is one loop through the flusher.
+ int ops = SelectionKey.OP_READ;
+
+ // Now mask the preferred ops with the mask of the current session
+ int mask = session.getTrafficMask().getInterestOps();
+ synchronized (readLock)
+ {
+ key.interestOps(ops & mask);
+ }
+ //Change key to the WriteSelection Key
+ key = session.getWriteSelectionKey();
+ if (key != null && key.isValid())
+ {
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ synchronized (writeRequestQueue)
+ {
+ if (!writeRequestQueue.isEmpty())
+ {
+ ops = SelectionKey.OP_WRITE;
+ synchronized (writeLock)
+ {
+ key.interestOps(ops & mask);
+ }
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ trafficMaskUpdateLock.unlock();
+ }
+
+ }
+
+ private class WriteWorker implements Runnable
+ {
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer");
+
+ //System.out.println("WriteDebug:"+"Startup");
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = writeSelector.select(SELECTOR_TIMEOUT);
+
+ doAddNewWrite();
+ doUpdateTrafficMask();
+
+ if (nKeys > 0)
+ {
+ //System.out.println("WriteDebug:"+nKeys + " keys from writeselector");
+ processWrite(writeSelector.selectedKeys());
+ }
+ else
+ {
+ //System.out.println("WriteDebug:"+"No keys from writeselector");
+ }
+
+ doRemove();
+ notifyWriteIdleness();
+
+ if (flushingSessionsSet.size() > 0)
+ {
+ doFlush();
+ }
+
+ if (writeSelector.keys().isEmpty())
+ {
+ synchronized(writeLock)
+ {
+
+ if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
+ {
+ writeWorker = null;
+ try
+ {
+ writeSelector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ writeSelector = null;
+ }
+
+ break;
+ }
+ }
+ }
+
+ }
+ catch (Throwable t)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ //System.out.println("WriteDebug:"+"Shutdown");
+ }
+
+ }
+
+ private class ReadWorker implements Runnable
+ {
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
+
+ //System.out.println("ReadDebug:"+"Startup");
+ for( ; ; )
+ {
+ try
+ {
+ int nKeys = selector.select(SELECTOR_TIMEOUT);
+
+ doAddNewReader();
+ doUpdateTrafficMask();
+
+ if( nKeys > 0 )
+ {
+ //System.out.println("ReadDebug:"+nKeys + " keys from selector");
+
+ processRead(selector.selectedKeys());
+ }
+ else
+ {
+ //System.out.println("ReadDebug:"+"No keys from selector");
+ }
+
+
+ doRemove();
+ notifyReadIdleness();
+
+ if( selector.keys().isEmpty() )
+ {
+
+ synchronized(readLock)
+ {
+ if( selector.keys().isEmpty() && newSessions.isEmpty() )
+ {
+ readWorker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+
+ break;
+ }
+ }
+ }
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ }
+ }
+ }
+ //System.out.println("ReadDebug:"+"Shutdown");
+ }
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
new file mode 100644
index 0000000000..003df5e73c
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * An {@link IoConnectorConfig} for {@link SocketConnector}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl
+{
+ private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
+ private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false;
+ private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false;
+ private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false;
+
+ private static boolean DEFAULT_REUSE_ADDRESS;
+ private static int DEFAULT_RECEIVE_BUFFER_SIZE;
+ private static int DEFAULT_SEND_BUFFER_SIZE;
+ private static int DEFAULT_TRAFFIC_CLASS;
+ private static boolean DEFAULT_KEEP_ALIVE;
+ private static boolean DEFAULT_OOB_INLINE;
+ private static int DEFAULT_SO_LINGER;
+ private static boolean DEFAULT_TCP_NO_DELAY;
+
+ static
+ {
+ initialize();
+ }
+
+ private static void initialize()
+ {
+ Socket socket = null;
+
+ socket = new Socket();
+
+ try
+ {
+ DEFAULT_REUSE_ADDRESS = socket.getReuseAddress();
+ DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize();
+ DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize();
+ DEFAULT_KEEP_ALIVE = socket.getKeepAlive();
+ DEFAULT_OOB_INLINE = socket.getOOBInline();
+ DEFAULT_SO_LINGER = socket.getSoLinger();
+ DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay();
+
+ // Check if setReceiveBufferSize is supported.
+ try
+ {
+ socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE);
+ SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
+ }
+
+ // Check if setSendBufferSize is supported.
+ try
+ {
+ socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE);
+ SET_SEND_BUFFER_SIZE_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ SET_SEND_BUFFER_SIZE_AVAILABLE = false;
+ }
+
+ // Check if getTrafficClass is supported.
+ try
+ {
+ DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass();
+ GET_TRAFFIC_CLASS_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ GET_TRAFFIC_CLASS_AVAILABLE = false;
+ DEFAULT_TRAFFIC_CLASS = 0;
+ }
+ }
+ catch( SocketException e )
+ {
+ throw new ExceptionInInitializerError(e);
+ }
+ finally
+ {
+ if( socket != null )
+ {
+ try
+ {
+ socket.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ public static boolean isSetReceiveBufferSizeAvailable() {
+ return SET_RECEIVE_BUFFER_SIZE_AVAILABLE;
+ }
+
+ public static boolean isSetSendBufferSizeAvailable() {
+ return SET_SEND_BUFFER_SIZE_AVAILABLE;
+ }
+
+ public static boolean isGetTrafficClassAvailable() {
+ return GET_TRAFFIC_CLASS_AVAILABLE;
+ }
+
+ public static boolean isSetTrafficClassAvailable() {
+ return SET_TRAFFIC_CLASS_AVAILABLE;
+ }
+
+ private boolean reuseAddress = DEFAULT_REUSE_ADDRESS;
+ private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+ private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+ private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+ private boolean keepAlive = DEFAULT_KEEP_ALIVE;
+ private boolean oobInline = DEFAULT_OOB_INLINE;
+ private int soLinger = DEFAULT_SO_LINGER;
+ private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+
+ /**
+ * Creates a new instance.
+ */
+ MultiThreadSocketSessionConfigImpl()
+ {
+ }
+
+ public boolean isReuseAddress()
+ {
+ return reuseAddress;
+ }
+
+ public void setReuseAddress( boolean reuseAddress )
+ {
+ this.reuseAddress = reuseAddress;
+ }
+
+ public int getReceiveBufferSize()
+ {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize( int receiveBufferSize )
+ {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public int getSendBufferSize()
+ {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize( int sendBufferSize )
+ {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getTrafficClass()
+ {
+ return trafficClass;
+ }
+
+ public void setTrafficClass( int trafficClass )
+ {
+ this.trafficClass = trafficClass;
+ }
+
+ public boolean isKeepAlive()
+ {
+ return keepAlive;
+ }
+
+ public void setKeepAlive( boolean keepAlive )
+ {
+ this.keepAlive = keepAlive;
+ }
+
+ public boolean isOobInline()
+ {
+ return oobInline;
+ }
+
+ public void setOobInline( boolean oobInline )
+ {
+ this.oobInline = oobInline;
+ }
+
+ public int getSoLinger()
+ {
+ return soLinger;
+ }
+
+ public void setSoLinger( int soLinger )
+ {
+ this.soLinger = soLinger;
+ }
+
+ public boolean isTcpNoDelay()
+ {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay( boolean tcpNoDelay )
+ {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
new file mode 100644
index 0000000000..ee5c24b3ab
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.IoServiceListenerSupport;
+import org.apache.mina.util.Queue;
+
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link IoSession} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+class MultiThreadSocketSessionImpl extends SocketSessionImpl
+{
+ private final IoService manager;
+ private final IoServiceConfig serviceConfig;
+ private final SocketSessionConfig config = new SessionConfigImpl();
+ private final MultiThreadSocketIoProcessor ioProcessor;
+ private final MultiThreadSocketFilterChain filterChain;
+ private final SocketChannel ch;
+ private final Queue writeRequestQueue;
+ private final IoHandler handler;
+ private final SocketAddress remoteAddress;
+ private final SocketAddress localAddress;
+ private final SocketAddress serviceAddress;
+ private final IoServiceListenerSupport serviceListeners;
+ private SelectionKey readKey, writeKey;
+ private int readBufferSize;
+ private CountDownLatch registeredReadyLatch = new CountDownLatch(2);
+ private AtomicBoolean created = new AtomicBoolean(false);
+
+ /**
+ * Creates a new instance.
+ */
+ MultiThreadSocketSessionImpl( IoService manager,
+ SocketIoProcessor ioProcessor,
+ IoServiceListenerSupport listeners,
+ IoServiceConfig serviceConfig,
+ SocketChannel ch,
+ IoHandler defaultHandler,
+ SocketAddress serviceAddress )
+ {
+ super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress);
+ this.manager = manager;
+ this.serviceListeners = listeners;
+ this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor;
+ this.filterChain = new MultiThreadSocketFilterChain(this);
+ this.ch = ch;
+ this.writeRequestQueue = new Queue();
+ this.handler = defaultHandler;
+ this.remoteAddress = ch.socket().getRemoteSocketAddress();
+ this.localAddress = ch.socket().getLocalSocketAddress();
+ this.serviceAddress = serviceAddress;
+ this.serviceConfig = serviceConfig;
+
+ // Apply the initial session settings
+ IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
+ if( sessionConfig instanceof SocketSessionConfig )
+ {
+ SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig;
+ this.config.setKeepAlive( cfg.isKeepAlive() );
+ this.config.setOobInline( cfg.isOobInline() );
+ this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() );
+ this.readBufferSize = cfg.getReceiveBufferSize();
+ this.config.setReuseAddress( cfg.isReuseAddress() );
+ this.config.setSendBufferSize( cfg.getSendBufferSize() );
+ this.config.setSoLinger( cfg.getSoLinger() );
+ this.config.setTcpNoDelay( cfg.isTcpNoDelay() );
+
+ if( this.config.getTrafficClass() != cfg.getTrafficClass() )
+ {
+ this.config.setTrafficClass( cfg.getTrafficClass() );
+ }
+ }
+ }
+
+ void awaitRegistration() throws InterruptedException
+ {
+ registeredReadyLatch.countDown();
+
+ registeredReadyLatch.await();
+ }
+
+ boolean created() throws InterruptedException
+ {
+ return created.get();
+ }
+
+ void doneCreation()
+ {
+ created.getAndSet(true);
+ }
+
+ public IoService getService()
+ {
+ return manager;
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return serviceConfig;
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return config;
+ }
+
+ SocketIoProcessor getIoProcessor()
+ {
+ return ioProcessor;
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filterChain;
+ }
+
+ SocketChannel getChannel()
+ {
+ return ch;
+ }
+
+ IoServiceListenerSupport getServiceListeners()
+ {
+ return serviceListeners;
+ }
+
+ SelectionKey getSelectionKey()
+ {
+ return readKey;
+ }
+
+ SelectionKey getReadSelectionKey()
+ {
+ return readKey;
+ }
+
+ SelectionKey getWriteSelectionKey()
+ {
+ return writeKey;
+ }
+
+ void setSelectionKey(SelectionKey key)
+ {
+ this.readKey = key;
+ }
+
+ void setWriteSelectionKey(SelectionKey key)
+ {
+ this.writeKey = key;
+ }
+
+ public IoHandler getHandler()
+ {
+ return handler;
+ }
+
+ protected void close0()
+ {
+ filterChain.fireFilterClose( this );
+ }
+
+ Queue getWriteRequestQueue()
+ {
+ return writeRequestQueue;
+ }
+
+ /**
+ @return int Number of write scheduled write requests
+ @deprecated
+ */
+ public int getScheduledWriteMessages()
+ {
+ return getScheduledWriteRequests();
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ synchronized( writeRequestQueue )
+ {
+ return writeRequestQueue.size();
+ }
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ synchronized( writeRequestQueue )
+ {
+ return writeRequestQueue.byteSize();
+ }
+ }
+
+ protected void write0( WriteRequest writeRequest )
+ {
+ filterChain.fireFilterWrite( this, writeRequest );
+ }
+
+ public TransportType getTransportType()
+ {
+ return TransportType.SOCKET;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ //This is what I had previously
+// return ch.socket().getRemoteSocketAddress();
+ return remoteAddress;
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ //This is what I had previously
+// return ch.socket().getLocalSocketAddress();
+ return localAddress;
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return serviceAddress;
+ }
+
+ protected void updateTrafficMask()
+ {
+ this.ioProcessor.updateTrafficMask( this );
+ }
+
+ int getReadBufferSize()
+ {
+ return readBufferSize;
+ }
+
+ private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
+ {
+ public boolean isKeepAlive()
+ {
+ try
+ {
+ return ch.socket().getKeepAlive();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setKeepAlive( boolean on )
+ {
+ try
+ {
+ ch.socket().setKeepAlive( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isOobInline()
+ {
+ try
+ {
+ return ch.socket().getOOBInline();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setOobInline( boolean on )
+ {
+ try
+ {
+ ch.socket().setOOBInline( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isReuseAddress()
+ {
+ try
+ {
+ return ch.socket().getReuseAddress();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setReuseAddress( boolean on )
+ {
+ try
+ {
+ ch.socket().setReuseAddress( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public int getSoLinger()
+ {
+ try
+ {
+ return ch.socket().getSoLinger();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setSoLinger( int linger )
+ {
+ try
+ {
+ if( linger < 0 )
+ {
+ ch.socket().setSoLinger( false, 0 );
+ }
+ else
+ {
+ ch.socket().setSoLinger( true, linger );
+ }
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isTcpNoDelay()
+ {
+ try
+ {
+ return ch.socket().getTcpNoDelay();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setTcpNoDelay( boolean on )
+ {
+ try
+ {
+ ch.socket().setTcpNoDelay( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public int getTrafficClass()
+ {
+ if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
+ {
+ try
+ {
+ return ch.socket().getTrafficClass();
+ }
+ catch( SocketException e )
+ {
+ // Throw an exception only when setTrafficClass is also available.
+ if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ return 0;
+ }
+
+ public void setTrafficClass( int tc )
+ {
+ if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+ {
+ try
+ {
+ ch.socket().setTrafficClass( tc );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ public int getSendBufferSize()
+ {
+ try
+ {
+ return ch.socket().getSendBufferSize();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setSendBufferSize( int size )
+ {
+ if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
+ {
+ try
+ {
+ ch.socket().setSendBufferSize( size );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ public int getReceiveBufferSize()
+ {
+ try
+ {
+ return ch.socket().getReceiveBufferSize();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setReceiveBufferSize( int size )
+ {
+ if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
+ {
+ try
+ {
+ ch.socket().setReceiveBufferSize( size );
+ MultiThreadSocketSessionImpl.this.readBufferSize = size;
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+ }
+}