From 92e00544516bb3dc046e36007e7526750019daa8 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Tue, 22 May 2007 11:35:10 +0000 Subject: Added batch synched queue implementation. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540533 13f79535-47bb-0310-9956-ffa450edef68 --- .../util/concurrent/AlreadyUnblockedException.java | 13 + .../qpid/util/concurrent/BatchSynchQueue.java | 101 +++ .../qpid/util/concurrent/BatchSynchQueueBase.java | 816 +++++++++++++++++++++ .../apache/qpid/util/concurrent/BooleanLatch.java | 107 +++ .../org/apache/qpid/util/concurrent/Capacity.java | 14 + .../apache/qpid/util/concurrent/SynchBuffer.java | 29 + .../qpid/util/concurrent/SynchException.java | 31 + .../apache/qpid/util/concurrent/SynchQueue.java | 27 + .../apache/qpid/util/concurrent/SynchRecord.java | 53 ++ .../org/apache/qpid/util/concurrent/SynchRef.java | 30 + 10 files changed, 1221 insertions(+) create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java new file mode 100644 index 0000000000..4acea0e2ec --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java @@ -0,0 +1,13 @@ +package org.apache.qpid.util.concurrent; + +/** + * Used to signal that a data element and its producer cannot be requeued or sent an error message when using a + * {@link BatchSynchQueue} because the producer has already been unblocked by an unblocking take on the queue. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Signal that an unblocking take has already occurred. + *
+ */ +public class AlreadyUnblockedException extends RuntimeException +{ } diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java new file mode 100644 index 0000000000..cf2abfffb8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java @@ -0,0 +1,101 @@ +package org.apache.qpid.util.concurrent; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; + +/** + * BatchSynchQueue is an abstraction of the classic producer/consumer buffer pattern for thread interaction. In this + * pattern threads can deposit data onto a buffer whilst other threads take data from the buffer and perform usefull + * work with it. A BatchSynchQueue adds to this the possibility that producers can be blocked until their data is + * consumed or until a consumer chooses to release the producer some time after consuming the data from the queue. + * + *

There are a number of possible advantages to using this technique when compared with having the producers + * processing their own data: + * + *

+ * + *

The asynchronous type of producer/consumer buffers is already well supported by the java.util.concurrent package + * (in Java 5) and there is also a synchronous queue implementation available there too. This interface extends the + * blocking queue with some more methods for controlling a synchronous blocking queue. In particular it adds additional + * take methods that can be used to take data from a queue without releasing producers, so that consumers have an + * opportunity to confirm correct processing of the data before producers are released. It also adds a put method with + * exceptions so that consumers can signal exception cases back to producers where there are errors in the data. + * + *

This type of queue is usefull in situations where consumers can obtain an efficiency gain by batching data + * from many threads but where synchronous handling of that data is neccessary because producers need to know that + * their data has been processed before they continue. For example, sending a bundle of messages together, or writing + * many records to disk at once, may result in improved performance but the originators of the messages or disk records + * need confirmation that their data has really been sent or saved to disk. + * + *

The consumer can put an element back onto the queue or send an error message to the elements producer using the + * {@link SynchRecord} interface. + * + *

The {@link #take()}, {@link #drainTo(java.util.Collection)} and + * {@link #drainTo(java.util.Collection, int)} methods from {@link BlockingQueue} should behave as if they + * have been called with unblock set to false. That is they take elements from the queue but leave the producers + * blocked. These methods do not return collections of {@link SynchRecord}s so they do not supply an interface through + * which errors or re-queuings can be applied. If these methods are used then the consumer must succesfully process + * all the records it takes. + * + *

The {@link #put} method should silently swallow any exceptions that consumers attempt to return to the caller. + * In order to handle exceptions the {@link #tryPut} method must be used. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Handle synchronous puts, with possible exceptions. + *
Allow consumers to take many records from a queue in a batch. + *
Allow consumers to decide when to unblock synchronous producers. + *
+ */ +public interface BatchSynchQueue extends BlockingQueue +{ + /** + * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the + * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}. + * + * @param e The data element to put into the queue. + * + * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting + * on its entry in the queue being consumed. + * @throws SynchException If a consumer encounters an error whilst processing the data element. + */ + public void tryPut(E e) throws InterruptedException, SynchException; + + /** + * Takes all available data items from the queue or blocks until some become available. The returned items + * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their + * producers, where the producers are still blocked. + * + * @param c The collection to drain the data items into. + * @param unblock If set to true the producers for the taken items will be immediately unblocked. + * + * @return A count of the number of elements that were drained from the queue. + */ + public SynchRef drainTo(Collection> c, boolean unblock); + + /** + * Takes up to maxElements available data items from the queue or blocks until some become available. The returned + * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their + * producers, where the producers are still blocked. + * + * @param c The collection to drain the data items into. + * @param maxElements The maximum number of elements to drain. + * @param unblock If set to true the producers for the taken items will be immediately unblocked. + * + * @return A count of the number of elements that were drained from the queue. + */ + public SynchRef drainTo(Collection> c, int maxElements, boolean unblock); +} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java new file mode 100644 index 0000000000..521c341ca3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java @@ -0,0 +1,816 @@ +package org.apache.qpid.util.concurrent; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; + +/** + * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data. + * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being + * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and + * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is + * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch. + * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls. + * + *

Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete + * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract}, + * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement + * the buffer other than by a queue, for example, by using an array. + * + *

Normal queue methods to work asynchronously. + *

Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately + * when their data is taken. + *

The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the + * option to keep producers blocked until the consumer decides to release them. + * + *

Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to + * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency + * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io) + * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the + * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an + * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to + * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ * + * @todo To create zero garbage collecting implemention will need to adapt the queue element containers + * (SynchRefImpl) in such a way that one is needed per array element, they can be taken from/put back/cleared in + * the queue without actually being moved from the array and they implement a way of forming them into a + * collection (or Iterable) to pass to consumers (using a linked list scheme?). May not be worth the trouble. + */ +public abstract class BatchSynchQueueBase extends AbstractQueue implements BatchSynchQueue +{ + /** Used for logging. */ + private static final Logger log = Logger.getLogger(BatchSynchQueueBase.class); + + /** Holds a reference to the queue implementation that holds the buffer. */ + Queue> buffer; + + /** Holds the number of items in the queue */ + private int count; + + /** Main lock guarding all access */ + private ReentrantLock lock; + + /** Condition for waiting takes */ + private Condition notEmpty; + + /** Condition for waiting puts */ + private Condition notFull; + + /** + * Creates a batch synch queue without fair thread scheduling. + */ + public BatchSynchQueueBase() + { + this(false); + } + + /** + * Ensures that the underlying buffer implementation is created. + * + * @param fair true if fairness is to be applied to threads waiting to access the buffer. + */ + public BatchSynchQueueBase(boolean fair) + { + buffer = this.createQueue(); + + // Create the buffer lock with the fairness flag set accordingly. + lock = new ReentrantLock(fair); + + // Create the non-empty and non-full condition monitors on the buffer lock. + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); + } + + /** + * Returns an iterator over the elements contained in this collection. + * + * @return An iterator over the elements contained in this collection. + */ + public Iterator iterator() + { + throw new RuntimeException("Not implemented."); + } + + /** + * Returns the number of elements in this collection. If the collection contains more than + * Integer.MAX_VALUE elements, returns Integer.MAX_VALUE. + * + * @return The number of elements in this collection. + */ + public int size() + { + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + return count; + } + finally + { + lock.unlock(); + } + } + + /** + * Inserts the specified element into this queue, if possible. When using queues that may impose insertion + * restrictions (for example capacity bounds), method offer is generally preferable to method + * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception. + * + * @param e The element to insert. + * + * @return true if it was possible to add the element to this queue, else false + */ + public boolean offer(E e) + { + if (e == null) + { + throw new NullPointerException(); + } + + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + return insert(e, false); + } + finally + { + lock.unlock(); + } + } + + /** + * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to + * become available. + * + * @param e The element to add. + * @param timeout How long to wait before giving up, in units of unit + * @param unit A TimeUnit determining how to interpret the timeout parameter. + * + * @return true if successful, or false if the specified waiting time elapses before space is + * available. + * + * @throws InterruptedException If interrupted while waiting. + * @throws NullPointerException If the specified element is null. + */ + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException + { + if (e == null) + { + throw new NullPointerException(); + } + + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + + long nanos = unit.toNanos(timeout); + + try + { + do + { + if (insert(e, false)) + { + return true; + } + + try + { + nanos = notFull.awaitNanos(nanos); + } + catch (InterruptedException ie) + { + notFull.signal(); // propagate to non-interrupted thread + throw ie; + } + } + while (nanos > 0); + + return false; + } + finally + { + lock.unlock(); + } + } + + /** + * Retrieves and removes the head of this queue, or null if this queue is empty. + * + * @return The head of this queue, or null if this queue is empty. + */ + public E poll() + { + final ReentrantLock lock = this.lock; + + lock.lock(); + try + { + if (count == 0) + { + return null; + } + + E x = extract(true, true).getElement(); + + return x; + } + finally + { + lock.unlock(); + } + } + + /** + * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements + * are present on this queue. + * + * @param timeout How long to wait before giving up, in units of unit. + * @param unit A TimeUnit determining how to interpret the timeout parameter. + * + * @return The head of this queue, or null if the specified waiting time elapses before an element is present. + * + * @throws InterruptedException If interrupted while waiting. + */ + public E poll(long timeout, TimeUnit unit) throws InterruptedException + { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try + { + long nanos = unit.toNanos(timeout); + + do + { + if (count != 0) + { + E x = extract(true, true).getElement(); + + return x; + } + + try + { + nanos = notEmpty.awaitNanos(nanos); + } + catch (InterruptedException ie) + { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + } + while (nanos > 0); + + return null; + } + finally + { + lock.unlock(); + } + } + + /** + * Retrieves, but does not remove, the head of this queue, returning null if this queue is empty. + * + * @return The head of this queue, or null if this queue is empty. + */ + public E peek() + { + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + return peekAtBufferHead(); + } + finally + { + lock.unlock(); + } + } + + /** + * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) + * accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit. + * + *

Note that you cannot always tell if an attempt to add an element will succeed by + * inspecting remainingCapacity because it may be the case that another thread is about to put + * or take an element. + * + * @return The remaining capacity. + */ + public int remainingCapacity() + { + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + return getBufferCapacity() - count; + } + finally + { + lock.unlock(); + } + } + + /** + * Adds the specified element to this queue, waiting if necessary for space to become available. + * + *

This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised + * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these + * exceptions. + * + * @param e The element to add. + * + * @throws InterruptedException If interrupted while waiting. + */ + public void put(E e) throws InterruptedException + { + try + { + tryPut(e); + } + catch (SynchException ex) + { + // This exception is deliberately ignored. See the method comment for information about this. + } + } + + /** + * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the + * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}. + * + * @param e The data element to put into the queue. + * + * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting + * on its entry in the queue being consumed. + * @throws SynchException If a consumer encounters an error whilst processing the data element. + */ + public void tryPut(E e) throws InterruptedException, SynchException + { + if (e == null) + { + throw new NullPointerException(); + } + + // final Queue items = this.buffer; + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + + try + { + while (count == getBufferCapacity()) + { + // Release the lock and wait until the queue is not full. + notFull.await(); + } + } + catch (InterruptedException ie) + { + notFull.signal(); // propagate to non-interrupted thread + throw ie; + } + + // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block + // the producer until its data is taken. + insert(e, true); + } + + /** + * Retrieves and removes the head of this queue, waiting if no elements are present on this queue. + * Any producer that has its data element taken by this call will be immediately unblocked. To keep the + * producer blocked whilst taking just a single item, use the + * {@link #drainTo(java.util.Collection>, int, boolean)} + * method. There is no take method to do that because there is not usually any advantage in a synchronous hand + * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption + * latencies accross many producers where possible. + * + * @return The head of this queue. + * + * @throws InterruptedException if interrupted while waiting. + */ + public E take() throws InterruptedException + { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + + try + { + try + { + while (count == 0) + { + // Release the lock and wait until the queue becomes non-empty. + notEmpty.await(); + } + } + catch (InterruptedException ie) + { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + + // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is + // not full, and unblock the producer that owns the data item that is taken. + E x = extract(true, true).getElement(); + + return x; + } + finally + { + lock.unlock(); + } + } + + /** + * Removes all available elements from this queue and adds them into the given collection. This operation may be + * more efficient than repeatedly polling this queue. A failure encountered while attempting to add elements + * to collection c may result in elements being in neither, either or both collections when the associated + * exception is thrown. Attempts to drain a queue to itself result in IllegalArgumentException. Further, + * the behavior of this operation is undefined if the specified collection is modified while the operation is in + * progress. + * + * @param objects The collection to transfer elements into. + * + * @return The number of elements transferred. + * + * @throws NullPointerException If objects is null. + * @throws IllegalArgumentException If objects is this queue. + */ + public int drainTo(Collection objects) + { + return drainTo(objects, -1); + } + + /** + * Removes at most the given number of available elements from this queue and adds them into the given collection. + * A failure encountered while attempting to add elements to collection c may result in elements + * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue + * to itself result in IllegalArgumentException. Further, the behavior of this operation is undefined if + * the specified collection is modified while the operation is in progress. + * + * @param objects The collection to transfer elements into. + * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning + * all elements. + * + * @return The number of elements transferred. + * + * @throws NullPointerException If c is null. + * @throws IllegalArgumentException If c is this queue. + */ + public int drainTo(Collection objects, int maxElements) + { + if (objects == null) + { + throw new NullPointerException(); + } + + if (objects == this) + { + throw new IllegalArgumentException(); + } + + // final Queue items = this.buffer; + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + int n = 0; + + for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++) + { + // Take items from the queue, do unblock the producers, but don't send not full signals yet. + objects.add(extract(true, false).getElement()); + } + + if (n > 0) + { + // count -= n; + notFull.signalAll(); + } + + return n; + } + finally + { + lock.unlock(); + } + } + + /** + * Takes all available data items from the queue or blocks until some become available. The returned items + * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their + * producers, where the producers are still blocked. + * + * @param c The collection to drain the data items into. + * @param unblock If set to true the producers for the taken items will be immediately unblocked. + * + * @return A count of the number of elements that were drained from the queue. + */ + public SynchRef drainTo(Collection> c, boolean unblock) + { + return drainTo(c, -1, unblock); + } + + /** + * Takes up to maxElements available data items from the queue or blocks until some become available. The returned + * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their + * producers, where the producers are still blocked. + * + * @param coll The collection to drain the data items into. + * @param maxElements The maximum number of elements to drain. + * @param unblock If set to true the producers for the taken items will be immediately unblocked. + * + * @return A count of the number of elements that were drained from the queue. + */ + public SynchRef drainTo(Collection> coll, int maxElements, boolean unblock) + { + if (coll == null) + { + throw new NullPointerException(); + } + + // final Queue items = this.buffer; + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + int n = 0; + + for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++) + { + // Extract the next record from the queue, don't signall the not full condition yet and release + // producers depending on whether the caller wants to or not. + coll.add(extract(false, unblock)); + } + + if (n > 0) + { + // count -= n; + notFull.signalAll(); + } + + return new SynchRefImpl(n, coll); + } + finally + { + lock.unlock(); + } + } + + /** + * This abstract method should be overriden to return an empty queue. Different implementations of producer + * consumer buffers can control the order in which data is accessed using different queue implementations. + * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete + * implementations. + * + * @return An empty queue. + */ + protected abstract Queue createQueue(); + + /** + * Insert element into the queue, then possibly signal that the queue is not empty and block the producer + * on the element until permission to procede is given. + * + *

If the producer is to be blocked then the lock must be released first, otherwise no other process + * will be able to get access to the queue. Hence, unlock and block are always set together. + * + *

Call only when holding the global lock. + * + * @param unlockAndBlock trueIf the global queue lock should be released and the producer should be blocked. + * + * @return true if the operation succeeded, false otherwise. If the result is true this + * method may not return straight away, but only after the producer is unblocked by having its data + * consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no + * matter what value the unlockAndBlock flag has, leaving the global lock on. + */ + protected boolean insert(E x, boolean unlockAndBlock) + { + // Create a new record for the data item. + SynchRecordImpl record = new SynchRecordImpl(x); + + boolean result = buffer.offer(record); + + if (result) + { + count++; + + // Tell any waiting consumers that the queue is not empty. + notEmpty.signal(); + + if (unlockAndBlock) + { + // Allow other threads to read/write the queue. + lock.unlock(); + + // Wait until a consumer takes this data item. + record.waitForConsumer(); + } + + return true; + } + else + { + return false; + } + } + + /** + * Extract element at current take position, advance, and signal. + * + *

Call only when holding lock. + */ + protected SynchRecordImpl extract(boolean unblock, boolean signal) + { + SynchRecordImpl result = buffer.remove(); + count--; + + if (signal) + { + notFull.signal(); + } + + if (unblock) + { + result.releaseImmediately(); + } + + return result; + } + + /** + * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned. + * + *

Call only when holding lock. + * + * @return The maximum capacity of the buffer. + */ + protected int getBufferCapacity() + { + if (buffer instanceof Capacity) + { + return ((Capacity) buffer).getCapacity(); + } + else + { + return Integer.MAX_VALUE; + } + } + + /** + * Return the head element from the buffer. + * + *

Call only when holding lock. + * + * @return The head element from the buffer. + */ + protected E peekAtBufferHead() + { + return buffer.peek().getElement(); + } + + public class SynchRefImpl implements SynchRef + { + /** Holds the number of synch records associated with this reference. */ + int numRecords; + + /** Holds a reference to the collection of synch records managed by this. */ + Collection> records; + + public SynchRefImpl(int n, Collection> records) + { + this.numRecords = n; + this.records = records; + } + + public int getNumRecords() + { + return numRecords; + } + + /** + * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked + * when this method is called. The exception to this is producers that have had their data put back onto the queue + * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked + * but will not return from their put call normally, but with an exception instead. + */ + public void unblockProducers() + { + log.debug("public void unblockProducers(): called"); + + if (records != null) + { + for (SynchRecord record : records) + { + // This call takes account of items that have already been released, are to be requeued or are in + // error. + record.releaseImmediately(); + } + } + + records = null; + } + } + + /** + * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows + * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when + * its data cannot be consumed. + */ + public class SynchRecordImpl implements SynchRecord + { + /** A boolean latch that determines when the producer for this data item will be allowed to continue. */ + BooleanLatch latch = new BooleanLatch(); + + /** The data element associated with this item. */ + E element; + + /** + * Create a new synch record. + * + * @param e The data element that the record encapsulates. + */ + public SynchRecordImpl(E e) + { + // Keep the data element. + element = e; + } + + /** + * Waits until the producer is given permission to proceded by a consumer. + */ + public void waitForConsumer() + { + latch.await(); + } + + /** + * Gets the data element contained by this record. + * + * @return The data element contained by this record. + */ + public E getElement() + { + return element; + } + + /** + * Immediately releases the producer of this data record. Consumers can bring the synchronization time of + * producers to a minimum by using this method to release them at the earliest possible moment when batch + * consuming records from sychronized producers. + */ + public void releaseImmediately() + { + // Check that the record has not already been released, is in error or is to be requeued. + latch.signal(); + + // Propagate errors to the producer. + + // Requeue items to be requeued. + } + + /** + * Tells the synch queue to put this element back onto the queue instead of releasing its producer. + * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or + * the {@link #releaseImmediately()} method. + * + *

This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this + * element has already been unblocked. + */ + public void reQueue() + { + throw new RuntimeException("Not implemented."); + } + + /** + * Tells the synch queue to raise an exception with this elements producer. The exception is not raised + * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the + * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is + * raised on the producer. + * + *

This method is unusual in that it accepts an exception as an argument. This is non-standard but is used + * because the exception is to be passed onto a different thread. + * + *

This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this + * element has already been unblocked. + * + * @param e The exception to raise on the producer. + */ + public void inError(Exception e) + { + throw new RuntimeException("Not implemented."); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java new file mode 100644 index 0000000000..2a5b0d0c3e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java @@ -0,0 +1,107 @@ +package org.apache.qpid.util.concurrent; + +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * A BooleanLatch is like a set of traffic lights, where threads can wait at a red light until another thread gives + * the green light. When threads arrive at the latch it is initially red. They queue up until the green signal is + * given, at which point they can all acquire the latch in shared mode and continue to run concurrently. Once the latch + * is signalled it cannot be reset to red again. + * + *

The latch uses a {@link java.util.concurrent.locks.AbstractQueuedSynchronizer} to implement its synchronization. + * This has two internal states, 0 which means that the latch is blocked, and 1 which means that the latch is open. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Block threads until a go signal is given. + *
+ * + * @todo Might be better to use a countdown latch to count down from 1. Its await method can throw interrupted + * exception which makes the possibility of interruption more explicit, and provides a reminder to recheck the + * latch condition before continuing. + */ +public class BooleanLatch +{ + /** Holds the synchronizer that provides the thread queueing synchronization. */ + private final Sync sync = new Sync(); + + /** + * Tests whether or not the latch has been signalled, that is to say that, the light is green. + * + *

This method is non-blocking. + * + * @return true if the latch may be acquired; the light is green. + */ + public boolean isSignalled() + { + return sync.isSignalled(); + } + + /** + * Waits on the latch until the signal is given and the light is green. If the light is already green then the + * latch will be acquired and the thread will not have to wait. + * + *

This method will block until the go signal is given or the thread is otherwise interrupted. Before carrying + * out any processing threads that return from this method should confirm that the go signal has really been given + * on this latch by calling the {@link #isSignalled()} method. + */ + public void await() + { + sync.acquireShared(1); + } + + /** + * Releases any threads currently waiting on the latch. This flips the light to green allowing any threads that + * were waiting for this condition to now run. + * + *

This method is non-blocking. + */ + public void signal() + { + sync.releaseShared(1); + } + + /** + * Implements a thread queued synchronizer. The internal state 0 means that the queue is blocked and the internl + * state 1 means that the queue is released and that all waiting threads can acquire the synchronizer in shared + * mode. + */ + private static class Sync extends AbstractQueuedSynchronizer + { + /** + * Attempts to acquire this synchronizer in shared mode. It may be acquired once it has been released. + * + * @param ignore This parameter is ignored. + * + * @return 1 if the shared acquisition succeeds and -1 if it fails. + */ + protected int tryAcquireShared(int ignore) + { + return isSignalled() ? 1 : -1; + } + + /** + * Releases the synchronizer, setting its internal state to 1. + * + * @param ignore This parameter is ignored. + * + * @return true always. + */ + protected boolean tryReleaseShared(int ignore) + { + setState(1); + + return true; + } + + /** + * Tests if the synchronizer is signalled. It is signalled when its internal state it 1. + * + * @return true if the internal state is 1, false otherwise. + */ + boolean isSignalled() + { + return getState() != 0; + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java new file mode 100644 index 0000000000..2b4a5f28a9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java @@ -0,0 +1,14 @@ +package org.apache.qpid.util.concurrent; + +/** + * An interface exposed by data structures that have a maximum capacity. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Report the maximum capacity. + *
+ */ +public interface Capacity +{ + public int getCapacity(); +} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java new file mode 100644 index 0000000000..8f682ec462 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java @@ -0,0 +1,29 @@ +package org.apache.qpid.util.concurrent; + +import java.util.Queue; + +/** + * SynchBuffer completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying + * queue as an array. This uses FIFO ordering for the queue but restricts the maximum size of the queue to a fixed + * amount. It also has the advantage that, as the buffer does not grow and shrink dynamically, memory for the buffer + * is allocated up front and does not create garbage during the operation of the queue. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Provide array based FIFO queue to create a batch synched queue around. + *
+ * + * @todo Write an array based buffer implementation that implements Queue. + */ +public class SynchBuffer extends BatchSynchQueueBase +{ + /** + * Returns an empty queue, implemented as an array. + * + * @return An empty queue, implemented as an array. + */ + protected Queue createQueue() + { + throw new RuntimeException("Not implemented."); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java new file mode 100644 index 0000000000..c6edff6320 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java @@ -0,0 +1,31 @@ +package org.apache.qpid.util.concurrent; + +/** + * SynchException is used to encapsulate exceptions with the data elements that caused them in order to send exceptions + * back from the consumers of a {@link BatchSynchQueue} to producers. The underlying exception should be retrieved from + * the {@link #getCause} method. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Encapsulate a data element and exception. + *
+ */ +public class SynchException extends Exception +{ + /** Holds the data element that is in error. */ + Object element; + + /** + * Creates a new BaseApplicationException object. + * + * @param message The exception message. + * @param cause The underlying throwable cause. This may be null. + */ + public SynchException(String message, Throwable cause, Object element) + { + super(message, cause); + + // Keep the data element that was in error. + this.element = element; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java new file mode 100644 index 0000000000..df3f2b849a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java @@ -0,0 +1,27 @@ +package org.apache.qpid.util.concurrent; + +import java.util.LinkedList; +import java.util.Queue; + +/** + * SynchQueue completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying + * queue as a linked list. This uses FIFO ordering for the queue and allows the queue to grow to accomodate more + * elements as needed. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Provide linked list FIFO queue to create a batch synched queue around. + *
+ */ +public class SynchQueue extends BatchSynchQueueBase +{ + /** + * Returns an empty queue, implemented as a linked list. + * + * @return An empty queue, implemented as a linked list. + */ + protected Queue createQueue() + { + return new LinkedList(); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java new file mode 100644 index 0000000000..cacef472d6 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java @@ -0,0 +1,53 @@ +package org.apache.qpid.util.concurrent; + +/** + * SynchRecord associates a data item from a {@link BatchSynchQueue} with its producer. This enables the data item data + * item to be put back on the queue without unblocking its producer, or to send exceptions to the producer. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Get the underlying data element. + *
Put the data element back on the queue without unblocking its producer. + *
Send and exception to the data elements producer. + *
+ */ +public interface SynchRecord +{ + /** + * Gets the data element contained by this record. + * + * @return The data element contained by this record. + */ + public E getElement(); + + /** + * Tells the synch queue to put this element back onto the queue instead of releasing its producer. + * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method. + * + *

This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element + * has already been unblocked. + */ + public void reQueue(); + + /** + * Immediately releases the producer of this data record. Consumers can bring the synchronization time of + * producers to a minimum by using this method to release them at the earliest possible moment when batch + * consuming records from sychronized producers. + */ + public void releaseImmediately(); + + /** + * Tells the synch queue to raise an exception with this elements producer. The exception is not raised immediately + * but upon calling the {@link SynchRef#unblockProducers()} method. The exception will be wrapped in a + * {@link SynchException} before it is raised on the producer. + * + *

This method is unusual in that it accepts an exception as an argument. This is non-standard but is used + * because the exception is to be passed onto a different thread. + * + *

This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element + * has already been unblocked. + * + * @param e The exception to raise on the producer. + */ + public void inError(Exception e); +} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java new file mode 100644 index 0000000000..c044ed0a60 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java @@ -0,0 +1,30 @@ +package org.apache.qpid.util.concurrent; + +/** + * A SynchRef is an interface which is returned from the synchronous take and drain methods of {@link BatchSynchQueue}, + * allowing call-backs to be made against the synchronizing strucutre. It allows the consumer to communicate when it + * wants producers that have their data taken to be unblocked. + * + *

+ *
CRC Card
Responsibilities + *
Report number of records returned by a taking operation. + *
Provide call-back to release producers of taken records. + *
+ */ +public interface SynchRef +{ + /** + * Reports the number of records taken by the take or drain operation. + * + * @return The number of records taken by the take or drain operation. + */ + public int getNumRecords(); + + /** + * Any producers that have had their data elements taken from the queue but have not been unblocked are + * unblocked when this method is called. The exception to this is producers that have had their data put back + * onto the queue by a consumer. Producers that have had exceptions for their data items registered by consumers + * will be unblocked but will not return from their put call normally, but with an exception instead. + */ + public void unblockProducers(); +} -- cgit v1.2.1