diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-11-26 14:16:01 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-11-26 14:16:01 +0000 |
commit | 6ed85130d3560f7f39352fb164735721b11aa0be (patch) | |
tree | 7b363e9e6a5efe2839b142c555b05ddf48de0769 | |
parent | 2e2693126d0b56d182221232ffdf5a75ddda01d1 (diff) | |
download | qpid-python-6ed85130d3560f7f39352fb164735721b11aa0be.tar.gz |
QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@598285 13f79535-47bb-0310-9956-ffa450edef68
17 files changed, 3550 insertions, 79 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 2257a612b3..b3b6a2877f 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -33,6 +33,7 @@ <keystorePassword>keystorepass</keystorePassword> </ssl>--> <qpidnio>true</qpidnio> + <protectio>true</protectio> <transport>nio</transport> <port>5672</port> <sslport>8672</sslport> @@ -174,3 +175,4 @@ </broker> + diff --git a/java/broker/pom.xml b/java/broker/pom.xml index 9e38751d62..dbc3b2aea9 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -6,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,16 +49,16 @@ <artifactId>log4j</artifactId> </dependency> - <dependency> + <dependency> <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.4.0</version> + <artifactId>slf4j-api</artifactId> + <version>1.4.0</version> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.4.0</version> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.4.0</version> </dependency> <dependency> @@ -208,7 +208,7 @@ <isset property="skip.python.tests"/> </condition> - <property name="command" + <property name="command" value="python run-tests -v -I java_failing.txt -b localhost:2100"/> <!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>--> diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index a87c704cf8..ab9f40b31d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -436,7 +436,7 @@ public class Main } } - //fixme qpid.AMQP should be using qpidproperties to get value + //fixme qpid.AMQP should be using qpidproperties to get value _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 476e608b01..c5ec28e626 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,10 +23,15 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; +import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; @@ -51,7 +56,6 @@ import java.net.InetSocketAddress; * * We delegate all frame (message) processing to the AMQProtocolSession which wraps * the state for the connection. - * */ public class AMQPFastProtocolHandler extends IoHandlerAdapter { @@ -115,11 +119,41 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } } + + if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false)) + { + try + { +// //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); + + int buf_size = 32768; + if (protocolSession.getConfig() instanceof SocketSessionConfig) + { + buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize(); + } + + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.attach(chain); + + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + _logger.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } } - /** - * Separated into its own, protected, method to allow easier reuse - */ + /** Separated into its own, protected, method to allow easier reuse */ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException { new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec); @@ -193,8 +227,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter /** * Invoked when a message is received on a particular protocol session. Note that a * protocol session is directly tied to a particular physical connection. + * * @param protocolSession the protocol session that received the message - * @param message the message itself (i.e. a decoded frame) + * @param message the message itself (i.e. a decoded frame) + * * @throws Exception if the message cannot be processed */ public void messageReceived(IoSession protocolSession, Object message) throws Exception @@ -204,7 +240,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter if (message instanceof AMQDataBlock) { amqProtocolSession.dataBlockReceived((AMQDataBlock) message); - + } else if (message instanceof ByteBuffer) { @@ -218,9 +254,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter /** * Called after a message has been sent out on a particular protocol session + * * @param protocolSession the protocol session (i.e. connection) on which this - * message was sent - * @param object the message (frame) that was encoded and sent + * message was sent + * @param object the message (frame) that was encoded and sent + * * @throws Exception if we want to indicate an error */ public void messageSent(IoSession protocolSession, Object object) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java index a4ed859fa7..1dcbb02d5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,9 +23,12 @@ package org.apache.qpid.server.transport; import org.apache.mina.common.IoAcceptor; import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.configuration.Configured; +import org.apache.log4j.Logger; public class ConnectorConfiguration { + private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class); + public static final String DEFAULT_PORT = "5672"; public static final String SSL_PORT = "8672"; @@ -41,7 +44,7 @@ public class ConnectorConfiguration @Configured(path = "connector.bind", defaultValue = "wildcard") public String bindAddress; - + @Configured(path = "connector.socketReceiveBuffer", defaultValue = "32767") public int socketReceiveBufferSize; @@ -69,29 +72,43 @@ public class ConnectorConfiguration @Configured(path = "connector.ssl.enabled", defaultValue = "false") public boolean enableSSL; - + @Configured(path = "connector.ssl.sslOnly", - defaultValue = "true") + defaultValue = "true") public boolean sslOnly; - + @Configured(path = "connector.ssl.port", - defaultValue = SSL_PORT) - public int sslPort; - + defaultValue = SSL_PORT) + public int sslPort; + @Configured(path = "connector.ssl.keystorePath", - defaultValue = "none") + defaultValue = "none") public String keystorePath; - + @Configured(path = "connector.ssl.keystorePassword", - defaultValue = "none") + defaultValue = "none") public String keystorePassword; - + @Configured(path = "connector.ssl.certType", - defaultValue = "SunX509") + defaultValue = "SunX509") public String certType; + @Configured(path = "connector.qpidnio", + defaultValue = "true") + public boolean _multiThreadNIO; + + public IoAcceptor createAcceptor() { - return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor()); + if (_multiThreadNIO) + { + _logger.warn("Using Qpid Multithreaded IO Processing"); + return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor()); + } + else + { + _logger.warn("Using Mina IO Processing"); + return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor()); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 19142067cb..dd27f07a9a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -21,12 +21,16 @@ package org.apache.qpid.client.protocol; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; +import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolCodecFilter; - +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -216,6 +220,36 @@ public class AMQProtocolHandler extends IoHandlerAdapter e.printStackTrace(); } + if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio")) + { + try + { + //Add IO Protection Filters + IoFilterChain chain = session.getFilterChain(); + + int buf_size = 32768; + if (session.getConfig() instanceof SocketSessionConfig) + { + buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize(); + } + session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.attach(chain); + session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + + _logger.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 1d0d6a3491..040b5f7b68 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -23,14 +23,13 @@ package org.apache.qpid.client.transport; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; - import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,40 +89,41 @@ public class TransportConnection switch (transport) { - case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + case TCP: + _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() { - public IoConnector newSocketConnector() + SocketConnector result; + // FIXME - this needs to be sorted to use the new Mina MultiThread SA. + if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio")) { - SocketConnector result; - // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) - { - _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set."); - // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector - } - // else - - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector - } - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); - - return result; + _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") + ? "Qpid NIO is new default" + : "Sysproperty 'qpidnio' is set")); + result = new MultiThreadSocketConnector(); + } + else + { + _logger.info("Using Mina NIO"); + result = new SocketConnector(); // non-blocking connector } - }); - break; - case VM: - { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - break; - } + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + result.setWorkerTimeout(0); + + return result; + } + }); + break; + + case VM: + { + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + break; + } } return _instance; @@ -145,7 +145,7 @@ public class TransportConnection } private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) - throws AMQVMBrokerCreationException + throws AMQVMBrokerCreationException { int port = details.getPort(); @@ -160,7 +160,7 @@ public class TransportConnection else { throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); + + " does not exist. Auto create disabled.", null); } } } @@ -257,8 +257,8 @@ public class TransportConnection IoHandlerAdapter provider; try { - Class[] cnstr = { Integer.class }; - Object[] params = { port }; + Class[] cnstr = {Integer.class}; + Object[] params = {port}; provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); @@ -277,7 +277,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); amqbce.initCause(e); throw amqbce; } diff --git a/java/common/pom.xml b/java/common/pom.xml index c4352a2a8c..4c94e3d1b5 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -6,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,7 +45,7 @@ </properties> <build> - <plugins> + <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-antrun-plugin</artifactId> @@ -114,14 +114,14 @@ <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> - <version>1.4.0</version> + <version>1.4.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.4.0</version> - <scope>test</scope> + <scope>test</scope> </dependency> <dependency> diff --git a/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java b/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java new file mode 100644 index 0000000000..47f19aa76d --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java @@ -0,0 +1,48 @@ +package org.apache.mina.filter; + +import org.apache.mina.common.IoFilter;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +public class WriteBufferFullExeception extends RuntimeException +{ + private IoFilter.WriteRequest _writeRequest; + + public WriteBufferFullExeception() + { + this(null); + } + + public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest) + { + _writeRequest = writeRequest; + } + + + public void setWriteRequest(IoFilter.WriteRequest writeRequest) + { + _writeRequest = writeRequest; + } + + public IoFilter.WriteRequest getWriteRequest() + { + return _writeRequest; + } +} diff --git a/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java b/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java new file mode 100644 index 0000000000..d063200cf6 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.DefaultIoFilterChainBuilder; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.executor.ExecutorFilter; + +import java.util.Iterator; +import java.util.List; + +/** + * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than + * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the + * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to + * cause an Out of Memory exception due to a back log of unprocessed messages. + * + * This is should only be viewed as a temporary work around for DIRMINA-302. + * + * A true solution should not be implemented as a filter as this issue will always occur. On a machine + * where the network is slower than the local producer. + * + * Suggested improvement is to allow implementation of policices on what to do when buffer is full. + * + * They could be: + * Block - As this does + * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks + * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state + * + * <p/> + * <p>Usage: + * <p/> + * <pre><code> + * DefaultFilterChainBuilder builder = ... + * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder(); + * filter.attach( builder ); + * </code></pre> + * <p/> + * or + * <p/> + * <pre><code> + * IoFilterChain chain = ... + * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder(); + * filter.attach( chain ); + * </code></pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class WriteBufferLimitFilterBuilder +{ + public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize"; + + private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000; + + private volatile boolean throwNotBlock = false; + + private volatile int maximumConnectionBufferCount; + private volatile long maximumConnectionBufferSize; + + private final Object _blockLock = new Object(); + + private int _blockWaiters = 0; + + + public WriteBufferLimitFilterBuilder() + { + this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT); + } + + public WriteBufferLimitFilterBuilder(int maxWriteBufferSize) + { + setMaximumConnectionBufferCount(maxWriteBufferSize); + } + + + /** + * Set the maximum amount pending items in the writeQueue for a given session. + * Changing the value will only take effect when new data is received for a + * connection, including existing connections. Default value is 5000 msgs. + * + * @param maximumConnectionBufferCount New buffer size. Must be > 0 + */ + public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount) + { + this.maximumConnectionBufferCount = maximumConnectionBufferCount; + this.maximumConnectionBufferSize = 0; + } + + public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize) + { + this.maximumConnectionBufferSize = maximumConnectionBufferSize; + this.maximumConnectionBufferCount = 0; + } + + /** + * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself + * before and after that filter. + * + * @param chain {@link IoFilterChain} to attach self to. + */ + public void attach(IoFilterChain chain) + { + String name = getThreadPoolFilterEntryName(chain.getAll()); + + chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit()); + } + + /** + * Attach this filter to the specified builder. It will search for the + * {@link ExecutorFilter}, and attach itself before and after that filter. + * + * @param builder {@link DefaultIoFilterChainBuilder} to attach self to. + */ + public void attach(DefaultIoFilterChainBuilder builder) + { + String name = getThreadPoolFilterEntryName(builder.getAll()); + + builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit()); + } + + private String getThreadPoolFilterEntryName(List entries) + { + Iterator i = entries.iterator(); + + while (i.hasNext()) + { + IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next(); + + if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class)) + { + return entry.getName(); + } + } + + throw new IllegalStateException("Chain does not contain a ExecutorFilter"); + } + + + public class SendLimit extends IoFilterAdapter + { + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + { + try + { + waitTillSendAllowed(session); + } + catch (WriteBufferFullExeception wbfe) + { + nextFilter.exceptionCaught(session, wbfe); + } + + if (writeRequest.getMessage() instanceof ByteBuffer) + { + increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage()); + } + + nextFilter.filterWrite(session, writeRequest); + } + + private void increasePendingWriteSize(IoSession session, ByteBuffer message) + { + synchronized (session) + { + Long pendingSize = getScheduledWriteBytes(session) + message.remaining(); + session.setAttribute(PENDING_SIZE, pendingSize); + } + } + + private boolean sendAllowed(IoSession session) + { + if (session.isClosing()) + { + return true; + } + + int lmswm = maximumConnectionBufferCount; + long lmswb = maximumConnectionBufferSize; + + return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm) + && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb); + } + + private long getScheduledWriteBytes(IoSession session) + { + synchronized (session) + { + Long i = (Long) session.getAttribute(PENDING_SIZE); + return null == i ? 0 : i; + } + } + + private void waitTillSendAllowed(IoSession session) + { + synchronized (_blockLock) + { + if (throwNotBlock) + { + throw new WriteBufferFullExeception(); + } + + _blockWaiters++; + + while (!sendAllowed(session)) + { + try + { + _blockLock.wait(); + } + catch (InterruptedException e) + { + // Ignore. + } + } + _blockWaiters--; + } + } + + public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception + { + if (message instanceof ByteBuffer) + { + decrementPendingWriteSize(session, (ByteBuffer) message); + } + notifyWaitingWriters(); + nextFilter.messageSent(session, message); + } + + private void decrementPendingWriteSize(IoSession session, ByteBuffer message) + { + synchronized (session) + { + session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining()); + } + } + + private void notifyWaitingWriters() + { + synchronized (_blockLock) + { + if (_blockWaiters != 0) + { + _blockLock.notifyAll(); + } + } + + } + + }//SentLimit + + +} diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java new file mode 100644 index 0000000000..2ab3ddb50e --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +/** + * A {@link ProtocolDecoder} that cumulates the content of received + * buffers to a <em>cumulative buffer</em> to help users implement decoders. + * <p> + * If the received {@link ByteBuffer} is only a part of a message. + * decoders should cumulate received buffers to make a message complete or + * to postpone decoding until more buffers arrive. + * <p> + * Here is an example decoder that decodes CRLF terminated lines into + * <code>Command</code> objects: + * <pre> + * public class CRLFTerminatedCommandLineDecoder + * extends CumulativeProtocolDecoder { + * + * private Command parseCommand(ByteBuffer in) { + * // Convert the bytes in the specified buffer to a + * // Command object. + * ... + * } + * + * protected boolean doDecode(IoSession session, ByteBuffer in, + * ProtocolDecoderOutput out) + * throws Exception { + * + * // Remember the initial position. + * int start = in.position(); + * + * // Now find the first CRLF in the buffer. + * byte previous = 0; + * while (in.hasRemaining()) { + * byte current = in.get(); + * + * if (previous == '\r' && current == '\n') { + * // Remember the current position and limit. + * int position = in.position(); + * int limit = in.limit(); + * try { + * in.position(start); + * in.limit(position); + * // The bytes between in.position() and in.limit() + * // now contain a full CRLF terminated line. + * out.write(parseCommand(in.slice())); + * } finally { + * // Set the position to point right after the + * // detected line and set the limit to the old + * // one. + * in.position(position); + * in.limit(limit); + * } + * // Decoded one line; CumulativeProtocolDecoder will + * // call me again until I return false. So just + * // return true until there are no more lines in the + * // buffer. + * return true; + * } + * + * previous = current; + * } + * + * // Could not find CRLF in the buffer. Reset the initial + * // position to the one we recorded above. + * in.position(start); + * + * return false; + * } + * } + * </pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { + + private static final String BUFFER = OurCumulativeProtocolDecoder.class + .getName() + + ".Buffer"; + + /** + * Creates a new instance. + */ + protected OurCumulativeProtocolDecoder() { + } + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is NOT compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception { + boolean usingSessionBuffer = true; + ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER); + // If we have a session buffer, append data to that; otherwise + // use the buffer read from the network directly. + if (buf != null) { + buf.put(in); + buf.flip(); + } else { + buf = in; + usingSessionBuffer = false; + } + + for (;;) { + int oldPos = buf.position(); + boolean decoded = doDecode(session, buf, out); + if (decoded) { + if (buf.position() == oldPos) { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed."); + } + + if (!buf.hasRemaining()) { + break; + } + } else { + break; + } + } + + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if (buf.hasRemaining()) { + storeRemainingInSession(buf, session); + } else { + if (usingSessionBuffer) + removeSessionBuffer(session); + } + } + + /** + * Implement this method to consume the specified cumulative buffer and + * decode its content into message(s). + * + * @param in the cumulative buffer + * @return <tt>true</tt> if and only if there's more to decode in the buffer + * and you want to have <tt>doDecode</tt> method invoked again. + * Return <tt>false</tt> if remaining data is not enough to decode, + * then this method will be invoked again when more data is cumulated. + * @throws Exception if cannot decode <tt>in</tt>. + */ + protected abstract boolean doDecode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception; + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose(IoSession session) throws Exception { + removeSessionBuffer(session); + } + + private void removeSessionBuffer(IoSession session) { + ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER); + if (buf != null) { + buf.release(); + } + } + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) { + ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity()); + remainingBuf.setAutoExpand(true); + remainingBuf.order(buf.order()); + remainingBuf.put(buf); + session.setAttribute(BUFFER, remainingBuf); + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java new file mode 100644 index 0000000000..b56a649baa --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.BaseIoAcceptor; +import org.apache.mina.util.Queue; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.NamePreservingRunnable; +import edu.emory.mathcs.backport.java.util.concurrent.Executor; + +/** + * {@link IoAcceptor} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class MultiThreadSocketAcceptor extends SocketAcceptor +{ + /** + * @noinspection StaticNonFinalField + */ + private static volatile int nextId = 0; + + private final Executor executor; + private final Object lock = new Object(); + private final int id = nextId ++; + private final String threadName = "SocketAcceptor-" + id; + private final Map channels = new HashMap(); + + private final Queue registerQueue = new Queue(); + private final Queue cancelQueue = new Queue(); + + private final MultiThreadSocketIoProcessor[] ioProcessors; + private final int processorCount; + + /** + * @noinspection FieldAccessedSynchronizedAndUnsynchronized + */ + private Selector selector; + private Worker worker; + private int processorDistributor = 0; + + /** + * Create an acceptor with a single processing thread using a NewThreadExecutor + */ + public MultiThreadSocketAcceptor() + { + this( 1, new NewThreadExecutor() ); + } + + /** + * Create an acceptor with the desired number of processing threads + * + * @param processorCount Number of processing threads + * @param executor Executor to use for launching threads + */ + public MultiThreadSocketAcceptor( int processorCount, Executor executor ) + { + if( processorCount < 1 ) + { + throw new IllegalArgumentException( "Must have at least one processor" ); + } + + this.executor = executor; + this.processorCount = processorCount; + ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; + + for( int i = 0; i < processorCount; i++ ) + { + ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor ); + } + } + + + /** + * Binds to the specified <code>address</code> and handles incoming connections with the specified + * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. + * + * @throws IOException if failed to bind + */ + public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException + { + if( handler == null ) + { + throw new NullPointerException( "handler" ); + } + + if( address != null && !( address instanceof InetSocketAddress ) ) + { + throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); + } + + if( config == null ) + { + config = getDefaultConfig(); + } + + RegistrationRequest request = new RegistrationRequest( address, handler, config ); + + synchronized( registerQueue ) + { + registerQueue.push( request ); + } + + startupWorker(); + + selector.wakeup(); + + synchronized( request ) + { + while( !request.done ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + + if( request.exception != null ) + { + throw request.exception; + } + } + + + private synchronized void startupWorker() throws IOException + { + synchronized( lock ) + { + if( worker == null ) + { + selector = Selector.open(); + worker = new Worker(); + + executor.execute( new NamePreservingRunnable( worker ) ); + } + } + } + + public void unbind( SocketAddress address ) + { + if( address == null ) + { + throw new NullPointerException( "address" ); + } + + CancellationRequest request = new CancellationRequest( address ); + + try + { + startupWorker(); + } + catch( IOException e ) + { + // IOException is thrown only when Worker thread is not + // running and failed to open a selector. We simply throw + // IllegalArgumentException here because we can simply + // conclude that nothing is bound to the selector. + throw new IllegalArgumentException( "Address not bound: " + address ); + } + + synchronized( cancelQueue ) + { + cancelQueue.push( request ); + } + + selector.wakeup(); + + synchronized( request ) + { + while( !request.done ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + + if( request.exception != null ) + { + request.exception.fillInStackTrace(); + + throw request.exception; + } + } + + + private class Worker implements Runnable + { + public void run() + { + Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName ); + + for( ; ; ) + { + try + { + int nKeys = selector.select(); + + registerNew(); + + if( nKeys > 0 ) + { + processSessions( selector.selectedKeys() ); + } + + cancelKeys(); + + if( selector.keys().isEmpty() ) + { + synchronized( lock ) + { + if( selector.keys().isEmpty() && + registerQueue.isEmpty() && + cancelQueue.isEmpty() ) + { + worker = null; + try + { + selector.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + ExceptionMonitor.getInstance().exceptionCaught( e1 ); + } + } + } + } + + private void processSessions( Set keys ) throws IOException + { + Iterator it = keys.iterator(); + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + + it.remove(); + + if( !key.isAcceptable() ) + { + continue; + } + + ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); + + SocketChannel ch = ssc.accept(); + + if( ch == null ) + { + continue; + } + + boolean success = false; + try + { + + RegistrationRequest req = ( RegistrationRequest ) key.attachment(); + + MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl( + MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(), + req.config, ch, req.handler, req.address ); + + // New Interface +// SocketSessionImpl session = new SocketSessionImpl( +// SocketAcceptor.this, nextProcessor(), getListeners(), +// req.config, ch, req.handler, req.address ); + + + getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); + req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); + req.config.getThreadModel().buildFilterChain( session.getFilterChain() ); + session.getIoProcessor().addNew( session ); + success = true; + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + } + finally + { + if( !success ) + { + ch.close(); + } + } + } + } + } + + private MultiThreadSocketIoProcessor nextProcessor() + { + return ioProcessors[processorDistributor++ % processorCount]; + } + + + private void registerNew() + { + if( registerQueue.isEmpty() ) + { + return; + } + + for( ; ; ) + { + RegistrationRequest req; + + synchronized( registerQueue ) + { + req = ( RegistrationRequest ) registerQueue.pop(); + } + + if( req == null ) + { + break; + } + + ServerSocketChannel ssc = null; + + try + { + ssc = ServerSocketChannel.open(); + ssc.configureBlocking( false ); + + // Configure the server socket, + SocketAcceptorConfig cfg; + if( req.config instanceof SocketAcceptorConfig ) + { + cfg = ( SocketAcceptorConfig ) req.config; + } + else + { + cfg = ( SocketAcceptorConfig ) getDefaultConfig(); + } + + ssc.socket().setReuseAddress( cfg.isReuseAddress() ); + ssc.socket().setReceiveBufferSize( + ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() ); + + // and bind. + ssc.socket().bind( req.address, cfg.getBacklog() ); + if( req.address == null || req.address.getPort() == 0 ) + { + req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress(); + } + ssc.register( selector, SelectionKey.OP_ACCEPT, req ); + + synchronized( channels ) + { + channels.put( req.address, ssc ); + } + + getListeners().fireServiceActivated( + this, req.address, req.handler, req.config ); + } + catch( IOException e ) + { + req.exception = e; + } + finally + { + synchronized( req ) + { + req.done = true; + + req.notifyAll(); + } + + if( ssc != null && req.exception != null ) + { + try + { + ssc.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + } + } + + + private void cancelKeys() + { + if( cancelQueue.isEmpty() ) + { + return; + } + + for( ; ; ) + { + CancellationRequest request; + + synchronized( cancelQueue ) + { + request = ( CancellationRequest ) cancelQueue.pop(); + } + + if( request == null ) + { + break; + } + + ServerSocketChannel ssc; + synchronized( channels ) + { + ssc = ( ServerSocketChannel ) channels.remove( request.address ); + } + + // close the channel + try + { + if( ssc == null ) + { + request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); + } + else + { + SelectionKey key = ssc.keyFor( selector ); + request.registrationRequest = ( RegistrationRequest ) key.attachment(); + key.cancel(); + + selector.wakeup(); // wake up again to trigger thread death + + ssc.close(); + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + synchronized( request ) + { + request.done = true; + request.notifyAll(); + } + + if( request.exception == null ) + { + getListeners().fireServiceDeactivated( + this, request.address, + request.registrationRequest.handler, + request.registrationRequest.config ); + } + } + } + } + + private static class RegistrationRequest + { + private InetSocketAddress address; + private final IoHandler handler; + private final IoServiceConfig config; + private IOException exception; + private boolean done; + + private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + this.address = ( InetSocketAddress ) address; + this.handler = handler; + this.config = config; + } + } + + + private static class CancellationRequest + { + private final SocketAddress address; + private boolean done; + private RegistrationRequest registrationRequest; + private RuntimeException exception; + + private CancellationRequest( SocketAddress address ) + { + this.address = address; + } + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java new file mode 100644 index 0000000000..202ac1a530 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoConnectorConfig; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.util.NamePreservingRunnable; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.Queue; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +/** + * {@link IoConnector} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class MultiThreadSocketConnector extends SocketConnector +{ + /** @noinspection StaticNonFinalField */ + private static volatile int nextId = 0; + + private final Object lock = new Object(); + private final int id = nextId++; + private final String threadName = "SocketConnector-" + id; + private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); + private final Queue connectQueue = new Queue(); + private final MultiThreadSocketIoProcessor[] ioProcessors; + private final int processorCount; + private final Executor executor; + + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ + private Selector selector; + private Worker worker; + private int processorDistributor = 0; + private int workerTimeout = 60; // 1 min. + + /** Create a connector with a single processing thread using a NewThreadExecutor */ + public MultiThreadSocketConnector() + { + this(1, new NewThreadExecutor()); + } + + /** + * Create a connector with the desired number of processing threads + * + * @param processorCount Number of processing threads + * @param executor Executor to use for launching threads + */ + public MultiThreadSocketConnector(int processorCount, Executor executor) + { + if (processorCount < 1) + { + throw new IllegalArgumentException("Must have at least one processor"); + } + + this.executor = executor; + this.processorCount = processorCount; + ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; + + for (int i = 0; i < processorCount; i++) + { + ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); + } + } + + /** + * How many seconds to keep the connection thread alive between connection requests + * + * @return Number of seconds to keep connection thread alive + */ + public int getWorkerTimeout() + { + return workerTimeout; + } + + /** + * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. + * + * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 + */ + public void setWorkerTimeout(int workerTimeout) + { + if (workerTimeout < 0) + { + throw new IllegalArgumentException("Must be >= 0"); + } + this.workerTimeout = workerTimeout; + } + + public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) + { + return connect(address, null, handler, config); + } + + public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoServiceConfig config) + { + if (address == null) + { + throw new NullPointerException("address"); + } + if (handler == null) + { + throw new NullPointerException("handler"); + } + + if (!(address instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected address type: " + + address.getClass()); + } + + if (localAddress != null && !(localAddress instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected local address type: " + + localAddress.getClass()); + } + + if (config == null) + { + config = getDefaultConfig(); + } + + SocketChannel ch = null; + boolean success = false; + try + { + ch = SocketChannel.open(); + ch.socket().setReuseAddress(true); + if (localAddress != null) + { + ch.socket().bind(localAddress); + } + + ch.configureBlocking(false); + + if (ch.connect(address)) + { + DefaultConnectFuture future = new DefaultConnectFuture(); + newSession(ch, handler, config, future); + success = true; + return future; + } + + success = true; + } + catch (IOException e) + { + return DefaultConnectFuture.newFailedFuture(e); + } + finally + { + if (!success && ch != null) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + + ConnectionRequest request = new ConnectionRequest(ch, handler, config); + synchronized (lock) + { + try + { + startupWorker(); + } + catch (IOException e) + { + try + { + ch.close(); + } + catch (IOException e2) + { + ExceptionMonitor.getInstance().exceptionCaught(e2); + } + + return DefaultConnectFuture.newFailedFuture(e); + } + } + + synchronized (connectQueue) + { + connectQueue.push(request); + } + selector.wakeup(); + + return request; + } + + private synchronized void startupWorker() throws IOException + { + if (worker == null) + { + selector = Selector.open(); + worker = new Worker(); + executor.execute(new NamePreservingRunnable(worker)); + } + } + + private void registerNew() + { + if (connectQueue.isEmpty()) + { + return; + } + + for (; ;) + { + ConnectionRequest req; + synchronized (connectQueue) + { + req = (ConnectionRequest) connectQueue.pop(); + } + + if (req == null) + { + break; + } + + SocketChannel ch = req.channel; + try + { + ch.register(selector, SelectionKey.OP_CONNECT, req); + } + catch (IOException e) + { + req.setException(e); + } + } + } + + private void processSessions(Set keys) + { + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isConnectable()) + { + continue; + } + + SocketChannel ch = (SocketChannel) key.channel(); + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + boolean success = false; + try + { + ch.finishConnect(); + newSession(ch, entry.handler, entry.config, entry); + success = true; + } + catch (Throwable e) + { + entry.setException(e); + } + finally + { + key.cancel(); + if (!success) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + keys.clear(); + } + + private void processTimedOutSessions(Set keys) + { + long currentTime = System.currentTimeMillis(); + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isValid()) + { + continue; + } + + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + if (currentTime >= entry.deadline) + { + entry.setException(new ConnectException()); + try + { + key.channel().close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + key.cancel(); + } + } + } + } + + private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + { + MultiThreadSocketSessionImpl session = + new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(), + config, ch, handler, ch.socket().getRemoteSocketAddress()); + + //new interface +// SocketSessionImpl session = new SocketSessionImpl( +// this, nextProcessor(), getListeners(), +// config, ch, handler, ch.socket().getRemoteSocketAddress() ); + try + { + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getThreadModel().buildFilterChain(session.getFilterChain()); + } + catch (Throwable e) + { + throw (IOException) new IOException("Failed to create a session.").initCause(e); + } + + // Set the ConnectFuture of the specified session, which will be + // removed and notified by AbstractIoFilterChain eventually. +// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); + session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture); + + // Forward the remaining process to the SocketIoProcessor. + session.getIoProcessor().addNew(session); + } + + private MultiThreadSocketIoProcessor nextProcessor() + { + return ioProcessors[processorDistributor++ % processorCount]; + } + + private class Worker implements Runnable + { + private long lastActive = System.currentTimeMillis(); + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName); + + for (; ;) + { + try + { + int nKeys = selector.select(1000); + + registerNew(); + + if (nKeys > 0) + { + processSessions(selector.selectedKeys()); + } + + processTimedOutSessions(selector.keys()); + + if (selector.keys().isEmpty()) + { + if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) + { + synchronized (lock) + { + if (selector.keys().isEmpty() && + connectQueue.isEmpty()) + { + worker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + break; + } + } + } + } + else + { + lastActive = System.currentTimeMillis(); + } + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + } + } + + private class ConnectionRequest extends DefaultConnectFuture + { + private final SocketChannel channel; + private final long deadline; + private final IoHandler handler; + private final IoServiceConfig config; + + private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) + { + this.channel = channel; + long timeout; + if (config instanceof IoConnectorConfig) + { + timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); + } + else + { + timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); + } + this.deadline = System.currentTimeMillis() + timeout; + this.handler = handler; + this.config = config; + } + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java new file mode 100644 index 0000000000..67b8c8d820 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.io.IOException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.util.Queue; + +/** + * An {@link IoFilterChain} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + */ +class MultiThreadSocketFilterChain extends AbstractIoFilterChain { + + MultiThreadSocketFilterChain( IoSession parent ) + { + super( parent ); + } + + protected void doWrite( IoSession session, WriteRequest writeRequest ) + { + MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session; + Queue writeRequestQueue = s.getWriteRequestQueue(); + + // SocketIoProcessor.doFlush() will reset it after write is finished + // because the buffer will be passed with messageSent event. + ( ( ByteBuffer ) writeRequest.getMessage() ).mark(); + synchronized( writeRequestQueue ) + { + writeRequestQueue.push( writeRequest ); + if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() ) + { + // Notify SocketIoProcessor only when writeRequestQueue was empty. + s.getIoProcessor().flush( s ); + } + } + } + + protected void doClose( IoSession session ) throws IOException + { + MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session; + s.getIoProcessor().remove( s ); + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java new file mode 100644 index 0000000000..11c54bb248 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -0,0 +1,1034 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.WriteTimeoutException; +import org.apache.mina.util.IdentityHashSet; +import org.apache.mina.util.NamePreservingRunnable; +import org.apache.mina.util.Queue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$, + */ +class MultiThreadSocketIoProcessor extends SocketIoProcessor +{ + Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class); + Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader"); + Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer"); + + private static final long SELECTOR_TIMEOUT = 1000L; + + private int MAX_READ_BYTES_PER_SESSION = 524288; //512K + private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K + + private final Object readLock = new Object(); + private final Object writeLock = new Object(); + + private final String threadName; + private final Executor executor; + + private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); + + /** + * @noinspection FieldAccessedSynchronizedAndUnsynchronized + */ + private volatile Selector selector, writeSelector; + + private final Queue newSessions = new Queue(); + private final Queue removingSessions = new Queue(); + private final BlockingQueue flushingSessions = new LinkedBlockingQueue(); + private final IdentityHashSet flushingSessionsSet = new IdentityHashSet(); + + private final Queue trafficControllingSessions = new Queue(); + + private ReadWorker readWorker; + private WriteWorker writeWorker; + private long lastIdleReadCheckTime = System.currentTimeMillis(); + private long lastIdleWriteCheckTime = System.currentTimeMillis(); + + MultiThreadSocketIoProcessor(String threadName, Executor executor) + { + super(threadName, executor); + this.threadName = threadName; + this.executor = executor; + } + + void addNew( SocketSessionImpl session ) throws IOException + { + synchronized( newSessions ) + { + newSessions.push( session ); + } + + startupWorker(); + + selector.wakeup(); + writeSelector.wakeup(); + } + + void remove( SocketSessionImpl session ) throws IOException + { + scheduleRemove( session ); + startupWorker(); + selector.wakeup(); + } + + private void startupWorker() throws IOException + { + synchronized(readLock) + { + if (readWorker == null) + { + selector = Selector.open(); + readWorker = new ReadWorker(); + executor.execute(new NamePreservingRunnable(readWorker)); + } + } + + synchronized(writeLock) + { + if (writeWorker == null) + { + writeSelector = Selector.open(); + writeWorker = new WriteWorker(); + executor.execute(new NamePreservingRunnable(writeWorker)); + } + } + + } + + void flush( SocketSessionImpl session ) + { + scheduleFlush( session ); + Selector selector = this.writeSelector; + + if( selector != null ) + { + selector.wakeup(); + } + } + + void updateTrafficMask( SocketSessionImpl session ) + { + scheduleTrafficControl( session ); + Selector selector = this.selector; + if( selector != null ) + { + selector.wakeup(); + } + } + + private void scheduleRemove( SocketSessionImpl session ) + { + synchronized( removingSessions ) + { + removingSessions.push( session ); + } + } + + private void scheduleFlush( SocketSessionImpl session ) + { + synchronized(flushingSessionsSet) + { + //if flushingSessions grows to contain Integer.MAX_VALUE sessions + // then this will fail. + if (flushingSessionsSet.add(session)) + { + flushingSessions.offer(session); + } + } + } + + private void scheduleTrafficControl( SocketSessionImpl session ) + { + synchronized( trafficControllingSessions ) + { + trafficControllingSessions.push( session ); + } + } + + private void doAddNewReader() throws InterruptedException + { + if( newSessions.isEmpty() ) + { + return; + } + + for( ; ; ) + { + MultiThreadSocketSessionImpl session; + + synchronized( newSessions ) + { + session = (MultiThreadSocketSessionImpl) newSessions.peek(); + } + + if( session == null ) + { + break; + } + + SocketChannel ch = session.getChannel(); + + + try + { + + ch.configureBlocking( false ); + session.setSelectionKey( ch.register( selector, + SelectionKey.OP_READ, + session ) ); + + + //System.out.println("ReadDebug:"+"Awaiting Registration"); + session.awaitRegistration(); + sessionCreated(session); + } + catch( IOException e ) + { + // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute + // and call ConnectFuture.setException(). + session.getFilterChain().fireExceptionCaught( session, e ); + } + } + } + + + private void doAddNewWrite() throws InterruptedException + { + if (newSessions.isEmpty()) + { + return; + } + + for (; ;) + { + MultiThreadSocketSessionImpl session; + + synchronized(newSessions) + { + session = (MultiThreadSocketSessionImpl) newSessions.peek(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + + try + { + ch.configureBlocking(false); + synchronized(flushingSessionsSet) + { + flushingSessionsSet.add(session); + } + + session.setWriteSelectionKey(ch.register(writeSelector, + SelectionKey.OP_WRITE, + session)); + + //System.out.println("WriteDebug:"+"Awaiting Registration"); + session.awaitRegistration(); + sessionCreated(session); + } + catch (IOException e) + { + + // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute + // and call ConnectFuture.setException(). + session.getFilterChain().fireExceptionCaught( session, e ); + } + } + } + + + + private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException + { + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; + synchronized(newSessions) + { + if (!session.created()) + { + _logger.debug("Popping new session"); + newSessions.pop(); + + // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here + // in AbstractIoFilterChain.fireSessionOpened(). + session.getServiceListeners().fireSessionCreated( session ); + + session.doneCreation(); + } + } + } + + private void doRemove() + { + if( removingSessions.isEmpty() ) + { + return; + } + + for( ; ; ) + { + MultiThreadSocketSessionImpl session; + + synchronized( removingSessions ) + { + session = (MultiThreadSocketSessionImpl) removingSessions.pop(); + } + + if( session == null ) + { + break; + } + + SocketChannel ch = session.getChannel(); + SelectionKey key = session.getReadSelectionKey(); + SelectionKey writeKey = session.getWriteSelectionKey(); + + // Retry later if session is not yet fully initialized. + // (In case that Session.close() is called before addSession() is processed) + if (key == null || writeKey == null) + { + scheduleRemove( session ); + break; + } + // skip if channel is already closed + if (!key.isValid() || !writeKey.isValid()) + { + continue; + } + + try + { + //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); + synchronized(readLock) + { + key.cancel(); + } + synchronized(writeLock) + { + writeKey.cancel(); + } + ch.close(); + } + catch( IOException e ) + { + session.getFilterChain().fireExceptionCaught( session, e ); + } + finally + { + releaseWriteBuffers( session ); + session.getServiceListeners().fireSessionDestroyed( session ); + } + } + } + + private void processRead(Set selectedKeys) + { + Iterator it = selectedKeys.iterator(); + + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); + + synchronized(readLock) + { + if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) + { + read( session ); + } + } + + } + + selectedKeys.clear(); + } + + private void processWrite(Set selectedKeys) + { + Iterator it = selectedKeys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + + synchronized(writeLock) + { + if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) + { + + // Clear OP_WRITE + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + + synchronized(flushingSessionsSet) + { + flushingSessions.offer(session); + } + } + } + } + + selectedKeys.clear(); + } + + private void read(SocketSessionImpl session) + { + + //if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session)); + } + + int totalReadBytes = 0; + + for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;) + { + ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); + SocketChannel ch = session.getChannel(); + + try + { + buf.clear(); + + int readBytes = 0; + int ret; + + try + { + while ((ret = ch.read(buf.buf())) > 0) + { + readBytes += ret; + totalReadBytes += ret; + } + } + finally + { + buf.flip(); + } + + + if (readBytes > 0) + { + session.increaseReadBytes(readBytes); + + session.getFilterChain().fireMessageReceived(session, buf); + buf = null; + } + + if (ret <= 0) + { + if (ret == 0) + { + if (readBytes == session.getReadBufferSize()) + { + continue; + } + } + else + { + scheduleRemove(session); + } + + break; + } + } + catch (Throwable e) + { + if (e instanceof IOException) + { + scheduleRemove(session); + } + session.getFilterChain().fireExceptionCaught(session, e); + } + finally + { + if (buf != null) + { + buf.release(); + } + } + }//for + + // if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes); + } + } + + + private void notifyReadIdleness() + { + // process idle sessions + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastIdleReadCheckTime) >= 1000) + { + lastIdleReadCheckTime = currentTime; + Set keys = selector.keys(); + if( keys != null ) + { + for( Iterator it = keys.iterator(); it.hasNext(); ) + { + SelectionKey key = ( SelectionKey ) it.next(); + SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); + notifyReadIdleness(session, currentTime); + } + } + } + } + + private void notifyWriteIdleness() + { + // process idle sessions + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastIdleWriteCheckTime) >= 1000) + { + lastIdleWriteCheckTime = currentTime; + Set keys = writeSelector.keys(); + if (keys != null) + { + for (Iterator it = keys.iterator(); it.hasNext();) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + notifyWriteIdleness(session, currentTime); + } + } + } + } + + private void notifyReadIdleness(SocketSessionImpl session, long currentTime) + { + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), + IdleStatus.BOTH_IDLE, + Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis( IdleStatus.READER_IDLE ), + IdleStatus.READER_IDLE, + Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); + + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); + } + + private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) + { + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), + IdleStatus.WRITER_IDLE, + Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); + + notifyWriteTimeout( session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime() ); + } + + private void notifyIdleness0( SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime ) + { + if( idleTime > 0 && lastIoTime != 0 + && ( currentTime - lastIoTime ) >= idleTime ) + { + session.increaseIdleCount( status ); + session.getFilterChain().fireSessionIdle( session, status ); + } + } + + private void notifyWriteTimeout( SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime ) + { + + MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; + SelectionKey key = sesh.getWriteSelectionKey(); + + synchronized(writeLock) + { + if( writeTimeout > 0 + && ( currentTime - lastIoTime ) >= writeTimeout + && key != null && key.isValid() + && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) + { + session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() ); + } + } + } + + private SocketSessionImpl getNextFlushingSession() + { + return (SocketSessionImpl) flushingSessions.poll(); + } + + private void releaseSession(SocketSessionImpl session) + { + synchronized(session.getWriteRequestQueue()) + { + synchronized(flushingSessionsSet) + { + if (session.getScheduledWriteRequests() > 0) + { + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session)); + } + flushingSessions.offer(session); + } + else + { + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session)); + } + flushingSessionsSet.remove(session); + } + } + } + } + + private void releaseWriteBuffers(SocketSessionImpl session) + { + Queue writeRequestQueue = session.getWriteRequestQueue(); + WriteRequest req; + + //Should this be synchronized? + synchronized(writeRequestQueue) + { + while ((req = (WriteRequest) writeRequestQueue.pop()) != null) + { + try + { + ((ByteBuffer) req.getMessage()).release(); + } + catch (IllegalStateException e) + { + session.getFilterChain().fireExceptionCaught(session, e); + } + finally + { + req.getFuture().setWritten(false); + } + } + } + } + + private void doFlush() + { + MultiThreadSocketSessionImpl session; + + while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) + { + if( !session.isConnected() ) + { + releaseWriteBuffers( session ); + releaseSession(session); + continue; + } + + SelectionKey key = session.getWriteSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.write() is called before addSession() is processed) + if( key == null ) + { + scheduleFlush( session ); + releaseSession(session); + continue; + } + // skip if channel is already closed + if( !key.isValid() ) + { + releaseSession(session); + continue; + } + + try + { + if (doFlush(session)) + { + releaseSession(session); + } + } + catch( IOException e ) + { + releaseSession(session); + scheduleRemove( session ); + session.getFilterChain().fireExceptionCaught( session, e ); + } + + } + + } + + private boolean doFlush(SocketSessionImpl sessionParam) throws IOException + { + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; + // Clear OP_WRITE + SelectionKey key = session.getWriteSelectionKey(); + synchronized(writeLock) + { + key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); + } + SocketChannel ch = session.getChannel(); + Queue writeRequestQueue = session.getWriteRequestQueue(); + + long totalFlushedBytes = 0; + for( ; ; ) + { + WriteRequest req; + + synchronized( writeRequestQueue ) + { + req = ( WriteRequest ) writeRequestQueue.first(); + } + + if( req == null ) + { + break; + } + + ByteBuffer buf = ( ByteBuffer ) req.getMessage(); + if( buf.remaining() == 0 ) + { + synchronized( writeRequestQueue ) + { + writeRequestQueue.pop(); + } + + session.increaseWrittenMessages(); + + buf.reset(); + session.getFilterChain().fireMessageSent( session, req ); + continue; + } + + + int writtenBytes = 0; + + // Reported as DIRMINA-362 + //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. +// if (key.isWritable()) + { + try + { + writtenBytes = ch.write(buf.buf()); + totalFlushedBytes += writtenBytes; + } + catch (IOException ioe) + { + throw ioe; + } + } + + if( writtenBytes > 0 ) + { + session.increaseWrittenBytes( writtenBytes ); + } + + if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) + { + // Kernel buffer is full + synchronized (writeLock) + { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); + } + return false; + } + } + + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); + } + return true; + } + + private void doUpdateTrafficMask() + { + if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked()) + { + return; + } + + // Synchronize over entire operation as this method should be called + // from both read and write thread and we don't want the order of the + // updates to get changed. + trafficMaskUpdateLock.lock(); + try + { + for (; ;) + { + MultiThreadSocketSessionImpl session; + + session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop(); + + if (session == null) + { + break; + } + + SelectionKey key = session.getReadSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.suspend??() or session.resume??() is + // called before addSession() is processed) + if (key == null) + { + scheduleTrafficControl(session); + break; + } + // skip if channel is already closed + if (!key.isValid()) + { + continue; + } + + // The normal is OP_READ and, if there are write requests in the + // session's write queue, set OP_WRITE to trigger flushing. + + //Sset to Read and Write if there is nothing then the cost + // is one loop through the flusher. + int ops = SelectionKey.OP_READ; + + // Now mask the preferred ops with the mask of the current session + int mask = session.getTrafficMask().getInterestOps(); + synchronized (readLock) + { + key.interestOps(ops & mask); + } + //Change key to the WriteSelection Key + key = session.getWriteSelectionKey(); + if (key != null && key.isValid()) + { + Queue writeRequestQueue = session.getWriteRequestQueue(); + synchronized (writeRequestQueue) + { + if (!writeRequestQueue.isEmpty()) + { + ops = SelectionKey.OP_WRITE; + synchronized (writeLock) + { + key.interestOps(ops & mask); + } + } + } + } + } + } + finally + { + trafficMaskUpdateLock.unlock(); + } + + } + + private class WriteWorker implements Runnable + { + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer"); + + //System.out.println("WriteDebug:"+"Startup"); + for (; ;) + { + try + { + int nKeys = writeSelector.select(SELECTOR_TIMEOUT); + + doAddNewWrite(); + doUpdateTrafficMask(); + + if (nKeys > 0) + { + //System.out.println("WriteDebug:"+nKeys + " keys from writeselector"); + processWrite(writeSelector.selectedKeys()); + } + else + { + //System.out.println("WriteDebug:"+"No keys from writeselector"); + } + + doRemove(); + notifyWriteIdleness(); + + if (flushingSessionsSet.size() > 0) + { + doFlush(); + } + + if (writeSelector.keys().isEmpty()) + { + synchronized(writeLock) + { + + if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) + { + writeWorker = null; + try + { + writeSelector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + writeSelector = null; + } + + break; + } + } + } + + } + catch (Throwable t) + { + ExceptionMonitor.getInstance().exceptionCaught(t); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + //System.out.println("WriteDebug:"+"Shutdown"); + } + + } + + private class ReadWorker implements Runnable + { + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); + + //System.out.println("ReadDebug:"+"Startup"); + for( ; ; ) + { + try + { + int nKeys = selector.select(SELECTOR_TIMEOUT); + + doAddNewReader(); + doUpdateTrafficMask(); + + if( nKeys > 0 ) + { + //System.out.println("ReadDebug:"+nKeys + " keys from selector"); + + processRead(selector.selectedKeys()); + } + else + { + //System.out.println("ReadDebug:"+"No keys from selector"); + } + + + doRemove(); + notifyReadIdleness(); + + if( selector.keys().isEmpty() ) + { + + synchronized(readLock) + { + if( selector.keys().isEmpty() && newSessions.isEmpty() ) + { + readWorker = null; + try + { + selector.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + selector = null; + } + + break; + } + } + } + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + ExceptionMonitor.getInstance().exceptionCaught( e1 ); + } + } + } + //System.out.println("ReadDebug:"+"Shutdown"); + } + + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java new file mode 100644 index 0000000000..003df5e73c --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; + +/** + * An {@link IoConnectorConfig} for {@link SocketConnector}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl +{ + private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false; + private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false; + private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false; + private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false; + + private static boolean DEFAULT_REUSE_ADDRESS; + private static int DEFAULT_RECEIVE_BUFFER_SIZE; + private static int DEFAULT_SEND_BUFFER_SIZE; + private static int DEFAULT_TRAFFIC_CLASS; + private static boolean DEFAULT_KEEP_ALIVE; + private static boolean DEFAULT_OOB_INLINE; + private static int DEFAULT_SO_LINGER; + private static boolean DEFAULT_TCP_NO_DELAY; + + static + { + initialize(); + } + + private static void initialize() + { + Socket socket = null; + + socket = new Socket(); + + try + { + DEFAULT_REUSE_ADDRESS = socket.getReuseAddress(); + DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize(); + DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize(); + DEFAULT_KEEP_ALIVE = socket.getKeepAlive(); + DEFAULT_OOB_INLINE = socket.getOOBInline(); + DEFAULT_SO_LINGER = socket.getSoLinger(); + DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay(); + + // Check if setReceiveBufferSize is supported. + try + { + socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE); + SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true; + } + catch( SocketException e ) + { + SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false; + } + + // Check if setSendBufferSize is supported. + try + { + socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE); + SET_SEND_BUFFER_SIZE_AVAILABLE = true; + } + catch( SocketException e ) + { + SET_SEND_BUFFER_SIZE_AVAILABLE = false; + } + + // Check if getTrafficClass is supported. + try + { + DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass(); + GET_TRAFFIC_CLASS_AVAILABLE = true; + } + catch( SocketException e ) + { + GET_TRAFFIC_CLASS_AVAILABLE = false; + DEFAULT_TRAFFIC_CLASS = 0; + } + } + catch( SocketException e ) + { + throw new ExceptionInInitializerError(e); + } + finally + { + if( socket != null ) + { + try + { + socket.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + public static boolean isSetReceiveBufferSizeAvailable() { + return SET_RECEIVE_BUFFER_SIZE_AVAILABLE; + } + + public static boolean isSetSendBufferSizeAvailable() { + return SET_SEND_BUFFER_SIZE_AVAILABLE; + } + + public static boolean isGetTrafficClassAvailable() { + return GET_TRAFFIC_CLASS_AVAILABLE; + } + + public static boolean isSetTrafficClassAvailable() { + return SET_TRAFFIC_CLASS_AVAILABLE; + } + + private boolean reuseAddress = DEFAULT_REUSE_ADDRESS; + private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; + private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; + private int trafficClass = DEFAULT_TRAFFIC_CLASS; + private boolean keepAlive = DEFAULT_KEEP_ALIVE; + private boolean oobInline = DEFAULT_OOB_INLINE; + private int soLinger = DEFAULT_SO_LINGER; + private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; + + /** + * Creates a new instance. + */ + MultiThreadSocketSessionConfigImpl() + { + } + + public boolean isReuseAddress() + { + return reuseAddress; + } + + public void setReuseAddress( boolean reuseAddress ) + { + this.reuseAddress = reuseAddress; + } + + public int getReceiveBufferSize() + { + return receiveBufferSize; + } + + public void setReceiveBufferSize( int receiveBufferSize ) + { + this.receiveBufferSize = receiveBufferSize; + } + + public int getSendBufferSize() + { + return sendBufferSize; + } + + public void setSendBufferSize( int sendBufferSize ) + { + this.sendBufferSize = sendBufferSize; + } + + public int getTrafficClass() + { + return trafficClass; + } + + public void setTrafficClass( int trafficClass ) + { + this.trafficClass = trafficClass; + } + + public boolean isKeepAlive() + { + return keepAlive; + } + + public void setKeepAlive( boolean keepAlive ) + { + this.keepAlive = keepAlive; + } + + public boolean isOobInline() + { + return oobInline; + } + + public void setOobInline( boolean oobInline ) + { + this.oobInline = oobInline; + } + + public int getSoLinger() + { + return soLinger; + } + + public void setSoLinger( int soLinger ) + { + this.soLinger = soLinger; + } + + public boolean isTcpNoDelay() + { + return tcpNoDelay; + } + + public void setTcpNoDelay( boolean tcpNoDelay ) + { + this.tcpNoDelay = tcpNoDelay; + } + + +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java new file mode 100644 index 0000000000..ee5c24b3ab --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoService; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.RuntimeIOException; +import org.apache.mina.common.TransportType; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.IoServiceListenerSupport; +import org.apache.mina.util.Queue; + +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An {@link IoSession} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +class MultiThreadSocketSessionImpl extends SocketSessionImpl +{ + private final IoService manager; + private final IoServiceConfig serviceConfig; + private final SocketSessionConfig config = new SessionConfigImpl(); + private final MultiThreadSocketIoProcessor ioProcessor; + private final MultiThreadSocketFilterChain filterChain; + private final SocketChannel ch; + private final Queue writeRequestQueue; + private final IoHandler handler; + private final SocketAddress remoteAddress; + private final SocketAddress localAddress; + private final SocketAddress serviceAddress; + private final IoServiceListenerSupport serviceListeners; + private SelectionKey readKey, writeKey; + private int readBufferSize; + private CountDownLatch registeredReadyLatch = new CountDownLatch(2); + private AtomicBoolean created = new AtomicBoolean(false); + + /** + * Creates a new instance. + */ + MultiThreadSocketSessionImpl( IoService manager, + SocketIoProcessor ioProcessor, + IoServiceListenerSupport listeners, + IoServiceConfig serviceConfig, + SocketChannel ch, + IoHandler defaultHandler, + SocketAddress serviceAddress ) + { + super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress); + this.manager = manager; + this.serviceListeners = listeners; + this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor; + this.filterChain = new MultiThreadSocketFilterChain(this); + this.ch = ch; + this.writeRequestQueue = new Queue(); + this.handler = defaultHandler; + this.remoteAddress = ch.socket().getRemoteSocketAddress(); + this.localAddress = ch.socket().getLocalSocketAddress(); + this.serviceAddress = serviceAddress; + this.serviceConfig = serviceConfig; + + // Apply the initial session settings + IoSessionConfig sessionConfig = serviceConfig.getSessionConfig(); + if( sessionConfig instanceof SocketSessionConfig ) + { + SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig; + this.config.setKeepAlive( cfg.isKeepAlive() ); + this.config.setOobInline( cfg.isOobInline() ); + this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() ); + this.readBufferSize = cfg.getReceiveBufferSize(); + this.config.setReuseAddress( cfg.isReuseAddress() ); + this.config.setSendBufferSize( cfg.getSendBufferSize() ); + this.config.setSoLinger( cfg.getSoLinger() ); + this.config.setTcpNoDelay( cfg.isTcpNoDelay() ); + + if( this.config.getTrafficClass() != cfg.getTrafficClass() ) + { + this.config.setTrafficClass( cfg.getTrafficClass() ); + } + } + } + + void awaitRegistration() throws InterruptedException + { + registeredReadyLatch.countDown(); + + registeredReadyLatch.await(); + } + + boolean created() throws InterruptedException + { + return created.get(); + } + + void doneCreation() + { + created.getAndSet(true); + } + + public IoService getService() + { + return manager; + } + + public IoServiceConfig getServiceConfig() + { + return serviceConfig; + } + + public IoSessionConfig getConfig() + { + return config; + } + + SocketIoProcessor getIoProcessor() + { + return ioProcessor; + } + + public IoFilterChain getFilterChain() + { + return filterChain; + } + + SocketChannel getChannel() + { + return ch; + } + + IoServiceListenerSupport getServiceListeners() + { + return serviceListeners; + } + + SelectionKey getSelectionKey() + { + return readKey; + } + + SelectionKey getReadSelectionKey() + { + return readKey; + } + + SelectionKey getWriteSelectionKey() + { + return writeKey; + } + + void setSelectionKey(SelectionKey key) + { + this.readKey = key; + } + + void setWriteSelectionKey(SelectionKey key) + { + this.writeKey = key; + } + + public IoHandler getHandler() + { + return handler; + } + + protected void close0() + { + filterChain.fireFilterClose( this ); + } + + Queue getWriteRequestQueue() + { + return writeRequestQueue; + } + + /** + @return int Number of write scheduled write requests + @deprecated + */ + public int getScheduledWriteMessages() + { + return getScheduledWriteRequests(); + } + + public int getScheduledWriteRequests() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.size(); + } + } + + public int getScheduledWriteBytes() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.byteSize(); + } + } + + protected void write0( WriteRequest writeRequest ) + { + filterChain.fireFilterWrite( this, writeRequest ); + } + + public TransportType getTransportType() + { + return TransportType.SOCKET; + } + + public SocketAddress getRemoteAddress() + { + //This is what I had previously +// return ch.socket().getRemoteSocketAddress(); + return remoteAddress; + } + + public SocketAddress getLocalAddress() + { + //This is what I had previously +// return ch.socket().getLocalSocketAddress(); + return localAddress; + } + + public SocketAddress getServiceAddress() + { + return serviceAddress; + } + + protected void updateTrafficMask() + { + this.ioProcessor.updateTrafficMask( this ); + } + + int getReadBufferSize() + { + return readBufferSize; + } + + private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig + { + public boolean isKeepAlive() + { + try + { + return ch.socket().getKeepAlive(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setKeepAlive( boolean on ) + { + try + { + ch.socket().setKeepAlive( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isOobInline() + { + try + { + return ch.socket().getOOBInline(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setOobInline( boolean on ) + { + try + { + ch.socket().setOOBInline( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isReuseAddress() + { + try + { + return ch.socket().getReuseAddress(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setReuseAddress( boolean on ) + { + try + { + ch.socket().setReuseAddress( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getSoLinger() + { + try + { + return ch.socket().getSoLinger(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setSoLinger( int linger ) + { + try + { + if( linger < 0 ) + { + ch.socket().setSoLinger( false, 0 ); + } + else + { + ch.socket().setSoLinger( true, linger ); + } + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isTcpNoDelay() + { + try + { + return ch.socket().getTcpNoDelay(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setTcpNoDelay( boolean on ) + { + try + { + ch.socket().setTcpNoDelay( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getTrafficClass() + { + if( SocketSessionConfigImpl.isGetTrafficClassAvailable() ) + { + try + { + return ch.socket().getTrafficClass(); + } + catch( SocketException e ) + { + // Throw an exception only when setTrafficClass is also available. + if( SocketSessionConfigImpl.isSetTrafficClassAvailable() ) + { + throw new RuntimeIOException( e ); + } + } + } + + return 0; + } + + public void setTrafficClass( int tc ) + { + if( SocketSessionConfigImpl.isSetTrafficClassAvailable() ) + { + try + { + ch.socket().setTrafficClass( tc ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + + public int getSendBufferSize() + { + try + { + return ch.socket().getSendBufferSize(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setSendBufferSize( int size ) + { + if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() ) + { + try + { + ch.socket().setSendBufferSize( size ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + + public int getReceiveBufferSize() + { + try + { + return ch.socket().getReceiveBufferSize(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setReceiveBufferSize( int size ) + { + if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() ) + { + try + { + ch.socket().setReceiveBufferSize( size ); + MultiThreadSocketSessionImpl.this.readBufferSize = size; + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + } +} |