diff options
Diffstat (limited to 'dotnet/Qpid.Common/Collections')
| -rw-r--r-- | dotnet/Qpid.Common/Collections/BlockingQueue.cs | 95 | ||||
| -rw-r--r-- | dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs | 113 | ||||
| -rw-r--r-- | dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs | 384 | ||||
| -rw-r--r-- | dotnet/Qpid.Common/Collections/LinkedHashtable.cs | 327 | ||||
| -rw-r--r-- | dotnet/Qpid.Common/Collections/SynchronousQueue.cs | 375 |
5 files changed, 0 insertions, 1294 deletions
diff --git a/dotnet/Qpid.Common/Collections/BlockingQueue.cs b/dotnet/Qpid.Common/Collections/BlockingQueue.cs deleted file mode 100644 index dcfacf8474..0000000000 --- a/dotnet/Qpid.Common/Collections/BlockingQueue.cs +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; - -namespace Apache.Qpid.Collections -{ - public abstract class BlockingQueue : Queue - { - /** - * Inserts the specified element into this queue if it is possible to do - * so immediately without violating capacity restrictions, returning - * <tt>true</tt> upon success and <tt>false</tt> if no space is currently - * available. When using a capacity-restricted queue, this method is - * generally preferable to {@link #add}, which can fail to insert an - * element only by throwing an exception. - * - * @param e the element to add - * @return <tt>true</tt> if the element was added to this queue, else - * <tt>false</tt> - * @throws ClassCastException if the class of the specified element - * prevents it from being added to this queue - * @throws NullPointerException if the specified element is null - * @throws IllegalArgumentException if some property of the specified - * element prevents it from being added to this queue - */ - public abstract bool EnqueueNoThrow(Object e); - - /** - * Inserts the specified element into this queue, waiting if necessary - * for space to become available. - * - * @param e the element to add - * @throws InterruptedException if interrupted while waiting - * @throws ClassCastException if the class of the specified element - * prevents it from being added to this queue - * @throws NullPointerException if the specified element is null - * @throws IllegalArgumentException if some property of the specified - * element prevents it from being added to this queue - */ - public abstract void EnqueueBlocking(object e); - - /** - * Retrieves and removes the head of this queue, waiting up to the - * specified wait time if necessary for an element to become available. - * - * @param timeout how long to wait before giving up, in units of - * <tt>unit</tt> - * @param unit a <tt>TimeUnit</tt> determining how to interpret the - * <tt>timeout</tt> parameter - * @return the head of this queue, or <tt>null</tt> if the - * specified waiting time elapses before an element is available - * @throws InterruptedException if interrupted while waiting - */ - public abstract object DequeueBlocking(); - - /** - * Returns the number of additional elements that this queue can ideally - * (in the absence of memory or resource constraints) accept without - * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic - * limit. - * - * <p>Note that you <em>cannot</em> always tell if an attempt to insert - * an element will succeed by inspecting <tt>remainingCapacity</tt> - * because it may be the case that another thread is about to - * insert or remove an element. - * - * @return the remaining capacity - */ - public abstract int RemainingCapacity - { - get; - } - } -} - - diff --git a/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs b/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs deleted file mode 100644 index ea4526faaf..0000000000 --- a/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs +++ /dev/null @@ -1,113 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Threading;
-
-
-namespace Apache.Qpid.Collections
-{
- /// <summary>
- /// Simple FIFO queue to support multi-threaded consumer
- /// and producers. It supports timeouts in dequeue operations.
- /// </summary>
- public sealed class ConsumerProducerQueue
- {
- private Queue _queue = new Queue();
- private WaitSemaphore _semaphore = new WaitSemaphore();
-
- /// <summary>
- /// Put an item into the tail of the queue
- /// </summary>
- /// <param name="item"></param>
- public void Enqueue(object item)
- {
- lock ( _queue.SyncRoot )
- {
- _queue.Enqueue(item);
- _semaphore.Increment();
- }
- }
-
- /// <summary>
- /// Wait indefinitely for an item to be available
- /// on the queue.
- /// </summary>
- /// <returns>The object at the head of the queue</returns>
- public object Dequeue()
- {
- return Dequeue(Timeout.Infinite);
- }
-
- /// <summary>
- /// Wait up to the number of milliseconds specified
- /// for an item to be available on the queue
- /// </summary>
- /// <param name="timeout">Number of milliseconds to wait</param>
- /// <returns>The object at the head of the queue, or null
- /// if the timeout expires</returns>
- public object Dequeue(long timeout)
- {
- if ( _semaphore.Decrement(timeout) )
- {
- lock ( _queue.SyncRoot )
- {
- return _queue.Dequeue();
- }
- }
- return null;
- }
-
- #region Simple Semaphore
- //
- // Simple Semaphore
- //
-
- class WaitSemaphore
- {
- private int _count;
- private AutoResetEvent _event = new AutoResetEvent(false);
-
- public void Increment()
- {
- Interlocked.Increment(ref _count);
- _event.Set();
- }
-
- public bool Decrement(long timeout)
- {
- if ( timeout > int.MaxValue )
- throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
-
- int millis = (int) (timeout & 0x7FFFFFFF);
- if ( Interlocked.Decrement(ref _count) > 0 )
- {
- // there are messages in queue, so no need to wait
- return true;
- } else
- {
- return _event.WaitOne(millis, false);
- }
- }
- }
- #endregion // Simple Semaphore
- }
-}
diff --git a/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs b/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs deleted file mode 100644 index be92576951..0000000000 --- a/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs +++ /dev/null @@ -1,384 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Threading; - -namespace Apache.Qpid.Collections -{ - public class LinkedBlockingQueue : BlockingQueue - { - - /* - * A variant of the "two lock queue" algorithm. The putLock gates - * entry to put (and offer), and has an associated condition for - * waiting puts. Similarly for the takeLock. The "count" field - * that they both rely on is maintained as an atomic to avoid - * needing to get both locks in most cases. Also, to minimize need - * for puts to get takeLock and vice-versa, cascading notifies are - * used. When a put notices that it has enabled at least one take, - * it signals taker. That taker in turn signals others if more - * items have been entered since the signal. And symmetrically for - * takes signalling puts. Operations such as remove(Object) and - * iterators acquire both locks. - */ - - /** - * Linked list node class - */ - internal class Node - { - /** The item, volatile to ensure barrier separating write and read */ - internal volatile Object item; - internal Node next; - internal Node(Object x) { item = x; } - } - - /** The capacity bound, or Integer.MAX_VALUE if none */ - private readonly int capacity; - - /** Current number of elements */ - private volatile int count = 0; - - /** Head of linked list */ - private Node head; - - /** Tail of linked list */ - private Node last; - - /** Lock held by take, poll, etc */ - private readonly object takeLock = new Object(); //new SerializableLock(); - - /** Lock held by put, offer, etc */ - private readonly object putLock = new Object();//new SerializableLock(); - - /** - * Signals a waiting take. Called only from put/offer (which do not - * otherwise ordinarily lock takeLock.) - */ - private void SignalNotEmpty() - { - lock (takeLock) - { - Monitor.Pulse(takeLock); - } - } - - /** - * Signals a waiting put. Called only from take/poll. - */ - private void SignalNotFull() - { - lock (putLock) - { - Monitor.Pulse(putLock); - } - } - - /** - * Creates a node and links it at end of queue. - * @param x the item - */ - private void Insert(Object x) - { - last = last.next = new Node(x); - } - - /** - * Removes a node from head of queue, - * @return the node - */ - private Object Extract() - { - Node first = head.next; - head = first; - Object x = first.item; - first.item = null; - return x; - } - - - /** - * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of - * {@link Integer#MAX_VALUE}. - */ - public LinkedBlockingQueue() : this(Int32.MaxValue) - { - } - - /** - * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. - * - * @param capacity the capacity of this queue - * @throws IllegalArgumentException if <tt>capacity</tt> is not greater - * than zero - */ - public LinkedBlockingQueue(int capacity) - { - if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity); - this.capacity = capacity; - last = head = new Node(null); - } - - // this doc comment is overridden to remove the reference to collections - // greater in size than Integer.MAX_VALUE - /** - * Returns the number of elements in this queue. - * - * @return the number of elements in this queue - */ - public int Size - { - get - { - return count; - } - } - - // this doc comment is a modified copy of the inherited doc comment, - // without the reference to unlimited queues. - /** - * Returns the number of additional elements that this queue can ideally - * (in the absence of memory or resource constraints) accept without - * blocking. This is always equal to the initial capacity of this queue - * less the current <tt>size</tt> of this queue. - * - * <p>Note that you <em>cannot</em> always tell if an attempt to insert - * an element will succeed by inspecting <tt>remainingCapacity</tt> - * because it may be the case that another thread is about to - * insert or remove an element. - */ - public override int RemainingCapacity - { - get - { - return capacity - count; - } - } - - /** - * Inserts the specified element at the tail of this queue, waiting if - * necessary for space to become available. - * - * @throws InterruptedException {@inheritDoc} - * @throws NullPointerException {@inheritDoc} - */ - public override void EnqueueBlocking(Object e) - { - if (e == null) throw new ArgumentNullException("Object must not be null"); - // Note: convention in all put/take/etc is to preset - // local var holding count negative to indicate failure unless set. - int c = -1; - lock (putLock) - { - /* - * Note that count is used in wait guard even though it is - * not protected by lock. This works because count can - * only decrease at this point (all other puts are shut - * out by lock), and we (or some other waiting put) are - * signalled if it ever changes from - * capacity. Similarly for all other uses of count in - * other wait guards. - */ - while (count == capacity) - { - Monitor.Wait(putLock); - } - - Insert(e); - lock(this) - { - c = count++; - } - if (c + 1 < capacity) - { - Monitor.Pulse(putLock); - } - } - - if (c == 0) - { - SignalNotEmpty(); - } - } - - /** - * Inserts the specified element at the tail of this queue if it is - * possible to do so immediately without exceeding the queue's capacity, - * returning <tt>true</tt> upon success and <tt>false</tt> if this queue - * is full. - * When using a capacity-restricted queue, this method is generally - * preferable to method {@link BlockingQueue#add add}, which can fail to - * insert an element only by throwing an exception. - * - * @throws NullPointerException if the specified element is null - */ - public override bool EnqueueNoThrow(Object e) - { - if (e == null) throw new ArgumentNullException("e must not be null"); - if (count == capacity) - { - return false; - } - int c = -1; - lock (putLock) - { - if (count < capacity) - { - Insert(e); - lock (this) - { - c = count++; - } - if (c + 1 < capacity) - { - Monitor.Pulse(putLock); - } - } - } - if (c == 0) - { - SignalNotEmpty(); - } - return c >= 0; - } - - /** - * Retrieves and removes the head of this queue, waiting if necessary - * until an element becomes available. - * - * @return the head of this queue - * @throws InterruptedException if interrupted while waiting - */ - public override Object DequeueBlocking() - { - Object x; - int c = -1; - lock (takeLock) - { - - while (count == 0) - { - Monitor.Wait(takeLock); - } - - - x = Extract(); - lock (this) { c = count--; } - if (c > 1) - { - Monitor.Pulse(takeLock); - } - } - if (c == capacity) - { - SignalNotFull(); - } - return x; - } - - public Object Poll() - { - if (count == 0) - { - return null; - } - Object x = null; - int c = -1; - lock (takeLock) - { - if (count > 0) - { - x = Extract(); - lock (this) { c = count--; } - if (c > 1) - { - Monitor.Pulse(takeLock); - } - } - } - if (c == capacity) - { - SignalNotFull(); - } - return x; - } - - - public override Object Peek() - { - if (count == 0) - { - return null; - } - lock (takeLock) - { - Node first = head.next; - if (first == null) - { - return null; - } - else - { - return first.item; - } - } - } - - public override String ToString() - { - lock (putLock) - { - lock (takeLock) - { - return base.ToString(); - } - } - } - - /** - * Atomically removes all of the elements from this queue. - * The queue will be empty after this call returns. - */ - public override void Clear() - { - lock (putLock) - { - lock (takeLock) - { - head.next = null; - last = head; - int c; - lock (this) - { - c = count; - count = 0; - } - if (c == capacity) - { - Monitor.PulseAll(putLock); - } - } - } - } - } -} - - diff --git a/dotnet/Qpid.Common/Collections/LinkedHashtable.cs b/dotnet/Qpid.Common/Collections/LinkedHashtable.cs deleted file mode 100644 index 10ab5c674d..0000000000 --- a/dotnet/Qpid.Common/Collections/LinkedHashtable.cs +++ /dev/null @@ -1,327 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; - -namespace Apache.Qpid.Collections -{ - public class LinkedHashtable : IDictionary - { - /// <summary> - /// Maps from key to LinkedDictionaryEntry - /// </summary> - private Hashtable _indexedValues = new Hashtable(); - - private LinkedDictionaryEntry _head; - - private LinkedDictionaryEntry _tail; - - private class LinkedDictionaryEntry - { - public LinkedDictionaryEntry _previous; - public LinkedDictionaryEntry _next; - internal DictionaryEntry _entry; - - public LinkedDictionaryEntry(object key, object value) - { - _entry = new DictionaryEntry(key, value); - } - } - - public object this[object key] - { - get - { - LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key]; - if (entry == null) - { - return null; // key not found - } - else - { - return entry._entry.Value; - } - } - - set - { - LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key]; - if (entry == null) - { - Add(key, value); - } - else - { - entry._entry.Value = value; - } - } - } - - /// <summary> - /// Collect keys in linked order. - /// </summary> - public ICollection Keys - { - get - { - IList result = new ArrayList(); - foreach (DictionaryEntry entry in this) - { - result.Add(entry.Key); - } - return result; - } - } - - /// <summary> - /// Collect values in linked order. - /// </summary> - public ICollection Values - { - get - { - IList result = new ArrayList(); - foreach (DictionaryEntry entry in this) - { - result.Add(entry.Value); - } - return result; - } - } - - public bool IsReadOnly - { - get { return _indexedValues.IsReadOnly; } - } - - public bool IsFixedSize - { - get { return _indexedValues.IsFixedSize; } - } - - public bool Contains(object key) - { - return _indexedValues.Contains(key); - } - - public void Add(object key, object value) - { - if (key == null) throw new ArgumentNullException("key"); - - if (Contains(key)) - { - throw new ArgumentException("LinkedHashtable already contains key. key=" + key); - } - - LinkedDictionaryEntry de = new LinkedDictionaryEntry(key, value); - if (_head == null) - { - _head = de; - _tail = de; - } - else - { - _tail._next = de; - de._previous = _tail; - _tail = de; - } - _indexedValues[key] = de; - } - - public void Clear() - { - _indexedValues.Clear(); - } - - IDictionaryEnumerator IDictionary.GetEnumerator() - { - return new LHTEnumerator(this); - } - - public void Remove(object key) - { - if (key == null) throw new ArgumentNullException("key"); - - LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key]; - if (de == null) return; // key not found. - LinkedDictionaryEntry prev = de._previous; - if (prev == null) - { - _head = de._next; - } - else - { - prev._next = de._next; - } - - LinkedDictionaryEntry next = de._next; - if (next == null) - { - _tail = de; - } - else - { - next._previous = de._previous; - } - } - - private LinkedDictionaryEntry Head - { - get - { - return _head; - } - } - -// private LinkedDictionaryEntry Tail -// { -// get -// { -// return _tail; -// } -// } - - private class LHTEnumerator : IDictionaryEnumerator - { - private LinkedHashtable _container; - - private LinkedDictionaryEntry _current; - - /// <summary> - /// Set once we have navigated off the end of the collection - /// </summary> - private bool _needsReset = false; - - public LHTEnumerator(LinkedHashtable container) - { - _container = container; - } - - public object Current - { - get - { - if (_current == null) - { - throw new Exception("Iterator before first element"); - } - else - { - return _current._entry; - } - } - } - - public object Key - { - get { return _current._entry.Key; } - } - - public object Value - { - get { return _current._entry.Value; } - } - - public DictionaryEntry Entry - { - get - { - return _current._entry; - } - } - - public bool MoveNext() - { - if (_needsReset) - { - return false; - } - else if (_current == null) - { - _current = _container.Head; - } - else - { - _current = _current._next; - } - _needsReset = (_current == null); - return !_needsReset; - } - - public void Reset() - { - _current = null; - _needsReset = false; - } - } - - public void MoveToHead(object key) - { - LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key]; - if (de == null) - { - throw new ArgumentException("Key " + key + " not found"); - } - // if the head is the element then there is nothing to do - if (_head == de) - { - return; - } - de._previous._next = de._next; - if (de._next != null) - { - de._next._previous = de._previous; - } - else - { - _tail = de._previous; - } - de._next = _head; - _head = de; - de._previous = null; - } - - public void CopyTo(Array array, int index) - { - _indexedValues.CopyTo(array, index); - } - - public int Count - { - get { return _indexedValues.Count; } - } - - public object SyncRoot - { - get { return _indexedValues.SyncRoot; } - } - - public bool IsSynchronized - { - get { return _indexedValues.IsSynchronized; } - } - - public IEnumerator GetEnumerator() - { - return new LHTEnumerator(this); - } - } -} diff --git a/dotnet/Qpid.Common/Collections/SynchronousQueue.cs b/dotnet/Qpid.Common/Collections/SynchronousQueue.cs deleted file mode 100644 index 3c12df6067..0000000000 --- a/dotnet/Qpid.Common/Collections/SynchronousQueue.cs +++ /dev/null @@ -1,375 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Threading; - -namespace Apache.Qpid.Collections -{ - public class SynchronousQueue : BlockingQueue - { - /// <summary> - /// Lock protecting both wait queues - /// </summary> -// private readonly object _qlock = new object(); - - /// <summary> - /// Queue holding waiting puts - /// </summary> -// private readonly WaitQueue _waitingProducers; - - /// <summary> - /// Queue holding waiting takes - /// </summary> -// private readonly WaitQueue _waitingConsumers; - - /** - * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below. - * These queues have all transient fields, but are serializable - * in order to recover fairness settings when deserialized. - */ - internal abstract class WaitQueue - { - /** Creates, adds, and returns node for x. */ - internal abstract Node Enq(Object x); - /** Removes and returns node, or null if empty. */ - internal abstract Node Deq(); - /** Removes a cancelled node to avoid garbage retention. */ - internal abstract void Unlink(Node node); - /** Returns true if a cancelled node might be on queue. */ - internal abstract bool ShouldUnlink(Node node); - } - - /** - * FIFO queue to hold waiting puts/takes. - */ - sealed class FifoWaitQueue : WaitQueue - { - private Node head; - private Node last; - - internal override Node Enq(Object x) - { - Node p = new Node(x); - if (last == null) - { - last = head = p; - } - else - { - last = last.next = p; - } - return p; - } - - internal override Node Deq() - { - Node p = head; - if (p != null) - { - if ((head = p.next) == null) - { - last = null; - } - p.next = null; - } - return p; - } - - internal override bool ShouldUnlink(Node node) - { - return (node == last || node.next != null); - } - - internal override void Unlink(Node node) - { - Node p = head; - Node trail = null; - while (p != null) - { - if (p == node) - { - Node next = p.next; - if (trail == null) - { - head = next; - } - else - { - trail.next = next; - } - if (last == node) - { - last = trail; - } - break; - } - trail = p; - p = p.next; - } - } - } - - /** - * LIFO queue to hold waiting puts/takes. - */ - sealed class LifoWaitQueue : WaitQueue - { - private Node head; - - internal override Node Enq(Object x) - { - return head = new Node(x, head); - } - - internal override Node Deq() - { - Node p = head; - if (p != null) - { - head = p.next; - p.next = null; - } - return p; - } - - internal override bool ShouldUnlink(Node node) - { - // Return false if already dequeued or is bottom node (in which - // case we might retain at most one garbage node) - return (node == head || node.next != null); - } - - internal override void Unlink(Node node) - { - Node p = head; - Node trail = null; - while (p != null) - { - if (p == node) - { - Node next = p.next; - if (trail == null) - head = next; - else - trail.next = next; - break; - } - trail = p; - p = p.next; - } - } - } - - /** - * Nodes each maintain an item and handle waits and signals for - * getting and setting it. The class extends - * AbstractQueuedSynchronizer to manage blocking, using AQS state - * 0 for waiting, 1 for ack, -1 for cancelled. - */ - sealed internal class Node - { - - /** Synchronization state value representing that node acked */ - private const int ACK = 1; - /** Synchronization state value representing that node cancelled */ - private const int CANCEL = -1; - - internal int state = 0; - - /** The item being transferred */ - internal Object item; - /** Next node in wait queue */ - internal Node next; - - /** Creates a node with initial item */ - internal Node(Object x) - { - item = x; - } - - /** Creates a node with initial item and next */ - internal Node(Object x, Node n) - { - item = x; - next = n; - } - - /** - * Takes item and nulls out field (for sake of GC) - * - * PRE: lock owned - */ - private Object Extract() - { - Object x = item; - item = null; - return x; - } - - /** - * Tries to cancel on interrupt; if so rethrowing, - * else setting interrupt state - * - * PRE: lock owned - */ - /*private void checkCancellationOnInterrupt(InterruptedException ie) - throws InterruptedException - { - if (state == 0) { - state = CANCEL; - notify(); - throw ie; - } - Thread.currentThread().interrupt(); - }*/ - - /** - * Fills in the slot created by the consumer and signal consumer to - * continue. - */ - internal bool SetItem(Object x) - { - lock (this) - { - if (state != 0) return false; - item = x; - state = ACK; - Monitor.Pulse(this); - return true; - } - } - - /** - * Removes item from slot created by producer and signal producer - * to continue. - */ - internal Object GetItem() - { - if (state != 0) return null; - state = ACK; - Monitor.Pulse(this); - return Extract(); - } - - /** - * Waits for a consumer to take item placed by producer. - */ - internal void WaitForTake() //throws InterruptedException { - { - while (state == 0) - { - Monitor.Wait(this); - } - } - - /** - * Waits for a producer to put item placed by consumer. - */ - internal object WaitForPut() - { - lock (this) - { - while (state == 0) Monitor.Wait(this); - } - return Extract(); - } - - private bool Attempt(long nanos) - { - if (state != 0) return true; - if (nanos <= 0) { - state = CANCEL; - Monitor.Pulse(this); - return false; - } - - while (true) - { - Monitor.Wait(nanos); - //TimeUnit.NANOSECONDS.timedWait(this, nanos); - if (state != 0) - { - return true; - } - //nanos = deadline - Utils.nanoTime(); - //if (nanos <= 0) - else - { - state = CANCEL; - Monitor.Pulse(this); - return false; - } - } - } - - /** - * Waits for a consumer to take item placed by producer or time out. - */ - internal bool WaitForTake(long nanos) - { - return Attempt(nanos); - } - - /** - * Waits for a producer to put item placed by consumer, or time out. - */ - internal object WaitForPut(long nanos) - { - if (!Attempt(nanos)) - { - return null; - } - else - { - return Extract(); - } - } - } - - public SynchronousQueue(bool strict) - { - // TODO !!!! - } - - public override bool EnqueueNoThrow(object e) - { - throw new NotImplementedException(); - } - - public override void EnqueueBlocking(object e) - { - throw new NotImplementedException(); - } - - public override object DequeueBlocking() - { - throw new NotImplementedException(); - } - - public override int RemainingCapacity - { - get - { - throw new NotImplementedException(); - } - } - } -} |
