diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java | 111 |
1 files changed, 70 insertions, 41 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 15d1c20ff1..82b600de88 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,40 +55,28 @@ import org.slf4j.LoggerFactory; */ public class Job implements ReadWriteRunnable { + + /** Defines the maximum number of events that will be batched into a single job. */ + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; /** Holds the queue of events that make up the job. */ - private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>(); /** Holds a status flag, that indicates when the job is actively running. */ private final AtomicBoolean _active = new AtomicBoolean(); - /** Holds the completion continuation, called upon completion of a run of the job. */ - private final JobCompletionHandler _completionHandler; - private final boolean _readJob; + private ReferenceCountingExecutorService _poolReference; + private final static Logger _logger = LoggerFactory.getLogger(Job.class); - /** - * Creates a new job that aggregates many continuations together. - * - * @param session The Mina session. - * @param completionHandler The per job run, terminal continuation. - * @param maxEvents The maximum number of aggregated continuations to process per run of the job. - * @param readJob - */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) - { - _completionHandler = completionHandler; - _maxEvents = maxEvents; - _readJob = readJob; - } - - public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob) { - _completionHandler = completionHandler; + _poolReference = poolReference; _maxEvents = maxEvents; _readJob = readJob; } @@ -99,7 +86,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - public void add(Event evt) + public void add(Runnable evt) { _eventQueue.add(evt); } @@ -113,14 +100,14 @@ public class Job implements ReadWriteRunnable int i = _maxEvents; while( --i != 0 ) { - Event e = _eventQueue.poll(); + Runnable e = _eventQueue.poll(); if (e == null) { return true; } else { - e.process(); + e.run(); } } return false; @@ -162,11 +149,11 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(this); + completed(); } else { - _completionHandler.notCompleted(this); + notCompleted(); } } @@ -174,19 +161,6 @@ public class Job implements ReadWriteRunnable { return _readJob; } - - /** - * Another interface for a continuation. - * - * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask, - * Runnable or a custom Continuation interface. - */ - static interface JobCompletionHandler - { - public void completed(Job job); - - public void notCompleted(final Job job); - } /** * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. @@ -194,7 +168,7 @@ public class Job implements ReadWriteRunnable * @param job The job. * @param event The event to hand off asynchronously. */ - public static void fireAsynchEvent(ExecutorService pool, Job job, Event event) + public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event) { job.add(event); @@ -221,4 +195,59 @@ public class Job implements ReadWriteRunnable } + /** + * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing + * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. + * + * @param session The Mina session to work in. + * @param job The job that completed. + */ + public void completed() + { + if (!isComplete()) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? + // Can the pool be shutdown at this point? + if (activate()) + { + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + } + } + + public void notCompleted() + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + } |