diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
| commit | 7e34266b9a23f4536415bfbc3f161b84615b6550 (patch) | |
| tree | 484008cf2d413f58b5e4ab80b373303c66200888 /RC9/qpid/dotnet/Qpid.Common/Collections | |
| parent | 4612263ea692f00a4bd810438bdaf9bc88022091 (diff) | |
| download | qpid-python-M4.tar.gz | |
Tag M4 RC9M4
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@734202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC9/qpid/dotnet/Qpid.Common/Collections')
5 files changed, 1294 insertions, 0 deletions
diff --git a/RC9/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs b/RC9/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs new file mode 100644 index 0000000000..dcfacf8474 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; + +namespace Apache.Qpid.Collections +{ + public abstract class BlockingQueue : Queue + { + /** + * Inserts the specified element into this queue if it is possible to do + * so immediately without violating capacity restrictions, returning + * <tt>true</tt> upon success and <tt>false</tt> if no space is currently + * available. When using a capacity-restricted queue, this method is + * generally preferable to {@link #add}, which can fail to insert an + * element only by throwing an exception. + * + * @param e the element to add + * @return <tt>true</tt> if the element was added to this queue, else + * <tt>false</tt> + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + public abstract bool EnqueueNoThrow(Object e); + + /** + * Inserts the specified element into this queue, waiting if necessary + * for space to become available. + * + * @param e the element to add + * @throws InterruptedException if interrupted while waiting + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + public abstract void EnqueueBlocking(object e); + + /** + * Retrieves and removes the head of this queue, waiting up to the + * specified wait time if necessary for an element to become available. + * + * @param timeout how long to wait before giving up, in units of + * <tt>unit</tt> + * @param unit a <tt>TimeUnit</tt> determining how to interpret the + * <tt>timeout</tt> parameter + * @return the head of this queue, or <tt>null</tt> if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + public abstract object DequeueBlocking(); + + /** + * Returns the number of additional elements that this queue can ideally + * (in the absence of memory or resource constraints) accept without + * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic + * limit. + * + * <p>Note that you <em>cannot</em> always tell if an attempt to insert + * an element will succeed by inspecting <tt>remainingCapacity</tt> + * because it may be the case that another thread is about to + * insert or remove an element. + * + * @return the remaining capacity + */ + public abstract int RemainingCapacity + { + get; + } + } +} + + diff --git a/RC9/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs b/RC9/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs new file mode 100644 index 0000000000..ea4526faaf --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs @@ -0,0 +1,113 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+
+
+namespace Apache.Qpid.Collections
+{
+ /// <summary>
+ /// Simple FIFO queue to support multi-threaded consumer
+ /// and producers. It supports timeouts in dequeue operations.
+ /// </summary>
+ public sealed class ConsumerProducerQueue
+ {
+ private Queue _queue = new Queue();
+ private WaitSemaphore _semaphore = new WaitSemaphore();
+
+ /// <summary>
+ /// Put an item into the tail of the queue
+ /// </summary>
+ /// <param name="item"></param>
+ public void Enqueue(object item)
+ {
+ lock ( _queue.SyncRoot )
+ {
+ _queue.Enqueue(item);
+ _semaphore.Increment();
+ }
+ }
+
+ /// <summary>
+ /// Wait indefinitely for an item to be available
+ /// on the queue.
+ /// </summary>
+ /// <returns>The object at the head of the queue</returns>
+ public object Dequeue()
+ {
+ return Dequeue(Timeout.Infinite);
+ }
+
+ /// <summary>
+ /// Wait up to the number of milliseconds specified
+ /// for an item to be available on the queue
+ /// </summary>
+ /// <param name="timeout">Number of milliseconds to wait</param>
+ /// <returns>The object at the head of the queue, or null
+ /// if the timeout expires</returns>
+ public object Dequeue(long timeout)
+ {
+ if ( _semaphore.Decrement(timeout) )
+ {
+ lock ( _queue.SyncRoot )
+ {
+ return _queue.Dequeue();
+ }
+ }
+ return null;
+ }
+
+ #region Simple Semaphore
+ //
+ // Simple Semaphore
+ //
+
+ class WaitSemaphore
+ {
+ private int _count;
+ private AutoResetEvent _event = new AutoResetEvent(false);
+
+ public void Increment()
+ {
+ Interlocked.Increment(ref _count);
+ _event.Set();
+ }
+
+ public bool Decrement(long timeout)
+ {
+ if ( timeout > int.MaxValue )
+ throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
+
+ int millis = (int) (timeout & 0x7FFFFFFF);
+ if ( Interlocked.Decrement(ref _count) > 0 )
+ {
+ // there are messages in queue, so no need to wait
+ return true;
+ } else
+ {
+ return _event.WaitOne(millis, false);
+ }
+ }
+ }
+ #endregion // Simple Semaphore
+ }
+}
diff --git a/RC9/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs b/RC9/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs new file mode 100644 index 0000000000..be92576951 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs @@ -0,0 +1,384 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; + +namespace Apache.Qpid.Collections +{ + public class LinkedBlockingQueue : BlockingQueue + { + + /* + * A variant of the "two lock queue" algorithm. The putLock gates + * entry to put (and offer), and has an associated condition for + * waiting puts. Similarly for the takeLock. The "count" field + * that they both rely on is maintained as an atomic to avoid + * needing to get both locks in most cases. Also, to minimize need + * for puts to get takeLock and vice-versa, cascading notifies are + * used. When a put notices that it has enabled at least one take, + * it signals taker. That taker in turn signals others if more + * items have been entered since the signal. And symmetrically for + * takes signalling puts. Operations such as remove(Object) and + * iterators acquire both locks. + */ + + /** + * Linked list node class + */ + internal class Node + { + /** The item, volatile to ensure barrier separating write and read */ + internal volatile Object item; + internal Node next; + internal Node(Object x) { item = x; } + } + + /** The capacity bound, or Integer.MAX_VALUE if none */ + private readonly int capacity; + + /** Current number of elements */ + private volatile int count = 0; + + /** Head of linked list */ + private Node head; + + /** Tail of linked list */ + private Node last; + + /** Lock held by take, poll, etc */ + private readonly object takeLock = new Object(); //new SerializableLock(); + + /** Lock held by put, offer, etc */ + private readonly object putLock = new Object();//new SerializableLock(); + + /** + * Signals a waiting take. Called only from put/offer (which do not + * otherwise ordinarily lock takeLock.) + */ + private void SignalNotEmpty() + { + lock (takeLock) + { + Monitor.Pulse(takeLock); + } + } + + /** + * Signals a waiting put. Called only from take/poll. + */ + private void SignalNotFull() + { + lock (putLock) + { + Monitor.Pulse(putLock); + } + } + + /** + * Creates a node and links it at end of queue. + * @param x the item + */ + private void Insert(Object x) + { + last = last.next = new Node(x); + } + + /** + * Removes a node from head of queue, + * @return the node + */ + private Object Extract() + { + Node first = head.next; + head = first; + Object x = first.item; + first.item = null; + return x; + } + + + /** + * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of + * {@link Integer#MAX_VALUE}. + */ + public LinkedBlockingQueue() : this(Int32.MaxValue) + { + } + + /** + * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. + * + * @param capacity the capacity of this queue + * @throws IllegalArgumentException if <tt>capacity</tt> is not greater + * than zero + */ + public LinkedBlockingQueue(int capacity) + { + if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity); + this.capacity = capacity; + last = head = new Node(null); + } + + // this doc comment is overridden to remove the reference to collections + // greater in size than Integer.MAX_VALUE + /** + * Returns the number of elements in this queue. + * + * @return the number of elements in this queue + */ + public int Size + { + get + { + return count; + } + } + + // this doc comment is a modified copy of the inherited doc comment, + // without the reference to unlimited queues. + /** + * Returns the number of additional elements that this queue can ideally + * (in the absence of memory or resource constraints) accept without + * blocking. This is always equal to the initial capacity of this queue + * less the current <tt>size</tt> of this queue. + * + * <p>Note that you <em>cannot</em> always tell if an attempt to insert + * an element will succeed by inspecting <tt>remainingCapacity</tt> + * because it may be the case that another thread is about to + * insert or remove an element. + */ + public override int RemainingCapacity + { + get + { + return capacity - count; + } + } + + /** + * Inserts the specified element at the tail of this queue, waiting if + * necessary for space to become available. + * + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public override void EnqueueBlocking(Object e) + { + if (e == null) throw new ArgumentNullException("Object must not be null"); + // Note: convention in all put/take/etc is to preset + // local var holding count negative to indicate failure unless set. + int c = -1; + lock (putLock) + { + /* + * Note that count is used in wait guard even though it is + * not protected by lock. This works because count can + * only decrease at this point (all other puts are shut + * out by lock), and we (or some other waiting put) are + * signalled if it ever changes from + * capacity. Similarly for all other uses of count in + * other wait guards. + */ + while (count == capacity) + { + Monitor.Wait(putLock); + } + + Insert(e); + lock(this) + { + c = count++; + } + if (c + 1 < capacity) + { + Monitor.Pulse(putLock); + } + } + + if (c == 0) + { + SignalNotEmpty(); + } + } + + /** + * Inserts the specified element at the tail of this queue if it is + * possible to do so immediately without exceeding the queue's capacity, + * returning <tt>true</tt> upon success and <tt>false</tt> if this queue + * is full. + * When using a capacity-restricted queue, this method is generally + * preferable to method {@link BlockingQueue#add add}, which can fail to + * insert an element only by throwing an exception. + * + * @throws NullPointerException if the specified element is null + */ + public override bool EnqueueNoThrow(Object e) + { + if (e == null) throw new ArgumentNullException("e must not be null"); + if (count == capacity) + { + return false; + } + int c = -1; + lock (putLock) + { + if (count < capacity) + { + Insert(e); + lock (this) + { + c = count++; + } + if (c + 1 < capacity) + { + Monitor.Pulse(putLock); + } + } + } + if (c == 0) + { + SignalNotEmpty(); + } + return c >= 0; + } + + /** + * Retrieves and removes the head of this queue, waiting if necessary + * until an element becomes available. + * + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting + */ + public override Object DequeueBlocking() + { + Object x; + int c = -1; + lock (takeLock) + { + + while (count == 0) + { + Monitor.Wait(takeLock); + } + + + x = Extract(); + lock (this) { c = count--; } + if (c > 1) + { + Monitor.Pulse(takeLock); + } + } + if (c == capacity) + { + SignalNotFull(); + } + return x; + } + + public Object Poll() + { + if (count == 0) + { + return null; + } + Object x = null; + int c = -1; + lock (takeLock) + { + if (count > 0) + { + x = Extract(); + lock (this) { c = count--; } + if (c > 1) + { + Monitor.Pulse(takeLock); + } + } + } + if (c == capacity) + { + SignalNotFull(); + } + return x; + } + + + public override Object Peek() + { + if (count == 0) + { + return null; + } + lock (takeLock) + { + Node first = head.next; + if (first == null) + { + return null; + } + else + { + return first.item; + } + } + } + + public override String ToString() + { + lock (putLock) + { + lock (takeLock) + { + return base.ToString(); + } + } + } + + /** + * Atomically removes all of the elements from this queue. + * The queue will be empty after this call returns. + */ + public override void Clear() + { + lock (putLock) + { + lock (takeLock) + { + head.next = null; + last = head; + int c; + lock (this) + { + c = count; + count = 0; + } + if (c == capacity) + { + Monitor.PulseAll(putLock); + } + } + } + } + } +} + + diff --git a/RC9/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs b/RC9/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs new file mode 100644 index 0000000000..10ab5c674d --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; + +namespace Apache.Qpid.Collections +{ + public class LinkedHashtable : IDictionary + { + /// <summary> + /// Maps from key to LinkedDictionaryEntry + /// </summary> + private Hashtable _indexedValues = new Hashtable(); + + private LinkedDictionaryEntry _head; + + private LinkedDictionaryEntry _tail; + + private class LinkedDictionaryEntry + { + public LinkedDictionaryEntry _previous; + public LinkedDictionaryEntry _next; + internal DictionaryEntry _entry; + + public LinkedDictionaryEntry(object key, object value) + { + _entry = new DictionaryEntry(key, value); + } + } + + public object this[object key] + { + get + { + LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key]; + if (entry == null) + { + return null; // key not found + } + else + { + return entry._entry.Value; + } + } + + set + { + LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key]; + if (entry == null) + { + Add(key, value); + } + else + { + entry._entry.Value = value; + } + } + } + + /// <summary> + /// Collect keys in linked order. + /// </summary> + public ICollection Keys + { + get + { + IList result = new ArrayList(); + foreach (DictionaryEntry entry in this) + { + result.Add(entry.Key); + } + return result; + } + } + + /// <summary> + /// Collect values in linked order. + /// </summary> + public ICollection Values + { + get + { + IList result = new ArrayList(); + foreach (DictionaryEntry entry in this) + { + result.Add(entry.Value); + } + return result; + } + } + + public bool IsReadOnly + { + get { return _indexedValues.IsReadOnly; } + } + + public bool IsFixedSize + { + get { return _indexedValues.IsFixedSize; } + } + + public bool Contains(object key) + { + return _indexedValues.Contains(key); + } + + public void Add(object key, object value) + { + if (key == null) throw new ArgumentNullException("key"); + + if (Contains(key)) + { + throw new ArgumentException("LinkedHashtable already contains key. key=" + key); + } + + LinkedDictionaryEntry de = new LinkedDictionaryEntry(key, value); + if (_head == null) + { + _head = de; + _tail = de; + } + else + { + _tail._next = de; + de._previous = _tail; + _tail = de; + } + _indexedValues[key] = de; + } + + public void Clear() + { + _indexedValues.Clear(); + } + + IDictionaryEnumerator IDictionary.GetEnumerator() + { + return new LHTEnumerator(this); + } + + public void Remove(object key) + { + if (key == null) throw new ArgumentNullException("key"); + + LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key]; + if (de == null) return; // key not found. + LinkedDictionaryEntry prev = de._previous; + if (prev == null) + { + _head = de._next; + } + else + { + prev._next = de._next; + } + + LinkedDictionaryEntry next = de._next; + if (next == null) + { + _tail = de; + } + else + { + next._previous = de._previous; + } + } + + private LinkedDictionaryEntry Head + { + get + { + return _head; + } + } + +// private LinkedDictionaryEntry Tail +// { +// get +// { +// return _tail; +// } +// } + + private class LHTEnumerator : IDictionaryEnumerator + { + private LinkedHashtable _container; + + private LinkedDictionaryEntry _current; + + /// <summary> + /// Set once we have navigated off the end of the collection + /// </summary> + private bool _needsReset = false; + + public LHTEnumerator(LinkedHashtable container) + { + _container = container; + } + + public object Current + { + get + { + if (_current == null) + { + throw new Exception("Iterator before first element"); + } + else + { + return _current._entry; + } + } + } + + public object Key + { + get { return _current._entry.Key; } + } + + public object Value + { + get { return _current._entry.Value; } + } + + public DictionaryEntry Entry + { + get + { + return _current._entry; + } + } + + public bool MoveNext() + { + if (_needsReset) + { + return false; + } + else if (_current == null) + { + _current = _container.Head; + } + else + { + _current = _current._next; + } + _needsReset = (_current == null); + return !_needsReset; + } + + public void Reset() + { + _current = null; + _needsReset = false; + } + } + + public void MoveToHead(object key) + { + LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key]; + if (de == null) + { + throw new ArgumentException("Key " + key + " not found"); + } + // if the head is the element then there is nothing to do + if (_head == de) + { + return; + } + de._previous._next = de._next; + if (de._next != null) + { + de._next._previous = de._previous; + } + else + { + _tail = de._previous; + } + de._next = _head; + _head = de; + de._previous = null; + } + + public void CopyTo(Array array, int index) + { + _indexedValues.CopyTo(array, index); + } + + public int Count + { + get { return _indexedValues.Count; } + } + + public object SyncRoot + { + get { return _indexedValues.SyncRoot; } + } + + public bool IsSynchronized + { + get { return _indexedValues.IsSynchronized; } + } + + public IEnumerator GetEnumerator() + { + return new LHTEnumerator(this); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs b/RC9/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs new file mode 100644 index 0000000000..3c12df6067 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs @@ -0,0 +1,375 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; + +namespace Apache.Qpid.Collections +{ + public class SynchronousQueue : BlockingQueue + { + /// <summary> + /// Lock protecting both wait queues + /// </summary> +// private readonly object _qlock = new object(); + + /// <summary> + /// Queue holding waiting puts + /// </summary> +// private readonly WaitQueue _waitingProducers; + + /// <summary> + /// Queue holding waiting takes + /// </summary> +// private readonly WaitQueue _waitingConsumers; + + /** + * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below. + * These queues have all transient fields, but are serializable + * in order to recover fairness settings when deserialized. + */ + internal abstract class WaitQueue + { + /** Creates, adds, and returns node for x. */ + internal abstract Node Enq(Object x); + /** Removes and returns node, or null if empty. */ + internal abstract Node Deq(); + /** Removes a cancelled node to avoid garbage retention. */ + internal abstract void Unlink(Node node); + /** Returns true if a cancelled node might be on queue. */ + internal abstract bool ShouldUnlink(Node node); + } + + /** + * FIFO queue to hold waiting puts/takes. + */ + sealed class FifoWaitQueue : WaitQueue + { + private Node head; + private Node last; + + internal override Node Enq(Object x) + { + Node p = new Node(x); + if (last == null) + { + last = head = p; + } + else + { + last = last.next = p; + } + return p; + } + + internal override Node Deq() + { + Node p = head; + if (p != null) + { + if ((head = p.next) == null) + { + last = null; + } + p.next = null; + } + return p; + } + + internal override bool ShouldUnlink(Node node) + { + return (node == last || node.next != null); + } + + internal override void Unlink(Node node) + { + Node p = head; + Node trail = null; + while (p != null) + { + if (p == node) + { + Node next = p.next; + if (trail == null) + { + head = next; + } + else + { + trail.next = next; + } + if (last == node) + { + last = trail; + } + break; + } + trail = p; + p = p.next; + } + } + } + + /** + * LIFO queue to hold waiting puts/takes. + */ + sealed class LifoWaitQueue : WaitQueue + { + private Node head; + + internal override Node Enq(Object x) + { + return head = new Node(x, head); + } + + internal override Node Deq() + { + Node p = head; + if (p != null) + { + head = p.next; + p.next = null; + } + return p; + } + + internal override bool ShouldUnlink(Node node) + { + // Return false if already dequeued or is bottom node (in which + // case we might retain at most one garbage node) + return (node == head || node.next != null); + } + + internal override void Unlink(Node node) + { + Node p = head; + Node trail = null; + while (p != null) + { + if (p == node) + { + Node next = p.next; + if (trail == null) + head = next; + else + trail.next = next; + break; + } + trail = p; + p = p.next; + } + } + } + + /** + * Nodes each maintain an item and handle waits and signals for + * getting and setting it. The class extends + * AbstractQueuedSynchronizer to manage blocking, using AQS state + * 0 for waiting, 1 for ack, -1 for cancelled. + */ + sealed internal class Node + { + + /** Synchronization state value representing that node acked */ + private const int ACK = 1; + /** Synchronization state value representing that node cancelled */ + private const int CANCEL = -1; + + internal int state = 0; + + /** The item being transferred */ + internal Object item; + /** Next node in wait queue */ + internal Node next; + + /** Creates a node with initial item */ + internal Node(Object x) + { + item = x; + } + + /** Creates a node with initial item and next */ + internal Node(Object x, Node n) + { + item = x; + next = n; + } + + /** + * Takes item and nulls out field (for sake of GC) + * + * PRE: lock owned + */ + private Object Extract() + { + Object x = item; + item = null; + return x; + } + + /** + * Tries to cancel on interrupt; if so rethrowing, + * else setting interrupt state + * + * PRE: lock owned + */ + /*private void checkCancellationOnInterrupt(InterruptedException ie) + throws InterruptedException + { + if (state == 0) { + state = CANCEL; + notify(); + throw ie; + } + Thread.currentThread().interrupt(); + }*/ + + /** + * Fills in the slot created by the consumer and signal consumer to + * continue. + */ + internal bool SetItem(Object x) + { + lock (this) + { + if (state != 0) return false; + item = x; + state = ACK; + Monitor.Pulse(this); + return true; + } + } + + /** + * Removes item from slot created by producer and signal producer + * to continue. + */ + internal Object GetItem() + { + if (state != 0) return null; + state = ACK; + Monitor.Pulse(this); + return Extract(); + } + + /** + * Waits for a consumer to take item placed by producer. + */ + internal void WaitForTake() //throws InterruptedException { + { + while (state == 0) + { + Monitor.Wait(this); + } + } + + /** + * Waits for a producer to put item placed by consumer. + */ + internal object WaitForPut() + { + lock (this) + { + while (state == 0) Monitor.Wait(this); + } + return Extract(); + } + + private bool Attempt(long nanos) + { + if (state != 0) return true; + if (nanos <= 0) { + state = CANCEL; + Monitor.Pulse(this); + return false; + } + + while (true) + { + Monitor.Wait(nanos); + //TimeUnit.NANOSECONDS.timedWait(this, nanos); + if (state != 0) + { + return true; + } + //nanos = deadline - Utils.nanoTime(); + //if (nanos <= 0) + else + { + state = CANCEL; + Monitor.Pulse(this); + return false; + } + } + } + + /** + * Waits for a consumer to take item placed by producer or time out. + */ + internal bool WaitForTake(long nanos) + { + return Attempt(nanos); + } + + /** + * Waits for a producer to put item placed by consumer, or time out. + */ + internal object WaitForPut(long nanos) + { + if (!Attempt(nanos)) + { + return null; + } + else + { + return Extract(); + } + } + } + + public SynchronousQueue(bool strict) + { + // TODO !!!! + } + + public override bool EnqueueNoThrow(object e) + { + throw new NotImplementedException(); + } + + public override void EnqueueBlocking(object e) + { + throw new NotImplementedException(); + } + + public override object DequeueBlocking() + { + throw new NotImplementedException(); + } + + public override int RemainingCapacity + { + get + { + throw new NotImplementedException(); + } + } + } +} |
