summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java111
1 files changed, 70 insertions, 41 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 15d1c20ff1..82b600de88 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,40 +55,28 @@ import org.slf4j.LoggerFactory;
*/
public class Job implements ReadWriteRunnable
{
+
+ /** Defines the maximum number of events that will be batched into a single job. */
+ public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
/** Holds the queue of events that make up the job. */
- private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+ private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>();
/** Holds a status flag, that indicates when the job is actively running. */
private final AtomicBoolean _active = new AtomicBoolean();
- /** Holds the completion continuation, called upon completion of a run of the job. */
- private final JobCompletionHandler _completionHandler;
-
private final boolean _readJob;
+ private ReferenceCountingExecutorService _poolReference;
+
private final static Logger _logger = LoggerFactory.getLogger(Job.class);
- /**
- * Creates a new job that aggregates many continuations together.
- *
- * @param session The Mina session.
- * @param completionHandler The per job run, terminal continuation.
- * @param maxEvents The maximum number of aggregated continuations to process per run of the job.
- * @param readJob
- */
- Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
- {
- _completionHandler = completionHandler;
- _maxEvents = maxEvents;
- _readJob = readJob;
- }
-
- public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob)
+ public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob)
{
- _completionHandler = completionHandler;
+ _poolReference = poolReference;
_maxEvents = maxEvents;
_readJob = readJob;
}
@@ -99,7 +86,7 @@ public class Job implements ReadWriteRunnable
*
* @param evt The continuation to enqueue.
*/
- public void add(Event evt)
+ public void add(Runnable evt)
{
_eventQueue.add(evt);
}
@@ -113,14 +100,14 @@ public class Job implements ReadWriteRunnable
int i = _maxEvents;
while( --i != 0 )
{
- Event e = _eventQueue.poll();
+ Runnable e = _eventQueue.poll();
if (e == null)
{
return true;
}
else
{
- e.process();
+ e.run();
}
}
return false;
@@ -162,11 +149,11 @@ public class Job implements ReadWriteRunnable
if(processAll())
{
deactivate();
- _completionHandler.completed(this);
+ completed();
}
else
{
- _completionHandler.notCompleted(this);
+ notCompleted();
}
}
@@ -174,19 +161,6 @@ public class Job implements ReadWriteRunnable
{
return _readJob;
}
-
- /**
- * Another interface for a continuation.
- *
- * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
- * Runnable or a custom Continuation interface.
- */
- static interface JobCompletionHandler
- {
- public void completed(Job job);
-
- public void notCompleted(final Job job);
- }
/**
* Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
@@ -194,7 +168,7 @@ public class Job implements ReadWriteRunnable
* @param job The job.
* @param event The event to hand off asynchronously.
*/
- public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event)
{
job.add(event);
@@ -221,4 +195,59 @@ public class Job implements ReadWriteRunnable
}
+ /**
+ * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+ * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
+ *
+ * @param session The Mina session to work in.
+ * @param job The job that completed.
+ */
+ public void completed()
+ {
+ if (!isComplete())
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+
+ // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
+ // Can the pool be shutdown at this point?
+ if (activate())
+ {
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
+ }
+ }
+ }
+
+ public void notCompleted()
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
}