summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-11-29 18:25:21 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-11-29 18:25:21 +0000
commit6e42f5723c06adc28679d11bffd4fa0ac3e5af85 (patch)
treedc931162a81ae4866e1c5b97c3dfae7658cf89b2
parent8ccbc251b2b4663919b6e0293612aa2078679394 (diff)
downloadqpid-python-6e42f5723c06adc28679d11bffd4fa0ac3e5af85.tar.gz
QPID-564 QPID-92 Tidied up a few points and fixed infinite loop in Read IO Thread
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@599533 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java276
1 files changed, 134 insertions, 142 deletions
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
index 11c54bb248..03838ca3f1 100644
--- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
@@ -66,9 +66,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
- /**
- * @noinspection FieldAccessedSynchronizedAndUnsynchronized
- */
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
private volatile Selector selector, writeSelector;
private final Queue newSessions = new Queue();
@@ -90,11 +88,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
this.executor = executor;
}
- void addNew( SocketSessionImpl session ) throws IOException
+ void addNew(SocketSessionImpl session) throws IOException
{
- synchronized( newSessions )
+ synchronized (newSessions)
{
- newSessions.push( session );
+ newSessions.push(session);
}
startupWorker();
@@ -103,16 +101,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
writeSelector.wakeup();
}
- void remove( SocketSessionImpl session ) throws IOException
+ void remove(SocketSessionImpl session) throws IOException
{
- scheduleRemove( session );
+ scheduleRemove(session);
startupWorker();
selector.wakeup();
}
private void startupWorker() throws IOException
{
- synchronized(readLock)
+ synchronized (readLock)
{
if (readWorker == null)
{
@@ -122,7 +120,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (writeWorker == null)
{
@@ -134,38 +132,38 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
- void flush( SocketSessionImpl session )
+ void flush(SocketSessionImpl session)
{
- scheduleFlush( session );
+ scheduleFlush(session);
Selector selector = this.writeSelector;
- if( selector != null )
+ if (selector != null)
{
selector.wakeup();
}
}
- void updateTrafficMask( SocketSessionImpl session )
+ void updateTrafficMask(SocketSessionImpl session)
{
- scheduleTrafficControl( session );
+ scheduleTrafficControl(session);
Selector selector = this.selector;
- if( selector != null )
+ if (selector != null)
{
selector.wakeup();
}
}
- private void scheduleRemove( SocketSessionImpl session )
+ private void scheduleRemove(SocketSessionImpl session)
{
- synchronized( removingSessions )
+ synchronized (removingSessions)
{
- removingSessions.push( session );
+ removingSessions.push(session);
}
}
- private void scheduleFlush( SocketSessionImpl session )
+ private void scheduleFlush(SocketSessionImpl session)
{
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
//if flushingSessions grows to contain Integer.MAX_VALUE sessions
// then this will fail.
@@ -176,31 +174,31 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
- private void scheduleTrafficControl( SocketSessionImpl session )
+ private void scheduleTrafficControl(SocketSessionImpl session)
{
- synchronized( trafficControllingSessions )
+ synchronized (trafficControllingSessions)
{
- trafficControllingSessions.push( session );
+ trafficControllingSessions.push(session);
}
}
private void doAddNewReader() throws InterruptedException
{
- if( newSessions.isEmpty() )
+ if (newSessions.isEmpty())
{
return;
}
- for( ; ; )
+ for (; ;)
{
MultiThreadSocketSessionImpl session;
- synchronized( newSessions )
+ synchronized (newSessions)
{
session = (MultiThreadSocketSessionImpl) newSessions.peek();
}
- if( session == null )
+ if (session == null)
{
break;
}
@@ -211,21 +209,20 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
- ch.configureBlocking( false );
- session.setSelectionKey( ch.register( selector,
- SelectionKey.OP_READ,
- session ) );
-
+ ch.configureBlocking(false);
+ session.setSelectionKey(ch.register(selector,
+ SelectionKey.OP_READ,
+ session));
//System.out.println("ReadDebug:"+"Awaiting Registration");
session.awaitRegistration();
sessionCreated(session);
}
- catch( IOException e )
+ catch (IOException e)
{
// Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
@@ -242,7 +239,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
MultiThreadSocketSessionImpl session;
- synchronized(newSessions)
+ synchronized (newSessions)
{
session = (MultiThreadSocketSessionImpl) newSessions.peek();
}
@@ -257,7 +254,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
ch.configureBlocking(false);
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
flushingSessionsSet.add(session);
}
@@ -275,17 +272,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
-
private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
{
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
- synchronized(newSessions)
+ synchronized (newSessions)
{
if (!session.created())
{
@@ -294,7 +290,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
- session.getServiceListeners().fireSessionCreated( session );
+ session.getServiceListeners().fireSessionCreated(session);
session.doneCreation();
}
@@ -303,21 +299,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void doRemove()
{
- if( removingSessions.isEmpty() )
+ if (removingSessions.isEmpty())
{
return;
}
- for( ; ; )
+ for (; ;)
{
MultiThreadSocketSessionImpl session;
- synchronized( removingSessions )
+ synchronized (removingSessions)
{
session = (MultiThreadSocketSessionImpl) removingSessions.pop();
}
- if( session == null )
+ if (session == null)
{
break;
}
@@ -330,7 +326,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// (In case that Session.close() is called before addSession() is processed)
if (key == null || writeKey == null)
{
- scheduleRemove( session );
+ scheduleRemove(session);
break;
}
// skip if channel is already closed
@@ -342,24 +338,24 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
//System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
- synchronized(readLock)
+ synchronized (readLock)
{
key.cancel();
}
- synchronized(writeLock)
+ synchronized (writeLock)
{
writeKey.cancel();
}
ch.close();
}
- catch( IOException e )
+ catch (IOException e)
{
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
finally
{
- releaseWriteBuffers( session );
- session.getServiceListeners().fireSessionDestroyed( session );
+ releaseWriteBuffers(session);
+ session.getServiceListeners().fireSessionDestroyed(session);
}
}
}
@@ -368,16 +364,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
Iterator it = selectedKeys.iterator();
- while( it.hasNext() )
+ while (it.hasNext())
{
- SelectionKey key = ( SelectionKey ) it.next();
+ SelectionKey key = (SelectionKey) it.next();
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
- synchronized(readLock)
+ synchronized (readLock)
{
if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
{
- read( session );
+ read(session);
}
}
@@ -395,7 +391,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
SelectionKey key = (SelectionKey) it.next();
SocketSessionImpl session = (SocketSessionImpl) key.attachment();
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
{
@@ -403,7 +399,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// Clear OP_WRITE
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
flushingSessions.offer(session);
}
@@ -424,7 +420,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
int totalReadBytes = 0;
- for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;)
+ while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
{
ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
SocketChannel ch = session.getChannel();
@@ -482,6 +478,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
scheduleRemove(session);
}
session.getFilterChain().fireExceptionCaught(session, e);
+
+ //Stop Reading this session.
+ return;
}
finally
{
@@ -507,12 +506,12 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
lastIdleReadCheckTime = currentTime;
Set keys = selector.keys();
- if( keys != null )
+ if (keys != null)
{
- for( Iterator it = keys.iterator(); it.hasNext(); )
+ for (Iterator it = keys.iterator(); it.hasNext();)
{
- SelectionKey key = ( SelectionKey ) it.next();
- SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
notifyReadIdleness(session, currentTime);
}
}
@@ -542,15 +541,15 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
{
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
- IdleStatus.BOTH_IDLE,
- Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
- IdleStatus.READER_IDLE,
- Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
+ IdleStatus.READER_IDLE,
+ Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
notifyWriteTimeout(session, currentTime, session
.getWriteTimeoutInMillis(), session.getLastWriteTime());
@@ -559,51 +558,51 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
{
notifyIdleness0(
- session, currentTime,
+ session, currentTime,
session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
IdleStatus.BOTH_IDLE,
Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
notifyIdleness0(
session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
- IdleStatus.WRITER_IDLE,
- Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+ session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+ IdleStatus.WRITER_IDLE,
+ Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
- notifyWriteTimeout( session, currentTime, session
- .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
}
- private void notifyIdleness0( SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status,
- long lastIoTime )
+ private void notifyIdleness0(SocketSessionImpl session, long currentTime,
+ long idleTime, IdleStatus status,
+ long lastIoTime)
{
- if( idleTime > 0 && lastIoTime != 0
- && ( currentTime - lastIoTime ) >= idleTime )
+ if (idleTime > 0 && lastIoTime != 0
+ && (currentTime - lastIoTime) >= idleTime)
{
- session.increaseIdleCount( status );
- session.getFilterChain().fireSessionIdle( session, status );
+ session.increaseIdleCount(status);
+ session.getFilterChain().fireSessionIdle(session, status);
}
}
- private void notifyWriteTimeout( SocketSessionImpl session,
- long currentTime,
- long writeTimeout, long lastIoTime )
+ private void notifyWriteTimeout(SocketSessionImpl session,
+ long currentTime,
+ long writeTimeout, long lastIoTime)
{
MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
SelectionKey key = sesh.getWriteSelectionKey();
- synchronized(writeLock)
- {
- if( writeTimeout > 0
- && ( currentTime - lastIoTime ) >= writeTimeout
- && key != null && key.isValid()
- && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
+ synchronized (writeLock)
{
- session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() );
+ if (writeTimeout > 0
+ && (currentTime - lastIoTime) >= writeTimeout
+ && key != null && key.isValid()
+ && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ {
+ session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
+ }
}
}
- }
private SocketSessionImpl getNextFlushingSession()
{
@@ -612,9 +611,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void releaseSession(SocketSessionImpl session)
{
- synchronized(session.getWriteRequestQueue())
+ synchronized (session.getWriteRequestQueue())
{
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
if (session.getScheduledWriteRequests() > 0)
{
@@ -642,7 +641,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
WriteRequest req;
//Should this be synchronized?
- synchronized(writeRequestQueue)
+ synchronized (writeRequestQueue)
{
while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
{
@@ -668,9 +667,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
{
- if( !session.isConnected() )
+ if (!session.isConnected())
{
- releaseWriteBuffers( session );
+ releaseWriteBuffers(session);
releaseSession(session);
continue;
}
@@ -678,14 +677,14 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
SelectionKey key = session.getWriteSelectionKey();
// Retry later if session is not yet fully initialized.
// (In case that Session.write() is called before addSession() is processed)
- if( key == null )
+ if (key == null)
{
- scheduleFlush( session );
+ scheduleFlush(session);
releaseSession(session);
continue;
}
// skip if channel is already closed
- if( !key.isValid() )
+ if (!key.isValid())
{
releaseSession(session);
continue;
@@ -698,11 +697,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
releaseSession(session);
}
}
- catch( IOException e )
+ catch (IOException e)
{
releaseSession(session);
- scheduleRemove( session );
- session.getFilterChain().fireExceptionCaught( session, e );
+ scheduleRemove(session);
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
@@ -714,32 +713,32 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
// Clear OP_WRITE
SelectionKey key = session.getWriteSelectionKey();
- synchronized(writeLock)
+ synchronized (writeLock)
{
- key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
}
SocketChannel ch = session.getChannel();
Queue writeRequestQueue = session.getWriteRequestQueue();
long totalFlushedBytes = 0;
- for( ; ; )
+ while (true)
{
WriteRequest req;
- synchronized( writeRequestQueue )
+ synchronized (writeRequestQueue)
{
- req = ( WriteRequest ) writeRequestQueue.first();
+ req = (WriteRequest) writeRequestQueue.first();
}
- if( req == null )
+ if (req == null)
{
break;
}
- ByteBuffer buf = ( ByteBuffer ) req.getMessage();
- if( buf.remaining() == 0 )
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0)
{
- synchronized( writeRequestQueue )
+ synchronized (writeRequestQueue)
{
writeRequestQueue.pop();
}
@@ -747,7 +746,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
session.increaseWrittenMessages();
buf.reset();
- session.getFilterChain().fireMessageSent( session, req );
+ session.getFilterChain().fireMessageSent(session, req);
continue;
}
@@ -755,23 +754,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
int writtenBytes = 0;
// Reported as DIRMINA-362
- //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
-// if (key.isWritable())
+ //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
+ if (key.isWritable())
{
- try
- {
- writtenBytes = ch.write(buf.buf());
- totalFlushedBytes += writtenBytes;
- }
- catch (IOException ioe)
- {
- throw ioe;
- }
+ writtenBytes = ch.write(buf.buf());
+ totalFlushedBytes += writtenBytes;
}
- if( writtenBytes > 0 )
+ if (writtenBytes > 0)
{
- session.increaseWrittenBytes( writtenBytes );
+ session.increaseWrittenBytes(writtenBytes);
}
if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
@@ -911,7 +903,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
if (writeSelector.keys().isEmpty())
{
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
@@ -963,7 +955,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
//System.out.println("ReadDebug:"+"Startup");
- for( ; ; )
+ for (; ;)
{
try
{
@@ -972,7 +964,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
doAddNewReader();
doUpdateTrafficMask();
- if( nKeys > 0 )
+ if (nKeys > 0)
{
//System.out.println("ReadDebug:"+nKeys + " keys from selector");
@@ -987,21 +979,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
doRemove();
notifyReadIdleness();
- if( selector.keys().isEmpty() )
+ if (selector.keys().isEmpty())
{
- synchronized(readLock)
+ synchronized (readLock)
{
- if( selector.keys().isEmpty() && newSessions.isEmpty() )
+ if (selector.keys().isEmpty() && newSessions.isEmpty())
{
readWorker = null;
try
{
selector.close();
}
- catch( IOException e )
+ catch (IOException e)
{
- ExceptionMonitor.getInstance().exceptionCaught( e );
+ ExceptionMonitor.getInstance().exceptionCaught(e);
}
finally
{
@@ -1013,17 +1005,17 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
}
- catch( Throwable t )
+ catch (Throwable t)
{
- ExceptionMonitor.getInstance().exceptionCaught( t );
+ ExceptionMonitor.getInstance().exceptionCaught(t);
try
{
- Thread.sleep( 1000 );
+ Thread.sleep(1000);
}
- catch( InterruptedException e1 )
+ catch (InterruptedException e1)
{
- ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}