diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-11-29 18:25:21 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-11-29 18:25:21 +0000 |
commit | 6e42f5723c06adc28679d11bffd4fa0ac3e5af85 (patch) | |
tree | dc931162a81ae4866e1c5b97c3dfae7658cf89b2 | |
parent | 8ccbc251b2b4663919b6e0293612aa2078679394 (diff) | |
download | qpid-python-6e42f5723c06adc28679d11bffd4fa0ac3e5af85.tar.gz |
QPID-564 QPID-92 Tidied up a few points and fixed infinite loop in Read IO Thread
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@599533 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java | 276 |
1 files changed, 134 insertions, 142 deletions
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java index 11c54bb248..03838ca3f1 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -66,9 +66,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ private volatile Selector selector, writeSelector; private final Queue newSessions = new Queue(); @@ -90,11 +88,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor this.executor = executor; } - void addNew( SocketSessionImpl session ) throws IOException + void addNew(SocketSessionImpl session) throws IOException { - synchronized( newSessions ) + synchronized (newSessions) { - newSessions.push( session ); + newSessions.push(session); } startupWorker(); @@ -103,16 +101,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor writeSelector.wakeup(); } - void remove( SocketSessionImpl session ) throws IOException + void remove(SocketSessionImpl session) throws IOException { - scheduleRemove( session ); + scheduleRemove(session); startupWorker(); selector.wakeup(); } private void startupWorker() throws IOException { - synchronized(readLock) + synchronized (readLock) { if (readWorker == null) { @@ -122,7 +120,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - synchronized(writeLock) + synchronized (writeLock) { if (writeWorker == null) { @@ -134,38 +132,38 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } - void flush( SocketSessionImpl session ) + void flush(SocketSessionImpl session) { - scheduleFlush( session ); + scheduleFlush(session); Selector selector = this.writeSelector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - void updateTrafficMask( SocketSessionImpl session ) + void updateTrafficMask(SocketSessionImpl session) { - scheduleTrafficControl( session ); + scheduleTrafficControl(session); Selector selector = this.selector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - private void scheduleRemove( SocketSessionImpl session ) + private void scheduleRemove(SocketSessionImpl session) { - synchronized( removingSessions ) + synchronized (removingSessions) { - removingSessions.push( session ); + removingSessions.push(session); } } - private void scheduleFlush( SocketSessionImpl session ) + private void scheduleFlush(SocketSessionImpl session) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { //if flushingSessions grows to contain Integer.MAX_VALUE sessions // then this will fail. @@ -176,31 +174,31 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - private void scheduleTrafficControl( SocketSessionImpl session ) + private void scheduleTrafficControl(SocketSessionImpl session) { - synchronized( trafficControllingSessions ) + synchronized (trafficControllingSessions) { - trafficControllingSessions.push( session ); + trafficControllingSessions.push(session); } } private void doAddNewReader() throws InterruptedException { - if( newSessions.isEmpty() ) + if (newSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( newSessions ) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } - if( session == null ) + if (session == null) { break; } @@ -211,21 +209,20 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { - ch.configureBlocking( false ); - session.setSelectionKey( ch.register( selector, - SelectionKey.OP_READ, - session ) ); - + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); //System.out.println("ReadDebug:"+"Awaiting Registration"); session.awaitRegistration(); sessionCreated(session); } - catch( IOException e ) + catch (IOException e) { // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } @@ -242,7 +239,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { MultiThreadSocketSessionImpl session; - synchronized(newSessions) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } @@ -257,7 +254,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { ch.configureBlocking(false); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessionsSet.add(session); } @@ -275,17 +272,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } - private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException { MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - synchronized(newSessions) + synchronized (newSessions) { if (!session.created()) { @@ -294,7 +290,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). - session.getServiceListeners().fireSessionCreated( session ); + session.getServiceListeners().fireSessionCreated(session); session.doneCreation(); } @@ -303,21 +299,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void doRemove() { - if( removingSessions.isEmpty() ) + if (removingSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( removingSessions ) + synchronized (removingSessions) { session = (MultiThreadSocketSessionImpl) removingSessions.pop(); } - if( session == null ) + if (session == null) { break; } @@ -330,7 +326,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // (In case that Session.close() is called before addSession() is processed) if (key == null || writeKey == null) { - scheduleRemove( session ); + scheduleRemove(session); break; } // skip if channel is already closed @@ -342,24 +338,24 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); - synchronized(readLock) + synchronized (readLock) { key.cancel(); } - synchronized(writeLock) + synchronized (writeLock) { writeKey.cancel(); } ch.close(); } - catch( IOException e ) + catch (IOException e) { - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } finally { - releaseWriteBuffers( session ); - session.getServiceListeners().fireSessionDestroyed( session ); + releaseWriteBuffers(session); + session.getServiceListeners().fireSessionDestroyed(session); } } } @@ -368,16 +364,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { Iterator it = selectedKeys.iterator(); - while( it.hasNext() ) + while (it.hasNext()) { - SelectionKey key = ( SelectionKey ) it.next(); + SelectionKey key = (SelectionKey) it.next(); MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); - synchronized(readLock) + synchronized (readLock) { if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) { - read( session ); + read(session); } } @@ -395,7 +391,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = (SelectionKey) it.next(); SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - synchronized(writeLock) + synchronized (writeLock) { if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) { @@ -403,7 +399,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear OP_WRITE key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessions.offer(session); } @@ -424,7 +420,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int totalReadBytes = 0; - for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;) + while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) { ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); SocketChannel ch = session.getChannel(); @@ -482,6 +478,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor scheduleRemove(session); } session.getFilterChain().fireExceptionCaught(session, e); + + //Stop Reading this session. + return; } finally { @@ -507,12 +506,12 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { lastIdleReadCheckTime = currentTime; Set keys = selector.keys(); - if( keys != null ) + if (keys != null) { - for( Iterator it = keys.iterator(); it.hasNext(); ) + for (Iterator it = keys.iterator(); it.hasNext();) { - SelectionKey key = ( SelectionKey ) it.next(); - SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); notifyReadIdleness(session, currentTime); } } @@ -542,15 +541,15 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyReadIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), - IdleStatus.BOTH_IDLE, - Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.READER_IDLE ), - IdleStatus.READER_IDLE, - Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); notifyWriteTimeout(session, currentTime, session .getWriteTimeoutInMillis(), session.getLastWriteTime()); @@ -559,51 +558,51 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, + session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( session, currentTime, - session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), - IdleStatus.WRITER_IDLE, - Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); + session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - notifyWriteTimeout( session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime() ); + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); } - private void notifyIdleness0( SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime ) + private void notifyIdleness0(SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) { - if( idleTime > 0 && lastIoTime != 0 - && ( currentTime - lastIoTime ) >= idleTime ) + if (idleTime > 0 && lastIoTime != 0 + && (currentTime - lastIoTime) >= idleTime) { - session.increaseIdleCount( status ); - session.getFilterChain().fireSessionIdle( session, status ); + session.increaseIdleCount(status); + session.getFilterChain().fireSessionIdle(session, status); } } - private void notifyWriteTimeout( SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime ) + private void notifyWriteTimeout(SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime) { MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; SelectionKey key = sesh.getWriteSelectionKey(); - synchronized(writeLock) - { - if( writeTimeout > 0 - && ( currentTime - lastIoTime ) >= writeTimeout - && key != null && key.isValid() - && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) + synchronized (writeLock) { - session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() ); + if (writeTimeout > 0 + && (currentTime - lastIoTime) >= writeTimeout + && key != null && key.isValid() + && (key.interestOps() & SelectionKey.OP_WRITE) != 0) + { + session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); + } } } - } private SocketSessionImpl getNextFlushingSession() { @@ -612,9 +611,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void releaseSession(SocketSessionImpl session) { - synchronized(session.getWriteRequestQueue()) + synchronized (session.getWriteRequestQueue()) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { if (session.getScheduledWriteRequests() > 0) { @@ -642,7 +641,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor WriteRequest req; //Should this be synchronized? - synchronized(writeRequestQueue) + synchronized (writeRequestQueue) { while ((req = (WriteRequest) writeRequestQueue.pop()) != null) { @@ -668,9 +667,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) { - if( !session.isConnected() ) + if (!session.isConnected()) { - releaseWriteBuffers( session ); + releaseWriteBuffers(session); releaseSession(session); continue; } @@ -678,14 +677,14 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = session.getWriteSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.write() is called before addSession() is processed) - if( key == null ) + if (key == null) { - scheduleFlush( session ); + scheduleFlush(session); releaseSession(session); continue; } // skip if channel is already closed - if( !key.isValid() ) + if (!key.isValid()) { releaseSession(session); continue; @@ -698,11 +697,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor releaseSession(session); } } - catch( IOException e ) + catch (IOException e) { releaseSession(session); - scheduleRemove( session ); - session.getFilterChain().fireExceptionCaught( session, e ); + scheduleRemove(session); + session.getFilterChain().fireExceptionCaught(session, e); } } @@ -714,32 +713,32 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; // Clear OP_WRITE SelectionKey key = session.getWriteSelectionKey(); - synchronized(writeLock) + synchronized (writeLock) { - key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } SocketChannel ch = session.getChannel(); Queue writeRequestQueue = session.getWriteRequestQueue(); long totalFlushedBytes = 0; - for( ; ; ) + while (true) { WriteRequest req; - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { - req = ( WriteRequest ) writeRequestQueue.first(); + req = (WriteRequest) writeRequestQueue.first(); } - if( req == null ) + if (req == null) { break; } - ByteBuffer buf = ( ByteBuffer ) req.getMessage(); - if( buf.remaining() == 0 ) + ByteBuffer buf = (ByteBuffer) req.getMessage(); + if (buf.remaining() == 0) { - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { writeRequestQueue.pop(); } @@ -747,7 +746,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor session.increaseWrittenMessages(); buf.reset(); - session.getFilterChain().fireMessageSent( session, req ); + session.getFilterChain().fireMessageSent(session, req); continue; } @@ -755,23 +754,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int writtenBytes = 0; // Reported as DIRMINA-362 - //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. -// if (key.isWritable()) + //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. + if (key.isWritable()) { - try - { - writtenBytes = ch.write(buf.buf()); - totalFlushedBytes += writtenBytes; - } - catch (IOException ioe) - { - throw ioe; - } + writtenBytes = ch.write(buf.buf()); + totalFlushedBytes += writtenBytes; } - if( writtenBytes > 0 ) + if (writtenBytes > 0) { - session.increaseWrittenBytes( writtenBytes ); + session.increaseWrittenBytes(writtenBytes); } if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) @@ -911,7 +903,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor if (writeSelector.keys().isEmpty()) { - synchronized(writeLock) + synchronized (writeLock) { if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) @@ -963,7 +955,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); //System.out.println("ReadDebug:"+"Startup"); - for( ; ; ) + for (; ;) { try { @@ -972,7 +964,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doAddNewReader(); doUpdateTrafficMask(); - if( nKeys > 0 ) + if (nKeys > 0) { //System.out.println("ReadDebug:"+nKeys + " keys from selector"); @@ -987,21 +979,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doRemove(); notifyReadIdleness(); - if( selector.keys().isEmpty() ) + if (selector.keys().isEmpty()) { - synchronized(readLock) + synchronized (readLock) { - if( selector.keys().isEmpty() && newSessions.isEmpty() ) + if (selector.keys().isEmpty() && newSessions.isEmpty()) { readWorker = null; try { selector.close(); } - catch( IOException e ) + catch (IOException e) { - ExceptionMonitor.getInstance().exceptionCaught( e ); + ExceptionMonitor.getInstance().exceptionCaught(e); } finally { @@ -1013,17 +1005,17 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } } - catch( Throwable t ) + catch (Throwable t) { - ExceptionMonitor.getInstance().exceptionCaught( t ); + ExceptionMonitor.getInstance().exceptionCaught(t); try { - Thread.sleep( 1000 ); + Thread.sleep(1000); } - catch( InterruptedException e1 ) + catch (InterruptedException e1) { - ExceptionMonitor.getInstance().exceptionCaught( e1 ); + ExceptionMonitor.getInstance().exceptionCaught(e1); } } } |