/*************************************************
 *
 * = PACKAGE
 *    JACE.Concurrency
 *
 * = FILENAME
 *    Token.java
 *
 *@author Prashant Jain
 *
 *************************************************/
package JACE.Concurrency;

import java.util.*;
import JACE.ASX.*;

/**
 * Wait object that carries a single boolean condition flag.
 * Token stores one of these per queue entry, sets the flag through
 * <code>condition (boolean)</code> and then signals the object.
 * NOTE(review): <code>condition ()</code> presumably implements the
 * predicate that TimedWait consults while waiting -- TimedWait is not
 * visible in this file, confirm there.
 */
class WaitObject extends TimedWait
{
  // Flag read back by condition (); starts out false.
  private boolean condition_ = false;

  /**
   * Report the current value of the condition flag.
   *@return the value last stored with <code>condition (boolean)</code>,
   * or false if none has been stored yet
   */
  public boolean condition ()
  {
    return condition_;
  }

  /**
   * Store a new value for the condition flag.
   *@param c new value of the flag
   */
  public void condition (boolean c)
  {
    condition_ = c;
  }
}

/**
 *
*

SYNOPSIS

*
* Class that acquires, renews, and releases a synchronization * token that is serviced in strict FIFO ordering. * *
* *

DESCRIPTION

*
* This is a general-purpose synchronization mechanism that offers * several benefits. For example, it implements "recursive mutex" * semantics, where a thread that owns the token can reacquire it * without deadlocking. In addition, threads that are blocked * awaiting the token are serviced in strict FIFO order as other * threads release the token. The solution makes use of the * Specific Notification pattern presented by Tom Cargill in * "Specific Notification for Java Thread Synchronization," PLoP96. *
*/ public class Token { /** * Acquire the token. Note that this will block. The method uses * synchronized blocks internally to avoid race conditions. It * ignores thread interrupts. *@return 0 if acquires without calling * 1 if is called. * -1 if failure occurs (should never happen) */ public int acquire () { try { return this.acquire (null); } catch (TimeoutException e) { // This really shouldn't happen since we are supposed to // block. return -1; } } /** * Acquire the token. Returns failure * Throws a TimeoutException if the token isn't acquired before the * given absolute time timeout. *@param timeout time (TimeValue) to wait until before throwing a * TimeoutException (unless the token is acquired before that). * Performs a blocking acquire if the given timeout is null. *@return 0 if acquires without calling * 1 if is called. * -1 if failure occurs (timeout) */ public int acquire (TimeValue timeout) throws TimeoutException { int result = 0; WaitObject snl = new WaitObject (); boolean mustWait; synchronized (snl) { synchronized (this) { mustWait = !this.snq_.isEmpty (); if (mustWait && isOwner ()) { // I am the one who has the token. So just increment // the nesting level this.nestingLevel_++; return 0; } // Add local lock to the queue this.snq_.addElement (snl); } if (mustWait) { result = 1; sleepHook(); while (mustWait) { try { snl.timedWait(timeout); mustWait = false; } catch (InterruptedException e) { // must keep waiting } } } // Set the owner of the token setOwner(); } return result; } /** * Try to acquire the token. Implements a non-blocking acquire. *@return 0 if acquires without calling * -1 if failure occurs */ public synchronized int tryAcquire () { int result = 0; if (this.snq_.isEmpty ()) { // No one has the token, so acquire it this.snq_.addElement (new WaitObject ()); setOwner(); } else if (isOwner()) { this.nestingLevel_++; } // Someone else has the token. else { // Would have to block to acquire the token, so return // failure. 
result = -1; } return result; } /** * Method that is called before a thread goes to sleep in an * acquire(). This should be overridden by a subclass to define * the appropriate behavior before acquire() goes to sleep. * By default, this is a no-op. */ public void sleepHook () { } /** * An optimized method that efficiently reacquires the token if no * other threads are waiting. This is useful for situations where * you don't want to degrade the quality of service if there are * other threads waiting to get the token. This blocks until it * can regain the token. *@param requeuePosition Position in the queue where to insert the * lock. If requeuePosition == -1 and there are other threads * waiting to obtain the token we are queued at the end of the list * of waiters. If requeuePosition > -1 then it indicates how many * entries to skip over before inserting our thread into the list of * waiters (e.g.,requeuePosition == 0 means "insert at front of the * queue"). */ public void renew (int requeuePosition) { try { this.renew (requeuePosition, null); } catch (TimeoutException e) { // This really shouldn't happen since we are supposed to // block. } } /** * An optimized method that efficiently reacquires the token if no * other threads are waiting. This is useful for situations where * you don't want to degrade the quality of service if there are * other threads waiting to get the token. If the given TimeValue * is null, it's the same as calling renew(int requeuePosition). *@param requeuePosition Position in the queue where to insert the * lock. If requeuePosition == -1 and there are other threads * waiting to obtain the token we are queued at the end of the list * of waiters. If requeuePosition > -1 then it indicates how many * entries to skip over before inserting our thread into the list of * waiters (e.g.,requeuePosition == 0 means "insert at front of the * queue"). *@param timeout Throw a TimeoutException if the token isn't renewed * before this absolute time timeout. 
*@exception TimeoutException exception if timeout occurs */ public void renew (int requeuePosition, TimeValue timeout) throws TimeoutException { WaitObject snl = null; int saveNestingLevel = 0; synchronized (this) { // Check if there is a thread waiting to acquire the token. If // not or if requeuePosition == 0, then we don't do anything // and we simply keep the token. if (this.snq_.size () > 1 && requeuePosition != 0) { // Save the nesting level saveNestingLevel = this.nestingLevel_; this.nestingLevel_ = 0; // Reinsert ourselves at requeuePosition in the queue snl = (WaitObject) this.snq_.firstElement (); this.snq_.removeElementAt (0); if (requeuePosition < 0) this.snq_.addElement (snl); // Insert at end else this.snq_.insertElementAt (snl, Math.min(requeuePosition, this.snq_.size())); synchronized (this.snq_.firstElement ()) { // Notify the first waiting thread in the queue WaitObject obj = (WaitObject) this.snq_.firstElement (); // Set its condition to be true so that it falls out // of the for loop obj.condition (true); // Now signal the thread obj.signal (); } } } // Check if we reinserted the lock in the queue and therefore need // to do a wait if (snl != null) { synchronized (snl) { // Set the condition to be false so that we can begin the // wait snl.condition (false); // Wait until the given absolute time (or until notified // if the timeout is null) boolean mustWait = true; while (mustWait) { try { snl.timedWait (timeout); mustWait = false; } catch (InterruptedException e) { // must keep waiting } } } // Restore the nesting level and current owner of the lock this.nestingLevel_ = saveNestingLevel; // Set the owner of the token setOwner(); } } /** * Release the token. It is safe for non-owners to call * this. 
*/ public synchronized void release () { if (!isOwner()) return; // Check if nestingLevel > 0 and if so, decrement it if (this.nestingLevel_ > 0) this.nestingLevel_--; else { this.snq_.removeElementAt (0); if (!this.snq_.isEmpty ()) { synchronized (this.snq_.firstElement ()) { // Notify the first waiting thread in the queue WaitObject obj = (WaitObject) this.snq_.firstElement (); // Set its condition to be true so that it falls out // of the for loop obj.condition (true); // Now signal the thread obj.signal (); } } } } // The next two methods allow subclasses to change the behavior of the // checking and setting the Object owner_ member variable. The default // is to use the current Thread's toString() as the Object. protected void setOwner() { this.owner_ = Thread.currentThread().toString(); } protected boolean isOwner() { return Thread.currentThread().toString().equals(this.owner_); } private Vector snq_ = new Vector (); // Vector of lock objects private int nestingLevel_ = 0; // Current Nesting Level private Object owner_ = null; // Current owner of the token. The setOwner() and isOwner() // methods provide subclasses with the ability to change the // behavior. The default is to use the Thread.toString(). }