/*************************************************
*
* = PACKAGE
* JACE.Concurrency
*
* = FILENAME
* Token.java
*
*@author Prashant Jain
*
*************************************************/
package JACE.Concurrency;
import java.util.*;
import JACE.ASX.*;
import JACE.OS.*;
/**
* Class that acquires, renews, and releases a synchronization
* token that is serviced in strict FIFO ordering.
*
* This is a general-purpose synchronization mechanism that offers
* several benefits. For example, it implements "recursive mutex"
* semantics, where a thread that owns the token can reacquire it
* without deadlocking. In addition, threads that are blocked
* awaiting the token are serviced in strict FIFO order as other
* threads release the token. The solution makes use of the
* Specific Notification pattern presented by Tom Cargill in
* "Specific Notification for Java Thread Synchronization," PLoP96.
*
*
* This class DOES support recursive semantics.
*/
public class Token extends LockAdapter
{
/**
* Acquire ownership of the lock, blocking indefinitely if necessary.
*
*@return AbstractLock.FAILURE or AbstractLock.SUCCESS
*@exception InterruptedException indicates another thread has
* interrupted this one during wait
*/
public int acquire () throws InterruptedException
{
try
{
return this.acquire (null);
}
catch (TimeoutException e)
{
// This really shouldn't happen since we are supposed to
// block.
return AbstractLock.FAILURE;
}
}
/**
* Acquire the token by the given absolute time time-out. The
* method uses synchronized blocks internally to avoid race conditions.
*@param timeout time to wait until before throwing a
* TimeoutException (unless the token is acquired before that).
* Performs a blocking acquire if the given timeout is null.
*@return AbstractLock.SUCCESS if acquires without calling
* AbstractLock.SLEEPHOOK if is called.
* AbstractLock.FAILURE if failure occurs
*@exception TimeoutException if time-out occurs
*@exception InterruptedException exception during wait
*/
public int acquire (TimeValue timeout) throws TimeoutException,
InterruptedException
{
int result = AbstractLock.SUCCESS;
WaitObject snl = new WaitObject ();
boolean mustWait;
synchronized (snl)
{
synchronized (this)
{
mustWait = !this.snq_.isEmpty ();
if (mustWait && isOwner ())
{
// I am the one who has the token. So just increment
// the nesting level
this.nestingLevel_++;
return AbstractLock.SUCCESS;
}
// Add local lock to the queue
this.snq_.addElement (snl);
}
if (mustWait)
{
result = AbstractLock.SLEEPHOOK;
sleepHook();
boolean exceptionOccured = true;
try {
snl.timedWait(timeout);
exceptionOccured = false;
} finally {
if (exceptionOccured) {
synchronized (this) {
if (!snq_.removeElement (snl)) {
setOwner ();
release ();
}
}
}
}
}
// Set the owner of the token
synchronized (this) {
setOwner();
}
}
return result;
}
/**
* Try to acquire the token. Implements a non-blocking acquire.
*
*@return AbstractLock.SUCCESS if acquires
* AbstractLock.FAILURE if failure occurs
*/
public synchronized int tryAcquire ()
{
int result = AbstractLock.SUCCESS;
if (this.snq_.isEmpty ())
{
// No one has the token, so acquire it
this.snq_.addElement (new WaitObject ());
setOwner();
}
else if (isOwner())
{
this.nestingLevel_++;
}
// Someone else has the token.
else
{
// Would have to block to acquire the token, so return
// failure.
result = AbstractLock.FAILURE;
}
return result;
}
/**
* An optimized method that efficiently reacquires the token if no
* other threads are waiting. This is useful for situations where
* you don't want to degrade the quality of service if there are
* other threads waiting to get the token. If the given TimeValue
* is null, it's the same as calling renew(int requeuePosition).
*@param requeuePosition Position in the queue where to insert the
* lock. If requeuePosition == -1 and there are other threads
* waiting to obtain the token we are queued at the end of the list
* of waiters. If requeuePosition > -1 then it indicates how many
* entries to skip over before inserting our thread into the list of
* waiters (e.g.,requeuePosition == 0 means "insert at front of the
* queue").
*@param timeout Throw a TimeoutException if the token isn't renewed
* before this absolute time timeout.
*@return AbstractLock.SUCCESS if renewed the lock
* AbstractLock.FAILURE if failure occurs
*@exception TimeoutException exception if timeout occurs
*@exception InterruptedException exception during wait
*/
public int renew (int requeuePosition, TimeValue timeout)
throws TimeoutException, InterruptedException
{
WaitObject snl = null;
int saveNestingLevel = 0;
synchronized (this)
{
if (!isOwner ())
return AbstractLock.FAILURE;
// Check if there is a thread waiting to acquire the token. If
// not or if requeuePosition == 0, then we don't do anything
// and we simply keep the token.
if (this.snq_.size () > 1 && requeuePosition != 0)
{
// Save the nesting level
saveNestingLevel = this.nestingLevel_;
this.nestingLevel_ = 0;
// Reinsert ourselves at requeuePosition in the queue
snl = (WaitObject) this.snq_.firstElement ();
this.snq_.removeElementAt (0);
if (requeuePosition < 0)
this.snq_.addElement (snl); // Insert at end
else
this.snq_.insertElementAt (snl, Math.min(requeuePosition,
this.snq_.size()));
synchronized (this.snq_.firstElement ())
{
// Notify the first waiting thread in the queue
WaitObject obj = (WaitObject) this.snq_.firstElement ();
// Set its condition to be true so that it falls out
// of the for loop
obj.condition (true);
// Now signal the thread
obj.signal ();
}
}
}
// Check if we reinserted the lock in the queue and therefore need
// to do a wait
if (snl != null)
{
synchronized (snl)
{
// Set the condition to be false so that we can begin the
// wait
snl.condition (false);
// Wait until the given absolute time (or until notified
// if the timeout is null)
boolean exceptionOccured = true;
try {
snl.timedWait (timeout);
exceptionOccured = false;
} finally {
if (exceptionOccured) {
synchronized (this) {
if (!snq_.removeElement (snl)) {
setOwner ();
release ();
}
}
}
}
}
synchronized (this) {
// Restore the nesting level and current owner of the lock
this.nestingLevel_ = saveNestingLevel;
// Set the owner of the token
setOwner();
}
}
return AbstractLock.SUCCESS;
}
/**
* Release the token. It is safe for non-owners to call
* this.
*@return AbstractLock.SUCCESS on success
* AbstractLock.FAILURE on failure (for instance, a non-owner
* calling release)
*/
public synchronized int release ()
{
if (!isOwner())
return AbstractLock.FAILURE;
// Check if nestingLevel > 0 and if so, decrement it
if (this.nestingLevel_ > 0)
this.nestingLevel_--;
else
{
clearOwner ();
this.snq_.removeElementAt (0);
if (!this.snq_.isEmpty ())
{
synchronized (this.snq_.firstElement ())
{
// Notify the first waiting thread in the queue
WaitObject obj = (WaitObject) this.snq_.firstElement ();
// Set its condition to be true so that it falls out
// of the for loop
obj.condition (true);
// Now signal the thread
obj.signal ();
}
}
}
return AbstractLock.SUCCESS;
}
private Vector snq_ = new Vector ();
// Vector of lock objects
private int nestingLevel_ = 0;
// Current Nesting Level
}