summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Common/Collections
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Common/Collections')
-rw-r--r--dotnet/Qpid.Common/Collections/BlockingQueue.cs95
-rw-r--r--dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs113
-rw-r--r--dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs384
-rw-r--r--dotnet/Qpid.Common/Collections/LinkedHashtable.cs327
-rw-r--r--dotnet/Qpid.Common/Collections/SynchronousQueue.cs375
5 files changed, 0 insertions, 1294 deletions
diff --git a/dotnet/Qpid.Common/Collections/BlockingQueue.cs b/dotnet/Qpid.Common/Collections/BlockingQueue.cs
deleted file mode 100644
index dcfacf8474..0000000000
--- a/dotnet/Qpid.Common/Collections/BlockingQueue.cs
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-
-namespace Apache.Qpid.Collections
-{
- public abstract class BlockingQueue : Queue
- {
- /**
- * Inserts the specified element into this queue if it is possible to do
- * so immediately without violating capacity restrictions, returning
- * <tt>true</tt> upon success and <tt>false</tt> if no space is currently
- * available. When using a capacity-restricted queue, this method is
- * generally preferable to {@link #add}, which can fail to insert an
- * element only by throwing an exception.
- *
- * @param e the element to add
- * @return <tt>true</tt> if the element was added to this queue, else
- * <tt>false</tt>
- * @throws ClassCastException if the class of the specified element
- * prevents it from being added to this queue
- * @throws NullPointerException if the specified element is null
- * @throws IllegalArgumentException if some property of the specified
- * element prevents it from being added to this queue
- */
- public abstract bool EnqueueNoThrow(Object e);
-
- /**
- * Inserts the specified element into this queue, waiting if necessary
- * for space to become available.
- *
- * @param e the element to add
- * @throws InterruptedException if interrupted while waiting
- * @throws ClassCastException if the class of the specified element
- * prevents it from being added to this queue
- * @throws NullPointerException if the specified element is null
- * @throws IllegalArgumentException if some property of the specified
- * element prevents it from being added to this queue
- */
- public abstract void EnqueueBlocking(object e);
-
- /**
- * Retrieves and removes the head of this queue, waiting up to the
- * specified wait time if necessary for an element to become available.
- *
- * @param timeout how long to wait before giving up, in units of
- * <tt>unit</tt>
- * @param unit a <tt>TimeUnit</tt> determining how to interpret the
- * <tt>timeout</tt> parameter
- * @return the head of this queue, or <tt>null</tt> if the
- * specified waiting time elapses before an element is available
- * @throws InterruptedException if interrupted while waiting
- */
- public abstract object DequeueBlocking();
-
- /**
- * Returns the number of additional elements that this queue can ideally
- * (in the absence of memory or resource constraints) accept without
- * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic
- * limit.
- *
- * <p>Note that you <em>cannot</em> always tell if an attempt to insert
- * an element will succeed by inspecting <tt>remainingCapacity</tt>
- * because it may be the case that another thread is about to
- * insert or remove an element.
- *
- * @return the remaining capacity
- */
- public abstract int RemainingCapacity
- {
- get;
- }
- }
-}
-
-
diff --git a/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs b/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
deleted file mode 100644
index ea4526faaf..0000000000
--- a/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Threading;
-
-
-namespace Apache.Qpid.Collections
-{
- /// <summary>
- /// Simple FIFO queue to support multi-threaded consumer
- /// and producers. It supports timeouts in dequeue operations.
- /// </summary>
- public sealed class ConsumerProducerQueue
- {
- private Queue _queue = new Queue();
- private WaitSemaphore _semaphore = new WaitSemaphore();
-
- /// <summary>
- /// Put an item into the tail of the queue
- /// </summary>
- /// <param name="item"></param>
- public void Enqueue(object item)
- {
- lock ( _queue.SyncRoot )
- {
- _queue.Enqueue(item);
- _semaphore.Increment();
- }
- }
-
- /// <summary>
- /// Wait indefinitely for an item to be available
- /// on the queue.
- /// </summary>
- /// <returns>The object at the head of the queue</returns>
- public object Dequeue()
- {
- return Dequeue(Timeout.Infinite);
- }
-
- /// <summary>
- /// Wait up to the number of milliseconds specified
- /// for an item to be available on the queue
- /// </summary>
- /// <param name="timeout">Number of milliseconds to wait</param>
- /// <returns>The object at the head of the queue, or null
- /// if the timeout expires</returns>
- public object Dequeue(long timeout)
- {
- if ( _semaphore.Decrement(timeout) )
- {
- lock ( _queue.SyncRoot )
- {
- return _queue.Dequeue();
- }
- }
- return null;
- }
-
- #region Simple Semaphore
- //
- // Simple Semaphore
- //
-
- class WaitSemaphore
- {
- private int _count;
- private AutoResetEvent _event = new AutoResetEvent(false);
-
- public void Increment()
- {
- Interlocked.Increment(ref _count);
- _event.Set();
- }
-
- public bool Decrement(long timeout)
- {
- if ( timeout > int.MaxValue )
- throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
-
- int millis = (int) (timeout & 0x7FFFFFFF);
- if ( Interlocked.Decrement(ref _count) > 0 )
- {
- // there are messages in queue, so no need to wait
- return true;
- } else
- {
- return _event.WaitOne(millis, false);
- }
- }
- }
- #endregion // Simple Semaphore
- }
-}
diff --git a/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs b/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
deleted file mode 100644
index be92576951..0000000000
--- a/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-
-namespace Apache.Qpid.Collections
-{
- public class LinkedBlockingQueue : BlockingQueue
- {
-
- /*
- * A variant of the "two lock queue" algorithm. The putLock gates
- * entry to put (and offer), and has an associated condition for
- * waiting puts. Similarly for the takeLock. The "count" field
- * that they both rely on is maintained as an atomic to avoid
- * needing to get both locks in most cases. Also, to minimize need
- * for puts to get takeLock and vice-versa, cascading notifies are
- * used. When a put notices that it has enabled at least one take,
- * it signals taker. That taker in turn signals others if more
- * items have been entered since the signal. And symmetrically for
- * takes signalling puts. Operations such as remove(Object) and
- * iterators acquire both locks.
- */
-
- /**
- * Linked list node class
- */
- internal class Node
- {
- /** The item, volatile to ensure barrier separating write and read */
- internal volatile Object item;
- internal Node next;
- internal Node(Object x) { item = x; }
- }
-
- /** The capacity bound, or Integer.MAX_VALUE if none */
- private readonly int capacity;
-
- /** Current number of elements */
- private volatile int count = 0;
-
- /** Head of linked list */
- private Node head;
-
- /** Tail of linked list */
- private Node last;
-
- /** Lock held by take, poll, etc */
- private readonly object takeLock = new Object(); //new SerializableLock();
-
- /** Lock held by put, offer, etc */
- private readonly object putLock = new Object();//new SerializableLock();
-
- /**
- * Signals a waiting take. Called only from put/offer (which do not
- * otherwise ordinarily lock takeLock.)
- */
- private void SignalNotEmpty()
- {
- lock (takeLock)
- {
- Monitor.Pulse(takeLock);
- }
- }
-
- /**
- * Signals a waiting put. Called only from take/poll.
- */
- private void SignalNotFull()
- {
- lock (putLock)
- {
- Monitor.Pulse(putLock);
- }
- }
-
- /**
- * Creates a node and links it at end of queue.
- * @param x the item
- */
- private void Insert(Object x)
- {
- last = last.next = new Node(x);
- }
-
- /**
- * Removes a node from head of queue,
- * @return the node
- */
- private Object Extract()
- {
- Node first = head.next;
- head = first;
- Object x = first.item;
- first.item = null;
- return x;
- }
-
-
- /**
- * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
- * {@link Integer#MAX_VALUE}.
- */
- public LinkedBlockingQueue() : this(Int32.MaxValue)
- {
- }
-
- /**
- * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
- *
- * @param capacity the capacity of this queue
- * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
- * than zero
- */
- public LinkedBlockingQueue(int capacity)
- {
- if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity);
- this.capacity = capacity;
- last = head = new Node(null);
- }
-
- // this doc comment is overridden to remove the reference to collections
- // greater in size than Integer.MAX_VALUE
- /**
- * Returns the number of elements in this queue.
- *
- * @return the number of elements in this queue
- */
- public int Size
- {
- get
- {
- return count;
- }
- }
-
- // this doc comment is a modified copy of the inherited doc comment,
- // without the reference to unlimited queues.
- /**
- * Returns the number of additional elements that this queue can ideally
- * (in the absence of memory or resource constraints) accept without
- * blocking. This is always equal to the initial capacity of this queue
- * less the current <tt>size</tt> of this queue.
- *
- * <p>Note that you <em>cannot</em> always tell if an attempt to insert
- * an element will succeed by inspecting <tt>remainingCapacity</tt>
- * because it may be the case that another thread is about to
- * insert or remove an element.
- */
- public override int RemainingCapacity
- {
- get
- {
- return capacity - count;
- }
- }
-
- /**
- * Inserts the specified element at the tail of this queue, waiting if
- * necessary for space to become available.
- *
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public override void EnqueueBlocking(Object e)
- {
- if (e == null) throw new ArgumentNullException("Object must not be null");
- // Note: convention in all put/take/etc is to preset
- // local var holding count negative to indicate failure unless set.
- int c = -1;
- lock (putLock)
- {
- /*
- * Note that count is used in wait guard even though it is
- * not protected by lock. This works because count can
- * only decrease at this point (all other puts are shut
- * out by lock), and we (or some other waiting put) are
- * signalled if it ever changes from
- * capacity. Similarly for all other uses of count in
- * other wait guards.
- */
- while (count == capacity)
- {
- Monitor.Wait(putLock);
- }
-
- Insert(e);
- lock(this)
- {
- c = count++;
- }
- if (c + 1 < capacity)
- {
- Monitor.Pulse(putLock);
- }
- }
-
- if (c == 0)
- {
- SignalNotEmpty();
- }
- }
-
- /**
- * Inserts the specified element at the tail of this queue if it is
- * possible to do so immediately without exceeding the queue's capacity,
- * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
- * is full.
- * When using a capacity-restricted queue, this method is generally
- * preferable to method {@link BlockingQueue#add add}, which can fail to
- * insert an element only by throwing an exception.
- *
- * @throws NullPointerException if the specified element is null
- */
- public override bool EnqueueNoThrow(Object e)
- {
- if (e == null) throw new ArgumentNullException("e must not be null");
- if (count == capacity)
- {
- return false;
- }
- int c = -1;
- lock (putLock)
- {
- if (count < capacity)
- {
- Insert(e);
- lock (this)
- {
- c = count++;
- }
- if (c + 1 < capacity)
- {
- Monitor.Pulse(putLock);
- }
- }
- }
- if (c == 0)
- {
- SignalNotEmpty();
- }
- return c >= 0;
- }
-
- /**
- * Retrieves and removes the head of this queue, waiting if necessary
- * until an element becomes available.
- *
- * @return the head of this queue
- * @throws InterruptedException if interrupted while waiting
- */
- public override Object DequeueBlocking()
- {
- Object x;
- int c = -1;
- lock (takeLock)
- {
-
- while (count == 0)
- {
- Monitor.Wait(takeLock);
- }
-
-
- x = Extract();
- lock (this) { c = count--; }
- if (c > 1)
- {
- Monitor.Pulse(takeLock);
- }
- }
- if (c == capacity)
- {
- SignalNotFull();
- }
- return x;
- }
-
- public Object Poll()
- {
- if (count == 0)
- {
- return null;
- }
- Object x = null;
- int c = -1;
- lock (takeLock)
- {
- if (count > 0)
- {
- x = Extract();
- lock (this) { c = count--; }
- if (c > 1)
- {
- Monitor.Pulse(takeLock);
- }
- }
- }
- if (c == capacity)
- {
- SignalNotFull();
- }
- return x;
- }
-
-
- public override Object Peek()
- {
- if (count == 0)
- {
- return null;
- }
- lock (takeLock)
- {
- Node first = head.next;
- if (first == null)
- {
- return null;
- }
- else
- {
- return first.item;
- }
- }
- }
-
- public override String ToString()
- {
- lock (putLock)
- {
- lock (takeLock)
- {
- return base.ToString();
- }
- }
- }
-
- /**
- * Atomically removes all of the elements from this queue.
- * The queue will be empty after this call returns.
- */
- public override void Clear()
- {
- lock (putLock)
- {
- lock (takeLock)
- {
- head.next = null;
- last = head;
- int c;
- lock (this)
- {
- c = count;
- count = 0;
- }
- if (c == capacity)
- {
- Monitor.PulseAll(putLock);
- }
- }
- }
- }
- }
-}
-
-
diff --git a/dotnet/Qpid.Common/Collections/LinkedHashtable.cs b/dotnet/Qpid.Common/Collections/LinkedHashtable.cs
deleted file mode 100644
index 10ab5c674d..0000000000
--- a/dotnet/Qpid.Common/Collections/LinkedHashtable.cs
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-
-namespace Apache.Qpid.Collections
-{
- public class LinkedHashtable : IDictionary
- {
- /// <summary>
- /// Maps from key to LinkedDictionaryEntry
- /// </summary>
- private Hashtable _indexedValues = new Hashtable();
-
- private LinkedDictionaryEntry _head;
-
- private LinkedDictionaryEntry _tail;
-
- private class LinkedDictionaryEntry
- {
- public LinkedDictionaryEntry _previous;
- public LinkedDictionaryEntry _next;
- internal DictionaryEntry _entry;
-
- public LinkedDictionaryEntry(object key, object value)
- {
- _entry = new DictionaryEntry(key, value);
- }
- }
-
- public object this[object key]
- {
- get
- {
- LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key];
- if (entry == null)
- {
- return null; // key not found
- }
- else
- {
- return entry._entry.Value;
- }
- }
-
- set
- {
- LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key];
- if (entry == null)
- {
- Add(key, value);
- }
- else
- {
- entry._entry.Value = value;
- }
- }
- }
-
- /// <summary>
- /// Collect keys in linked order.
- /// </summary>
- public ICollection Keys
- {
- get
- {
- IList result = new ArrayList();
- foreach (DictionaryEntry entry in this)
- {
- result.Add(entry.Key);
- }
- return result;
- }
- }
-
- /// <summary>
- /// Collect values in linked order.
- /// </summary>
- public ICollection Values
- {
- get
- {
- IList result = new ArrayList();
- foreach (DictionaryEntry entry in this)
- {
- result.Add(entry.Value);
- }
- return result;
- }
- }
-
- public bool IsReadOnly
- {
- get { return _indexedValues.IsReadOnly; }
- }
-
- public bool IsFixedSize
- {
- get { return _indexedValues.IsFixedSize; }
- }
-
- public bool Contains(object key)
- {
- return _indexedValues.Contains(key);
- }
-
- public void Add(object key, object value)
- {
- if (key == null) throw new ArgumentNullException("key");
-
- if (Contains(key))
- {
- throw new ArgumentException("LinkedHashtable already contains key. key=" + key);
- }
-
- LinkedDictionaryEntry de = new LinkedDictionaryEntry(key, value);
- if (_head == null)
- {
- _head = de;
- _tail = de;
- }
- else
- {
- _tail._next = de;
- de._previous = _tail;
- _tail = de;
- }
- _indexedValues[key] = de;
- }
-
- public void Clear()
- {
- _indexedValues.Clear();
- }
-
- IDictionaryEnumerator IDictionary.GetEnumerator()
- {
- return new LHTEnumerator(this);
- }
-
- public void Remove(object key)
- {
- if (key == null) throw new ArgumentNullException("key");
-
- LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key];
- if (de == null) return; // key not found.
- LinkedDictionaryEntry prev = de._previous;
- if (prev == null)
- {
- _head = de._next;
- }
- else
- {
- prev._next = de._next;
- }
-
- LinkedDictionaryEntry next = de._next;
- if (next == null)
- {
- _tail = de;
- }
- else
- {
- next._previous = de._previous;
- }
- }
-
- private LinkedDictionaryEntry Head
- {
- get
- {
- return _head;
- }
- }
-
-// private LinkedDictionaryEntry Tail
-// {
-// get
-// {
-// return _tail;
-// }
-// }
-
- private class LHTEnumerator : IDictionaryEnumerator
- {
- private LinkedHashtable _container;
-
- private LinkedDictionaryEntry _current;
-
- /// <summary>
- /// Set once we have navigated off the end of the collection
- /// </summary>
- private bool _needsReset = false;
-
- public LHTEnumerator(LinkedHashtable container)
- {
- _container = container;
- }
-
- public object Current
- {
- get
- {
- if (_current == null)
- {
- throw new Exception("Iterator before first element");
- }
- else
- {
- return _current._entry;
- }
- }
- }
-
- public object Key
- {
- get { return _current._entry.Key; }
- }
-
- public object Value
- {
- get { return _current._entry.Value; }
- }
-
- public DictionaryEntry Entry
- {
- get
- {
- return _current._entry;
- }
- }
-
- public bool MoveNext()
- {
- if (_needsReset)
- {
- return false;
- }
- else if (_current == null)
- {
- _current = _container.Head;
- }
- else
- {
- _current = _current._next;
- }
- _needsReset = (_current == null);
- return !_needsReset;
- }
-
- public void Reset()
- {
- _current = null;
- _needsReset = false;
- }
- }
-
- public void MoveToHead(object key)
- {
- LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key];
- if (de == null)
- {
- throw new ArgumentException("Key " + key + " not found");
- }
- // if the head is the element then there is nothing to do
- if (_head == de)
- {
- return;
- }
- de._previous._next = de._next;
- if (de._next != null)
- {
- de._next._previous = de._previous;
- }
- else
- {
- _tail = de._previous;
- }
- de._next = _head;
- _head = de;
- de._previous = null;
- }
-
- public void CopyTo(Array array, int index)
- {
- _indexedValues.CopyTo(array, index);
- }
-
- public int Count
- {
- get { return _indexedValues.Count; }
- }
-
- public object SyncRoot
- {
- get { return _indexedValues.SyncRoot; }
- }
-
- public bool IsSynchronized
- {
- get { return _indexedValues.IsSynchronized; }
- }
-
- public IEnumerator GetEnumerator()
- {
- return new LHTEnumerator(this);
- }
- }
-}
diff --git a/dotnet/Qpid.Common/Collections/SynchronousQueue.cs b/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
deleted file mode 100644
index 3c12df6067..0000000000
--- a/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-
-namespace Apache.Qpid.Collections
-{
- public class SynchronousQueue : BlockingQueue
- {
- /// <summary>
- /// Lock protecting both wait queues
- /// </summary>
-// private readonly object _qlock = new object();
-
- /// <summary>
- /// Queue holding waiting puts
- /// </summary>
-// private readonly WaitQueue _waitingProducers;
-
- /// <summary>
- /// Queue holding waiting takes
- /// </summary>
-// private readonly WaitQueue _waitingConsumers;
-
- /**
- * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
- * These queues have all transient fields, but are serializable
- * in order to recover fairness settings when deserialized.
- */
- internal abstract class WaitQueue
- {
- /** Creates, adds, and returns node for x. */
- internal abstract Node Enq(Object x);
- /** Removes and returns node, or null if empty. */
- internal abstract Node Deq();
- /** Removes a cancelled node to avoid garbage retention. */
- internal abstract void Unlink(Node node);
- /** Returns true if a cancelled node might be on queue. */
- internal abstract bool ShouldUnlink(Node node);
- }
-
- /**
- * FIFO queue to hold waiting puts/takes.
- */
- sealed class FifoWaitQueue : WaitQueue
- {
- private Node head;
- private Node last;
-
- internal override Node Enq(Object x)
- {
- Node p = new Node(x);
- if (last == null)
- {
- last = head = p;
- }
- else
- {
- last = last.next = p;
- }
- return p;
- }
-
- internal override Node Deq()
- {
- Node p = head;
- if (p != null)
- {
- if ((head = p.next) == null)
- {
- last = null;
- }
- p.next = null;
- }
- return p;
- }
-
- internal override bool ShouldUnlink(Node node)
- {
- return (node == last || node.next != null);
- }
-
- internal override void Unlink(Node node)
- {
- Node p = head;
- Node trail = null;
- while (p != null)
- {
- if (p == node)
- {
- Node next = p.next;
- if (trail == null)
- {
- head = next;
- }
- else
- {
- trail.next = next;
- }
- if (last == node)
- {
- last = trail;
- }
- break;
- }
- trail = p;
- p = p.next;
- }
- }
- }
-
- /**
- * LIFO queue to hold waiting puts/takes.
- */
- sealed class LifoWaitQueue : WaitQueue
- {
- private Node head;
-
- internal override Node Enq(Object x)
- {
- return head = new Node(x, head);
- }
-
- internal override Node Deq()
- {
- Node p = head;
- if (p != null)
- {
- head = p.next;
- p.next = null;
- }
- return p;
- }
-
- internal override bool ShouldUnlink(Node node)
- {
- // Return false if already dequeued or is bottom node (in which
- // case we might retain at most one garbage node)
- return (node == head || node.next != null);
- }
-
- internal override void Unlink(Node node)
- {
- Node p = head;
- Node trail = null;
- while (p != null)
- {
- if (p == node)
- {
- Node next = p.next;
- if (trail == null)
- head = next;
- else
- trail.next = next;
- break;
- }
- trail = p;
- p = p.next;
- }
- }
- }
-
- /**
- * Nodes each maintain an item and handle waits and signals for
- * getting and setting it. The class extends
- * AbstractQueuedSynchronizer to manage blocking, using AQS state
- * 0 for waiting, 1 for ack, -1 for cancelled.
- */
- sealed internal class Node
- {
-
- /** Synchronization state value representing that node acked */
- private const int ACK = 1;
- /** Synchronization state value representing that node cancelled */
- private const int CANCEL = -1;
-
- internal int state = 0;
-
- /** The item being transferred */
- internal Object item;
- /** Next node in wait queue */
- internal Node next;
-
- /** Creates a node with initial item */
- internal Node(Object x)
- {
- item = x;
- }
-
- /** Creates a node with initial item and next */
- internal Node(Object x, Node n)
- {
- item = x;
- next = n;
- }
-
- /**
- * Takes item and nulls out field (for sake of GC)
- *
- * PRE: lock owned
- */
- private Object Extract()
- {
- Object x = item;
- item = null;
- return x;
- }
-
- /**
- * Tries to cancel on interrupt; if so rethrowing,
- * else setting interrupt state
- *
- * PRE: lock owned
- */
- /*private void checkCancellationOnInterrupt(InterruptedException ie)
- throws InterruptedException
- {
- if (state == 0) {
- state = CANCEL;
- notify();
- throw ie;
- }
- Thread.currentThread().interrupt();
- }*/
-
- /**
- * Fills in the slot created by the consumer and signal consumer to
- * continue.
- */
- internal bool SetItem(Object x)
- {
- lock (this)
- {
- if (state != 0) return false;
- item = x;
- state = ACK;
- Monitor.Pulse(this);
- return true;
- }
- }
-
- /**
- * Removes item from slot created by producer and signal producer
- * to continue.
- */
- internal Object GetItem()
- {
- if (state != 0) return null;
- state = ACK;
- Monitor.Pulse(this);
- return Extract();
- }
-
- /**
- * Waits for a consumer to take item placed by producer.
- */
- internal void WaitForTake() //throws InterruptedException {
- {
- while (state == 0)
- {
- Monitor.Wait(this);
- }
- }
-
- /**
- * Waits for a producer to put item placed by consumer.
- */
- internal object WaitForPut()
- {
- lock (this)
- {
- while (state == 0) Monitor.Wait(this);
- }
- return Extract();
- }
-
- private bool Attempt(long nanos)
- {
- if (state != 0) return true;
- if (nanos <= 0) {
- state = CANCEL;
- Monitor.Pulse(this);
- return false;
- }
-
- while (true)
- {
- Monitor.Wait(nanos);
- //TimeUnit.NANOSECONDS.timedWait(this, nanos);
- if (state != 0)
- {
- return true;
- }
- //nanos = deadline - Utils.nanoTime();
- //if (nanos <= 0)
- else
- {
- state = CANCEL;
- Monitor.Pulse(this);
- return false;
- }
- }
- }
-
- /**
- * Waits for a consumer to take item placed by producer or time out.
- */
- internal bool WaitForTake(long nanos)
- {
- return Attempt(nanos);
- }
-
- /**
- * Waits for a producer to put item placed by consumer, or time out.
- */
- internal object WaitForPut(long nanos)
- {
- if (!Attempt(nanos))
- {
- return null;
- }
- else
- {
- return Extract();
- }
- }
- }
-
- public SynchronousQueue(bool strict)
- {
- // TODO !!!!
- }
-
- public override bool EnqueueNoThrow(object e)
- {
- throw new NotImplementedException();
- }
-
- public override void EnqueueBlocking(object e)
- {
- throw new NotImplementedException();
- }
-
- public override object DequeueBlocking()
- {
- throw new NotImplementedException();
- }
-
- public override int RemainingCapacity
- {
- get
- {
- throw new NotImplementedException();
- }
- }
- }
-}