diff options
Diffstat (limited to 'java/common')
4 files changed, 483 insertions, 144 deletions
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java new file mode 100644 index 0000000000..810d12f472 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +/** + * A {@link ProtocolDecoder} that cumulates the content of received + * buffers to a <em>cumulative buffer</em> to help users implement decoders. + * <p> + * If the received {@link ByteBuffer} is only a part of a message. + * decoders should cumulate received buffers to make a message complete or + * to postpone decoding until more buffers arrive. + * <p> + * Here is an example decoder that decodes CRLF terminated lines into + * <code>Command</code> objects: + * <pre> + * public class CRLFTerminatedCommandLineDecoder + * extends CumulativeProtocolDecoder { + * + * private Command parseCommand(ByteBuffer in) { + * // Convert the bytes in the specified buffer to a + * // Command object. + * ... + * } + * + * protected boolean doDecode(IoSession session, ByteBuffer in, + * ProtocolDecoderOutput out) + * throws Exception { + * + * // Remember the initial position. + * int start = in.position(); + * + * // Now find the first CRLF in the buffer. + * byte previous = 0; + * while (in.hasRemaining()) { + * byte current = in.get(); + * + * if (previous == '\r' && current == '\n') { + * // Remember the current position and limit. + * int position = in.position(); + * int limit = in.limit(); + * try { + * in.position(start); + * in.limit(position); + * // The bytes between in.position() and in.limit() + * // now contain a full CRLF terminated line. + * out.write(parseCommand(in.slice())); + * } finally { + * // Set the position to point right after the + * // detected line and set the limit to the old + * // one. + * in.position(position); + * in.limit(limit); + * } + * // Decoded one line; CumulativeProtocolDecoder will + * // call me again until I return false. So just + * // return true until there are no more lines in the + * // buffer. + * return true; + * } + * + * previous = current; + * } + * + * // Could not find CRLF in the buffer. Reset the initial + * // position to the one we recorded above. + * in.position(start); + * + * return false; + * } + * } + * </pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $ + */ +public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { + + private static final String BUFFER = OurCumulativeProtocolDecoder.class + .getName() + + ".Buffer"; + + /** + * Creates a new instance. + */ + protected OurCumulativeProtocolDecoder() { + } + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is NOT compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception { + boolean usingSessionBuffer = true; + ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER); + // If we have a session buffer, append data to that; otherwise + // use the buffer read from the network directly. + if (buf != null) { + buf.put(in); + buf.flip(); + } else { + buf = in; + usingSessionBuffer = false; + } + + for (;;) { + int oldPos = buf.position(); + boolean decoded = doDecode(session, buf, out); + if (decoded) { + if (buf.position() == oldPos) { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed."); + } + + if (!buf.hasRemaining()) { + break; + } + } else { + break; + } + } + + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if (buf.hasRemaining()) { + storeRemainingInSession(buf, session); + } else { + if (usingSessionBuffer) + removeSessionBuffer(session); + } + } + + /** + * Implement this method to consume the specified cumulative buffer and + * decode its content into message(s). + * + * @param in the cumulative buffer + * @return <tt>true</tt> if and only if there's more to decode in the buffer + * and you want to have <tt>doDecode</tt> method invoked again. + * Return <tt>false</tt> if remaining data is not enough to decode, + * then this method will be invoked again when more data is cumulated. + * @throws Exception if cannot decode <tt>in</tt>. + */ + protected abstract boolean doDecode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception; + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose(IoSession session) throws Exception { + removeSessionBuffer(session); + } + + private void removeSessionBuffer(IoSession session) { + ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER); + if (buf != null) { + buf.release(); + } + } + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) { + ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity()); + remainingBuf.setAutoExpand(true); + remainingBuf.order(buf.order()); + remainingBuf.put(buf); + session.setAttribute(BUFFER, remainingBuf); + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java index 202ac1a530..cb24102edd 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -376,8 +376,7 @@ public class MultiThreadSocketConnector extends SocketConnector // Set the ConnectFuture of the specified session, which will be // removed and notified by AbstractIoFilterChain eventually. -// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); - session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture); + session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); // Forward the remaining process to the SocketIoProcessor. session.getIoProcessor().addNew(session); diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java index 11c54bb248..03838ca3f1 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -66,9 +66,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ private volatile Selector selector, writeSelector; private final Queue newSessions = new Queue(); @@ -90,11 +88,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor this.executor = executor; } - void addNew( SocketSessionImpl session ) throws IOException + void addNew(SocketSessionImpl session) throws IOException { - synchronized( newSessions ) + synchronized (newSessions) { - newSessions.push( session ); + newSessions.push(session); } startupWorker(); @@ -103,16 +101,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor writeSelector.wakeup(); } - void remove( SocketSessionImpl session ) throws IOException + void remove(SocketSessionImpl session) throws IOException { - scheduleRemove( session ); + scheduleRemove(session); startupWorker(); selector.wakeup(); } private void startupWorker() throws IOException { - synchronized(readLock) + synchronized (readLock) { if (readWorker == null) { @@ -122,7 +120,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - synchronized(writeLock) + synchronized (writeLock) { if (writeWorker == null) { @@ -134,38 +132,38 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } - void flush( SocketSessionImpl session ) + void flush(SocketSessionImpl session) { - scheduleFlush( session ); + scheduleFlush(session); Selector selector = this.writeSelector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - void updateTrafficMask( SocketSessionImpl session ) + void updateTrafficMask(SocketSessionImpl session) { - scheduleTrafficControl( session ); + scheduleTrafficControl(session); Selector selector = this.selector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - private void scheduleRemove( SocketSessionImpl session ) + private void scheduleRemove(SocketSessionImpl session) { - synchronized( removingSessions ) + synchronized (removingSessions) { - removingSessions.push( session ); + removingSessions.push(session); } } - private void scheduleFlush( SocketSessionImpl session ) + private void scheduleFlush(SocketSessionImpl session) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { //if flushingSessions grows to contain Integer.MAX_VALUE sessions // then this will fail. @@ -176,31 +174,31 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - private void scheduleTrafficControl( SocketSessionImpl session ) + private void scheduleTrafficControl(SocketSessionImpl session) { - synchronized( trafficControllingSessions ) + synchronized (trafficControllingSessions) { - trafficControllingSessions.push( session ); + trafficControllingSessions.push(session); } } private void doAddNewReader() throws InterruptedException { - if( newSessions.isEmpty() ) + if (newSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( newSessions ) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } - if( session == null ) + if (session == null) { break; } @@ -211,21 +209,20 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { - ch.configureBlocking( false ); - session.setSelectionKey( ch.register( selector, - SelectionKey.OP_READ, - session ) ); - + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); //System.out.println("ReadDebug:"+"Awaiting Registration"); session.awaitRegistration(); sessionCreated(session); } - catch( IOException e ) + catch (IOException e) { // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } @@ -242,7 +239,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { MultiThreadSocketSessionImpl session; - synchronized(newSessions) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } @@ -257,7 +254,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { ch.configureBlocking(false); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessionsSet.add(session); } @@ -275,17 +272,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } - private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException { MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - synchronized(newSessions) + synchronized (newSessions) { if (!session.created()) { @@ -294,7 +290,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). - session.getServiceListeners().fireSessionCreated( session ); + session.getServiceListeners().fireSessionCreated(session); session.doneCreation(); } @@ -303,21 +299,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void doRemove() { - if( removingSessions.isEmpty() ) + if (removingSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( removingSessions ) + synchronized (removingSessions) { session = (MultiThreadSocketSessionImpl) removingSessions.pop(); } - if( session == null ) + if (session == null) { break; } @@ -330,7 +326,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // (In case that Session.close() is called before addSession() is processed) if (key == null || writeKey == null) { - scheduleRemove( session ); + scheduleRemove(session); break; } // skip if channel is already closed @@ -342,24 +338,24 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); - synchronized(readLock) + synchronized (readLock) { key.cancel(); } - synchronized(writeLock) + synchronized (writeLock) { writeKey.cancel(); } ch.close(); } - catch( IOException e ) + catch (IOException e) { - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } finally { - releaseWriteBuffers( session ); - session.getServiceListeners().fireSessionDestroyed( session ); + releaseWriteBuffers(session); + session.getServiceListeners().fireSessionDestroyed(session); } } } @@ -368,16 +364,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { Iterator it = selectedKeys.iterator(); - while( it.hasNext() ) + while (it.hasNext()) { - SelectionKey key = ( SelectionKey ) it.next(); + SelectionKey key = (SelectionKey) it.next(); MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); - synchronized(readLock) + synchronized (readLock) { if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) { - read( session ); + read(session); } } @@ -395,7 +391,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = (SelectionKey) it.next(); SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - synchronized(writeLock) + synchronized (writeLock) { if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) { @@ -403,7 +399,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear OP_WRITE key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessions.offer(session); } @@ -424,7 +420,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int totalReadBytes = 0; - for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;) + while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) { ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); SocketChannel ch = session.getChannel(); @@ -482,6 +478,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor scheduleRemove(session); } session.getFilterChain().fireExceptionCaught(session, e); + + //Stop Reading this session. + return; } finally { @@ -507,12 +506,12 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { lastIdleReadCheckTime = currentTime; Set keys = selector.keys(); - if( keys != null ) + if (keys != null) { - for( Iterator it = keys.iterator(); it.hasNext(); ) + for (Iterator it = keys.iterator(); it.hasNext();) { - SelectionKey key = ( SelectionKey ) it.next(); - SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); notifyReadIdleness(session, currentTime); } } @@ -542,15 +541,15 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyReadIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), - IdleStatus.BOTH_IDLE, - Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.READER_IDLE ), - IdleStatus.READER_IDLE, - Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); notifyWriteTimeout(session, currentTime, session .getWriteTimeoutInMillis(), session.getLastWriteTime()); @@ -559,51 +558,51 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, + session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( session, currentTime, - session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), - IdleStatus.WRITER_IDLE, - Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); + session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - notifyWriteTimeout( session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime() ); + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); } - private void notifyIdleness0( SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime ) + private void notifyIdleness0(SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) { - if( idleTime > 0 && lastIoTime != 0 - && ( currentTime - lastIoTime ) >= idleTime ) + if (idleTime > 0 && lastIoTime != 0 + && (currentTime - lastIoTime) >= idleTime) { - session.increaseIdleCount( status ); - session.getFilterChain().fireSessionIdle( session, status ); + session.increaseIdleCount(status); + session.getFilterChain().fireSessionIdle(session, status); } } - private void notifyWriteTimeout( SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime ) + private void notifyWriteTimeout(SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime) { MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; SelectionKey key = sesh.getWriteSelectionKey(); - synchronized(writeLock) - { - if( writeTimeout > 0 - && ( currentTime - lastIoTime ) >= writeTimeout - && key != null && key.isValid() - && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) + synchronized (writeLock) { - session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() ); + if (writeTimeout > 0 + && (currentTime - lastIoTime) >= writeTimeout + && key != null && key.isValid() + && (key.interestOps() & SelectionKey.OP_WRITE) != 0) + { + session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); + } } } - } private SocketSessionImpl getNextFlushingSession() { @@ -612,9 +611,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void releaseSession(SocketSessionImpl session) { - synchronized(session.getWriteRequestQueue()) + synchronized (session.getWriteRequestQueue()) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { if (session.getScheduledWriteRequests() > 0) { @@ -642,7 +641,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor WriteRequest req; //Should this be synchronized? - synchronized(writeRequestQueue) + synchronized (writeRequestQueue) { while ((req = (WriteRequest) writeRequestQueue.pop()) != null) { @@ -668,9 +667,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) { - if( !session.isConnected() ) + if (!session.isConnected()) { - releaseWriteBuffers( session ); + releaseWriteBuffers(session); releaseSession(session); continue; } @@ -678,14 +677,14 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = session.getWriteSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.write() is called before addSession() is processed) - if( key == null ) + if (key == null) { - scheduleFlush( session ); + scheduleFlush(session); releaseSession(session); continue; } // skip if channel is already closed - if( !key.isValid() ) + if (!key.isValid()) { releaseSession(session); continue; @@ -698,11 +697,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor releaseSession(session); } } - catch( IOException e ) + catch (IOException e) { releaseSession(session); - scheduleRemove( session ); - session.getFilterChain().fireExceptionCaught( session, e ); + scheduleRemove(session); + session.getFilterChain().fireExceptionCaught(session, e); } } @@ -714,32 +713,32 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; // Clear OP_WRITE SelectionKey key = session.getWriteSelectionKey(); - synchronized(writeLock) + synchronized (writeLock) { - key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } SocketChannel ch = session.getChannel(); Queue writeRequestQueue = session.getWriteRequestQueue(); long totalFlushedBytes = 0; - for( ; ; ) + while (true) { WriteRequest req; - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { - req = ( WriteRequest ) writeRequestQueue.first(); + req = (WriteRequest) writeRequestQueue.first(); } - if( req == null ) + if (req == null) { break; } - ByteBuffer buf = ( ByteBuffer ) req.getMessage(); - if( buf.remaining() == 0 ) + ByteBuffer buf = (ByteBuffer) req.getMessage(); + if (buf.remaining() == 0) { - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { writeRequestQueue.pop(); } @@ -747,7 +746,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor session.increaseWrittenMessages(); buf.reset(); - session.getFilterChain().fireMessageSent( session, req ); + session.getFilterChain().fireMessageSent(session, req); continue; } @@ -755,23 +754,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int writtenBytes = 0; // Reported as DIRMINA-362 - //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. -// if (key.isWritable()) + //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. + if (key.isWritable()) { - try - { - writtenBytes = ch.write(buf.buf()); - totalFlushedBytes += writtenBytes; - } - catch (IOException ioe) - { - throw ioe; - } + writtenBytes = ch.write(buf.buf()); + totalFlushedBytes += writtenBytes; } - if( writtenBytes > 0 ) + if (writtenBytes > 0) { - session.increaseWrittenBytes( writtenBytes ); + session.increaseWrittenBytes(writtenBytes); } if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) @@ -911,7 +903,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor if (writeSelector.keys().isEmpty()) { - synchronized(writeLock) + synchronized (writeLock) { if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) @@ -963,7 +955,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); //System.out.println("ReadDebug:"+"Startup"); - for( ; ; ) + for (; ;) { try { @@ -972,7 +964,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doAddNewReader(); doUpdateTrafficMask(); - if( nKeys > 0 ) + if (nKeys > 0) { //System.out.println("ReadDebug:"+nKeys + " keys from selector"); @@ -987,21 +979,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doRemove(); notifyReadIdleness(); - if( selector.keys().isEmpty() ) + if (selector.keys().isEmpty()) { - synchronized(readLock) + synchronized (readLock) { - if( selector.keys().isEmpty() && newSessions.isEmpty() ) + if (selector.keys().isEmpty() && newSessions.isEmpty()) { readWorker = null; try { selector.close(); } - catch( IOException e ) + catch (IOException e) { - ExceptionMonitor.getInstance().exceptionCaught( e ); + ExceptionMonitor.getInstance().exceptionCaught(e); } finally { @@ -1013,17 +1005,17 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } } - catch( Throwable t ) + catch (Throwable t) { - ExceptionMonitor.getInstance().exceptionCaught( t ); + ExceptionMonitor.getInstance().exceptionCaught(t); try { - Thread.sleep( 1000 ); + Thread.sleep(1000); } - catch( InterruptedException e1 ) + catch (InterruptedException e1) { - ExceptionMonitor.getInstance().exceptionCaught( e1 ); + ExceptionMonitor.getInstance().exceptionCaught(e1); } } } diff --git a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java new file mode 100644 index 0000000000..16e74b17d2 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.vmpipe; + +import java.io.IOException; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.BaseIoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.vmpipe.support.VmPipe; +import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker; +import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl; +import org.apache.mina.util.AnonymousSocketAddress; + +/** + * Connects to {@link IoHandler}s which is bound on the specified + * {@link VmPipeAddress}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class QpidVmPipeConnector extends VmPipeConnector +{ + private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {}; + private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig() + { + public IoSessionConfig getSessionConfig() + { + return CONFIG; + } + }; + + /** + * Creates a new instance. + */ + public QpidVmPipeConnector() + { + } + + public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + return connect( address, null, handler, config ); + } + + public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config ) + { + if( address == null ) + throw new NullPointerException( "address" ); + if( handler == null ) + throw new NullPointerException( "handler" ); + if( ! ( address instanceof VmPipeAddress ) ) + throw new IllegalArgumentException( + "address must be VmPipeAddress." ); + + if( config == null ) + { + config = getDefaultConfig(); + } + + VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address ); + if( entry == null ) + { + return DefaultConnectFuture.newFailedFuture( + new IOException( "Endpoint unavailable: " + address ) ); + } + + DefaultConnectFuture future = new DefaultConnectFuture(); + VmPipeSessionImpl localSession = + new VmPipeSessionImpl( + this, + config, + getListeners(), + new Object(), // lock + new AnonymousSocketAddress(), + handler, + entry ); + + // initialize acceptor session + VmPipeSessionImpl remoteSession = localSession.getRemoteSession(); + try + { + IoFilterChain filterChain = remoteSession.getFilterChain(); + entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + entry.getListeners().fireSessionCreated( remoteSession ); + VmPipeIdleStatusChecker.getInstance().addSession( remoteSession ); + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + remoteSession.close(); + } + + + // initialize connector session + try + { + IoFilterChain filterChain = localSession.getFilterChain(); + this.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future ); + getListeners().fireSessionCreated( localSession ); + VmPipeIdleStatusChecker.getInstance().addSession( localSession); + } + catch( Throwable t ) + { + future.setException( t ); + } + + + + return future; + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } +}
\ No newline at end of file |
