summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java197
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java3
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java276
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java151
4 files changed, 483 insertions, 144 deletions
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
new file mode 100644
index 0000000000..810d12f472
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message.
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes CRLF terminated lines into
+ * <code>Command</code> objects:
+ * <pre>
+ * public class CRLFTerminatedCommandLineDecoder
+ * extends CumulativeProtocolDecoder {
+ *
+ * private Command parseCommand(ByteBuffer in) {
+ * // Convert the bytes in the specified buffer to a
+ * // Command object.
+ * ...
+ * }
+ *
+ * protected boolean doDecode(IoSession session, ByteBuffer in,
+ * ProtocolDecoderOutput out)
+ * throws Exception {
+ *
+ * // Remember the initial position.
+ * int start = in.position();
+ *
+ * // Now find the first CRLF in the buffer.
+ * byte previous = 0;
+ * while (in.hasRemaining()) {
+ * byte current = in.get();
+ *
+ * if (previous == '\r' && current == '\n') {
+ * // Remember the current position and limit.
+ * int position = in.position();
+ * int limit = in.limit();
+ * try {
+ * in.position(start);
+ * in.limit(position);
+ * // The bytes between in.position() and in.limit()
+ * // now contain a full CRLF terminated line.
+ * out.write(parseCommand(in.slice()));
+ * } finally {
+ * // Set the position to point right after the
+ * // detected line and set the limit to the old
+ * // one.
+ * in.position(position);
+ * in.limit(limit);
+ * }
+ * // Decoded one line; CumulativeProtocolDecoder will
+ * // call me again until I return false. So just
+ * // return true until there are no more lines in the
+ * // buffer.
+ * return true;
+ * }
+ *
+ * previous = current;
+ * }
+ *
+ * // Could not find CRLF in the buffer. Reset the initial
+ * // position to the one we recorded above.
+ * in.position(start);
+ *
+ * return false;
+ * }
+ * }
+ * </pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $
+ */
+public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+
+ private static final String BUFFER = OurCumulativeProtocolDecoder.class
+ .getName()
+ + ".Buffer";
+
+ /**
+ * Creates a new instance.
+ */
+ protected OurCumulativeProtocolDecoder() {
+ }
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is NOT compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception {
+ boolean usingSessionBuffer = true;
+ ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER);
+ // If we have a session buffer, append data to that; otherwise
+ // use the buffer read from the network directly.
+ if (buf != null) {
+ buf.put(in);
+ buf.flip();
+ } else {
+ buf = in;
+ usingSessionBuffer = false;
+ }
+
+ for (;;) {
+ int oldPos = buf.position();
+ boolean decoded = doDecode(session, buf, out);
+ if (decoded) {
+ if (buf.position() == oldPos) {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed.");
+ }
+
+ if (!buf.hasRemaining()) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if (buf.hasRemaining()) {
+ storeRemainingInSession(buf, session);
+ } else {
+ if (usingSessionBuffer)
+ removeSessionBuffer(session);
+ }
+ }
+
+ /**
+ * Implement this method to consume the specified cumulative buffer and
+ * decode its content into message(s).
+ *
+ * @param in the cumulative buffer
+ * @return <tt>true</tt> if and only if there's more to decode in the buffer
+ * and you want to have <tt>doDecode</tt> method invoked again.
+ * Return <tt>false</tt> if remaining data is not enough to decode,
+ * then this method will be invoked again when more data is cumulated.
+ * @throws Exception if cannot decode <tt>in</tt>.
+ */
+ protected abstract boolean doDecode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception;
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose(IoSession session) throws Exception {
+ removeSessionBuffer(session);
+ }
+
+ private void removeSessionBuffer(IoSession session) {
+ ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER);
+ if (buf != null) {
+ buf.release();
+ }
+ }
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
+ ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity());
+ remainingBuf.setAutoExpand(true);
+ remainingBuf.order(buf.order());
+ remainingBuf.put(buf);
+ session.setAttribute(BUFFER, remainingBuf);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
index 202ac1a530..cb24102edd 100644
--- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
@@ -376,8 +376,7 @@ public class MultiThreadSocketConnector extends SocketConnector
// Set the ConnectFuture of the specified session, which will be
// removed and notified by AbstractIoFilterChain eventually.
-// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
- session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture);
+ session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
// Forward the remaining process to the SocketIoProcessor.
session.getIoProcessor().addNew(session);
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
index 11c54bb248..03838ca3f1 100644
--- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
@@ -66,9 +66,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
- /**
- * @noinspection FieldAccessedSynchronizedAndUnsynchronized
- */
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
private volatile Selector selector, writeSelector;
private final Queue newSessions = new Queue();
@@ -90,11 +88,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
this.executor = executor;
}
- void addNew( SocketSessionImpl session ) throws IOException
+ void addNew(SocketSessionImpl session) throws IOException
{
- synchronized( newSessions )
+ synchronized (newSessions)
{
- newSessions.push( session );
+ newSessions.push(session);
}
startupWorker();
@@ -103,16 +101,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
writeSelector.wakeup();
}
- void remove( SocketSessionImpl session ) throws IOException
+ void remove(SocketSessionImpl session) throws IOException
{
- scheduleRemove( session );
+ scheduleRemove(session);
startupWorker();
selector.wakeup();
}
private void startupWorker() throws IOException
{
- synchronized(readLock)
+ synchronized (readLock)
{
if (readWorker == null)
{
@@ -122,7 +120,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (writeWorker == null)
{
@@ -134,38 +132,38 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
- void flush( SocketSessionImpl session )
+ void flush(SocketSessionImpl session)
{
- scheduleFlush( session );
+ scheduleFlush(session);
Selector selector = this.writeSelector;
- if( selector != null )
+ if (selector != null)
{
selector.wakeup();
}
}
- void updateTrafficMask( SocketSessionImpl session )
+ void updateTrafficMask(SocketSessionImpl session)
{
- scheduleTrafficControl( session );
+ scheduleTrafficControl(session);
Selector selector = this.selector;
- if( selector != null )
+ if (selector != null)
{
selector.wakeup();
}
}
- private void scheduleRemove( SocketSessionImpl session )
+ private void scheduleRemove(SocketSessionImpl session)
{
- synchronized( removingSessions )
+ synchronized (removingSessions)
{
- removingSessions.push( session );
+ removingSessions.push(session);
}
}
- private void scheduleFlush( SocketSessionImpl session )
+ private void scheduleFlush(SocketSessionImpl session)
{
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
//if flushingSessions grows to contain Integer.MAX_VALUE sessions
// then this will fail.
@@ -176,31 +174,31 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
- private void scheduleTrafficControl( SocketSessionImpl session )
+ private void scheduleTrafficControl(SocketSessionImpl session)
{
- synchronized( trafficControllingSessions )
+ synchronized (trafficControllingSessions)
{
- trafficControllingSessions.push( session );
+ trafficControllingSessions.push(session);
}
}
private void doAddNewReader() throws InterruptedException
{
- if( newSessions.isEmpty() )
+ if (newSessions.isEmpty())
{
return;
}
- for( ; ; )
+ for (; ;)
{
MultiThreadSocketSessionImpl session;
- synchronized( newSessions )
+ synchronized (newSessions)
{
session = (MultiThreadSocketSessionImpl) newSessions.peek();
}
- if( session == null )
+ if (session == null)
{
break;
}
@@ -211,21 +209,20 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
- ch.configureBlocking( false );
- session.setSelectionKey( ch.register( selector,
- SelectionKey.OP_READ,
- session ) );
-
+ ch.configureBlocking(false);
+ session.setSelectionKey(ch.register(selector,
+ SelectionKey.OP_READ,
+ session));
//System.out.println("ReadDebug:"+"Awaiting Registration");
session.awaitRegistration();
sessionCreated(session);
}
- catch( IOException e )
+ catch (IOException e)
{
// Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
@@ -242,7 +239,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
MultiThreadSocketSessionImpl session;
- synchronized(newSessions)
+ synchronized (newSessions)
{
session = (MultiThreadSocketSessionImpl) newSessions.peek();
}
@@ -257,7 +254,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
ch.configureBlocking(false);
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
flushingSessionsSet.add(session);
}
@@ -275,17 +272,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
-
private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
{
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
- synchronized(newSessions)
+ synchronized (newSessions)
{
if (!session.created())
{
@@ -294,7 +290,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
- session.getServiceListeners().fireSessionCreated( session );
+ session.getServiceListeners().fireSessionCreated(session);
session.doneCreation();
}
@@ -303,21 +299,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void doRemove()
{
- if( removingSessions.isEmpty() )
+ if (removingSessions.isEmpty())
{
return;
}
- for( ; ; )
+ for (; ;)
{
MultiThreadSocketSessionImpl session;
- synchronized( removingSessions )
+ synchronized (removingSessions)
{
session = (MultiThreadSocketSessionImpl) removingSessions.pop();
}
- if( session == null )
+ if (session == null)
{
break;
}
@@ -330,7 +326,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// (In case that Session.close() is called before addSession() is processed)
if (key == null || writeKey == null)
{
- scheduleRemove( session );
+ scheduleRemove(session);
break;
}
// skip if channel is already closed
@@ -342,24 +338,24 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
//System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
- synchronized(readLock)
+ synchronized (readLock)
{
key.cancel();
}
- synchronized(writeLock)
+ synchronized (writeLock)
{
writeKey.cancel();
}
ch.close();
}
- catch( IOException e )
+ catch (IOException e)
{
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
finally
{
- releaseWriteBuffers( session );
- session.getServiceListeners().fireSessionDestroyed( session );
+ releaseWriteBuffers(session);
+ session.getServiceListeners().fireSessionDestroyed(session);
}
}
}
@@ -368,16 +364,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
Iterator it = selectedKeys.iterator();
- while( it.hasNext() )
+ while (it.hasNext())
{
- SelectionKey key = ( SelectionKey ) it.next();
+ SelectionKey key = (SelectionKey) it.next();
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
- synchronized(readLock)
+ synchronized (readLock)
{
if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
{
- read( session );
+ read(session);
}
}
@@ -395,7 +391,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
SelectionKey key = (SelectionKey) it.next();
SocketSessionImpl session = (SocketSessionImpl) key.attachment();
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
{
@@ -403,7 +399,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// Clear OP_WRITE
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
flushingSessions.offer(session);
}
@@ -424,7 +420,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
int totalReadBytes = 0;
- for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;)
+ while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
{
ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
SocketChannel ch = session.getChannel();
@@ -482,6 +478,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
scheduleRemove(session);
}
session.getFilterChain().fireExceptionCaught(session, e);
+
+ //Stop Reading this session.
+ return;
}
finally
{
@@ -507,12 +506,12 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
lastIdleReadCheckTime = currentTime;
Set keys = selector.keys();
- if( keys != null )
+ if (keys != null)
{
- for( Iterator it = keys.iterator(); it.hasNext(); )
+ for (Iterator it = keys.iterator(); it.hasNext();)
{
- SelectionKey key = ( SelectionKey ) it.next();
- SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
notifyReadIdleness(session, currentTime);
}
}
@@ -542,15 +541,15 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
{
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
- IdleStatus.BOTH_IDLE,
- Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
- IdleStatus.READER_IDLE,
- Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
+ IdleStatus.READER_IDLE,
+ Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
notifyWriteTimeout(session, currentTime, session
.getWriteTimeoutInMillis(), session.getLastWriteTime());
@@ -559,51 +558,51 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
{
notifyIdleness0(
- session, currentTime,
+ session, currentTime,
session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
IdleStatus.BOTH_IDLE,
Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
notifyIdleness0(
session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
- IdleStatus.WRITER_IDLE,
- Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+ session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+ IdleStatus.WRITER_IDLE,
+ Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
- notifyWriteTimeout( session, currentTime, session
- .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
}
- private void notifyIdleness0( SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status,
- long lastIoTime )
+ private void notifyIdleness0(SocketSessionImpl session, long currentTime,
+ long idleTime, IdleStatus status,
+ long lastIoTime)
{
- if( idleTime > 0 && lastIoTime != 0
- && ( currentTime - lastIoTime ) >= idleTime )
+ if (idleTime > 0 && lastIoTime != 0
+ && (currentTime - lastIoTime) >= idleTime)
{
- session.increaseIdleCount( status );
- session.getFilterChain().fireSessionIdle( session, status );
+ session.increaseIdleCount(status);
+ session.getFilterChain().fireSessionIdle(session, status);
}
}
- private void notifyWriteTimeout( SocketSessionImpl session,
- long currentTime,
- long writeTimeout, long lastIoTime )
+ private void notifyWriteTimeout(SocketSessionImpl session,
+ long currentTime,
+ long writeTimeout, long lastIoTime)
{
MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
SelectionKey key = sesh.getWriteSelectionKey();
- synchronized(writeLock)
- {
- if( writeTimeout > 0
- && ( currentTime - lastIoTime ) >= writeTimeout
- && key != null && key.isValid()
- && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
+ synchronized (writeLock)
{
- session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() );
+ if (writeTimeout > 0
+ && (currentTime - lastIoTime) >= writeTimeout
+ && key != null && key.isValid()
+ && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ {
+ session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
+ }
}
}
- }
private SocketSessionImpl getNextFlushingSession()
{
@@ -612,9 +611,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void releaseSession(SocketSessionImpl session)
{
- synchronized(session.getWriteRequestQueue())
+ synchronized (session.getWriteRequestQueue())
{
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
if (session.getScheduledWriteRequests() > 0)
{
@@ -642,7 +641,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
WriteRequest req;
//Should this be synchronized?
- synchronized(writeRequestQueue)
+ synchronized (writeRequestQueue)
{
while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
{
@@ -668,9 +667,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
{
- if( !session.isConnected() )
+ if (!session.isConnected())
{
- releaseWriteBuffers( session );
+ releaseWriteBuffers(session);
releaseSession(session);
continue;
}
@@ -678,14 +677,14 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
SelectionKey key = session.getWriteSelectionKey();
// Retry later if session is not yet fully initialized.
// (In case that Session.write() is called before addSession() is processed)
- if( key == null )
+ if (key == null)
{
- scheduleFlush( session );
+ scheduleFlush(session);
releaseSession(session);
continue;
}
// skip if channel is already closed
- if( !key.isValid() )
+ if (!key.isValid())
{
releaseSession(session);
continue;
@@ -698,11 +697,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
releaseSession(session);
}
}
- catch( IOException e )
+ catch (IOException e)
{
releaseSession(session);
- scheduleRemove( session );
- session.getFilterChain().fireExceptionCaught( session, e );
+ scheduleRemove(session);
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
@@ -714,32 +713,32 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
// Clear OP_WRITE
SelectionKey key = session.getWriteSelectionKey();
- synchronized(writeLock)
+ synchronized (writeLock)
{
- key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
}
SocketChannel ch = session.getChannel();
Queue writeRequestQueue = session.getWriteRequestQueue();
long totalFlushedBytes = 0;
- for( ; ; )
+ while (true)
{
WriteRequest req;
- synchronized( writeRequestQueue )
+ synchronized (writeRequestQueue)
{
- req = ( WriteRequest ) writeRequestQueue.first();
+ req = (WriteRequest) writeRequestQueue.first();
}
- if( req == null )
+ if (req == null)
{
break;
}
- ByteBuffer buf = ( ByteBuffer ) req.getMessage();
- if( buf.remaining() == 0 )
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0)
{
- synchronized( writeRequestQueue )
+ synchronized (writeRequestQueue)
{
writeRequestQueue.pop();
}
@@ -747,7 +746,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
session.increaseWrittenMessages();
buf.reset();
- session.getFilterChain().fireMessageSent( session, req );
+ session.getFilterChain().fireMessageSent(session, req);
continue;
}
@@ -755,23 +754,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
int writtenBytes = 0;
// Reported as DIRMINA-362
- //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
-// if (key.isWritable())
+ //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
+ if (key.isWritable())
{
- try
- {
- writtenBytes = ch.write(buf.buf());
- totalFlushedBytes += writtenBytes;
- }
- catch (IOException ioe)
- {
- throw ioe;
- }
+ writtenBytes = ch.write(buf.buf());
+ totalFlushedBytes += writtenBytes;
}
- if( writtenBytes > 0 )
+ if (writtenBytes > 0)
{
- session.increaseWrittenBytes( writtenBytes );
+ session.increaseWrittenBytes(writtenBytes);
}
if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
@@ -911,7 +903,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
if (writeSelector.keys().isEmpty())
{
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
@@ -963,7 +955,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
//System.out.println("ReadDebug:"+"Startup");
- for( ; ; )
+ for (; ;)
{
try
{
@@ -972,7 +964,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
doAddNewReader();
doUpdateTrafficMask();
- if( nKeys > 0 )
+ if (nKeys > 0)
{
//System.out.println("ReadDebug:"+nKeys + " keys from selector");
@@ -987,21 +979,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
doRemove();
notifyReadIdleness();
- if( selector.keys().isEmpty() )
+ if (selector.keys().isEmpty())
{
- synchronized(readLock)
+ synchronized (readLock)
{
- if( selector.keys().isEmpty() && newSessions.isEmpty() )
+ if (selector.keys().isEmpty() && newSessions.isEmpty())
{
readWorker = null;
try
{
selector.close();
}
- catch( IOException e )
+ catch (IOException e)
{
- ExceptionMonitor.getInstance().exceptionCaught( e );
+ ExceptionMonitor.getInstance().exceptionCaught(e);
}
finally
{
@@ -1013,17 +1005,17 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
}
- catch( Throwable t )
+ catch (Throwable t)
{
- ExceptionMonitor.getInstance().exceptionCaught( t );
+ ExceptionMonitor.getInstance().exceptionCaught(t);
try
{
- Thread.sleep( 1000 );
+ Thread.sleep(1000);
}
- catch( InterruptedException e1 )
+ catch (InterruptedException e1)
{
- ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
diff --git a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
new file mode 100644
index 0000000000..16e74b17d2
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.BaseIoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.transport.vmpipe.support.VmPipe;
+import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker;
+import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl;
+import org.apache.mina.util.AnonymousSocketAddress;
+
+/**
+ * Connects to {@link IoHandler}s which is bound on the specified
+ * {@link VmPipeAddress}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class QpidVmPipeConnector extends VmPipeConnector
+{
+ private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
+ private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig()
+ {
+ public IoSessionConfig getSessionConfig()
+ {
+ return CONFIG;
+ }
+ };
+
+ /**
+ * Creates a new instance.
+ */
+ public QpidVmPipeConnector()
+ {
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ return connect( address, null, handler, config );
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config )
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+ if( ! ( address instanceof VmPipeAddress ) )
+ throw new IllegalArgumentException(
+ "address must be VmPipeAddress." );
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address );
+ if( entry == null )
+ {
+ return DefaultConnectFuture.newFailedFuture(
+ new IOException( "Endpoint unavailable: " + address ) );
+ }
+
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ VmPipeSessionImpl localSession =
+ new VmPipeSessionImpl(
+ this,
+ config,
+ getListeners(),
+ new Object(), // lock
+ new AnonymousSocketAddress(),
+ handler,
+ entry );
+
+ // initialize acceptor session
+ VmPipeSessionImpl remoteSession = localSession.getRemoteSession();
+ try
+ {
+ IoFilterChain filterChain = remoteSession.getFilterChain();
+ entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain );
+ entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain );
+ entry.getConfig().getThreadModel().buildFilterChain( filterChain );
+
+ // The following sentences don't throw any exceptions.
+ entry.getListeners().fireSessionCreated( remoteSession );
+ VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ remoteSession.close();
+ }
+
+
+ // initialize connector session
+ try
+ {
+ IoFilterChain filterChain = localSession.getFilterChain();
+ this.getFilterChainBuilder().buildFilterChain( filterChain );
+ config.getFilterChainBuilder().buildFilterChain( filterChain );
+ config.getThreadModel().buildFilterChain( filterChain );
+
+ // The following sentences don't throw any exceptions.
+ localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future );
+ getListeners().fireSessionCreated( localSession );
+ VmPipeIdleStatusChecker.getInstance().addSession( localSession);
+ }
+ catch( Throwable t )
+ {
+ future.setException( t );
+ }
+
+
+
+ return future;
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+} \ No newline at end of file