diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-11 15:22:03 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-11 15:22:03 +0000 |
| commit | 531f130c20f73c08ffc963c1e15c7b98cea05d8f (patch) | |
| tree | d30c93bde8eb6515c076568102a008702c98567a /java/common/src | |
| parent | 1a7004fd9dab8306ec31b96d9d5e2d5a44256d98 (diff) | |
| download | qpid-python-531f130c20f73c08ffc963c1e15c7b98cea05d8f.tar.gz | |
Updates on the refactoring work
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
6 files changed, 461 insertions, 48 deletions
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java index 03838ca3f1..fcaae7740b 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -766,7 +766,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor session.increaseWrittenBytes(writtenBytes); } - if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) + if (buf.hasRemaining()) { // Kernel buffer is full synchronized (writeLock) diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index ad2ab2ac0b..f1a7d4970a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -469,6 +469,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return false; } + + return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data)) || Arrays.equals(getBytes(),otherString.getBytes()); diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index b2a09ac592..d65fce0fd0 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -67,18 +67,22 @@ public class Job implements Runnable /** Holds the completion continuation, called upon completion of a run of the job. */ private final JobCompletionHandler _completionHandler; + private final boolean _readJob; + /** * Creates a new job that aggregates many continuations together. * * @param session The Mina session. * @param completionHandler The per job run, terminal continuation. * @param maxEvents The maximum number of aggregated continuations to process per run of the job. + * @param readJob */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) + Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) { _session = session; _completionHandler = completionHandler; _maxEvents = maxEvents; + _readJob = readJob; } /** @@ -157,6 +161,11 @@ public class Job implements Runnable } } + public boolean isReadJob() + { + return _readJob; + } + /** * Another interface for a continuation. * diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 8352b5af77..dd2d7038fd 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -59,24 +59,6 @@ import java.util.concurrent.ExecutorService; * <td> {@link Job}, {@link Job.JobCompletionHandler} * </table> * - * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events. - * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread - * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads - * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case? - * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these - * stages. - * - * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in - * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run - * and trips off another batch of 10 until they are all done. Why not just have a straight forward - * consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10 - * in them, just have one queue of events and worker threads taking the next event. There will be coordination - * between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same - * amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker - * pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be - * better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily - * be substituted in. - * * @todo The static helper methods are pointless. Could just call new. */ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler @@ -95,17 +77,20 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final int _maxEvents; + private final boolean _readFilter; + /** * Creates a named pooling filter, on the specified shared thread pool. * * @param refCountingPool The thread pool reference. * @param name The identifying name of the filter type. */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter) { _poolReference = refCountingPool; _name = name; _maxEvents = maxEvents; + _readFilter = readFilter; } /** @@ -166,7 +151,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo void fireAsynchEvent(Job job, Event event) { - // job.acquire(); //prevents this job being removed from _jobs job.add(event); final ExecutorService pool = _poolReference.getPool(); @@ -200,7 +184,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void createNewJobForSession(IoSession session) { - Job job = new Job(session, this, MAX_JOB_EVENTS); + Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } @@ -216,18 +200,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo return (Job) session.getAttribute(_name); } - /*private Job createJobForSession(IoSession session) - { - return addJobForSession(session, new Job(session, this, _maxEvents)); - }*/ - - /*private Job addJobForSession(IoSession session, Job job) - { - // atomic so ensures all threads agree on the same job - Job existing = _jobs.putIfAbsent(session, job); - - return (existing == null) ? job : existing; - }*/ /** * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing @@ -238,16 +210,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void completed(IoSession session, Job job) { - // if (job.isComplete()) - // { - // job.release(); - // if (!job.isReferenced()) - // { - // _jobs.remove(session); - // } - // } - // else - if (!job.isComplete()) { @@ -454,7 +416,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS)); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true); } /** @@ -497,7 +459,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS)); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false); } /** diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java new file mode 100644 index 0000000000..05141aea7b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java @@ -0,0 +1,432 @@ +package org.apache.qpid.pool; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.atomic.AtomicInteger; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> +{ + + private final AtomicInteger _count = new AtomicInteger(0); + + private final ReentrantLock _takeLock = new ReentrantLock(); + + private final Condition _notEmpty = _takeLock.newCondition(); + + private final ReentrantLock _putLock = new ReentrantLock(); + + private final ConcurrentLinkedQueue<Job> _readJobQueue = new ConcurrentLinkedQueue<Job>(); + + private final ConcurrentLinkedQueue<Job> _writeJobQueue = new ConcurrentLinkedQueue<Job>(); + + + private class ReadWriteJobIterator implements Iterator<Runnable> + { + + private boolean _onReads; + private Iterator<Job> _iter = _writeJobQueue.iterator(); + + public boolean hasNext() + { + if(!_iter.hasNext()) + { + if(_onReads) + { + _iter = _readJobQueue.iterator(); + _onReads = true; + return _iter.hasNext(); + } + else + { + return false; + } + } + else + { + return true; + } + } + + public Runnable next() + { + if(_iter.hasNext()) + { + return _iter.next(); + } + else + { + return null; + } + } + + public void remove() + { + _takeLock.lock(); + try + { + _iter.remove(); + _count.decrementAndGet(); + } + finally + { + _takeLock.unlock(); + } + } + } + + public Iterator<Runnable> iterator() + { + return new ReadWriteJobIterator(); + } + + public int size() + { + return _count.get(); + } + + public boolean offer(final Runnable runnable) + { + final Job job = (Job) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + try + { + if(job.isReadJob()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + return true; + } + finally + { + putLock.unlock(); + } + } + + public void put(final Runnable runnable) throws InterruptedException + { + final Job job = (Job) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isReadJob()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + } + finally + { + putLock.unlock(); + } + } + + + + public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException + { + final Job job = (Job) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isReadJob()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + return true; + } + finally + { + putLock.unlock(); + } + + } + + public Runnable take() throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + takeLock.lockInterruptibly(); + try + { + try + { + while (_count.get() == 0) + { + _notEmpty.await(); + } + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + + Job job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = _count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + return job; + } + finally + { + takeLock.unlock(); + } + + + } + + public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + final AtomicInteger count = _count; + long nanos = unit.toNanos(timeout); + takeLock.lockInterruptibly(); + Job job = null; + try + { + + for (;;) + { + if (count.get() > 0) + { + job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + break; + } + if (nanos <= 0) + { + return null; + } + try + { + nanos = _notEmpty.awaitNanos(nanos); + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + } + } + finally + { + takeLock.unlock(); + } + + return job; + } + + public int remainingCapacity() + { + return Integer.MAX_VALUE; + } + + public int drainTo(final Collection<? super Runnable> c) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + Job job; + while((job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while((job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + } + + public int drainTo(final Collection<? super Runnable> c, final int maxElements) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + Job job; + while(total<=maxElements && (job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while(total<=maxElements && (job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + + } + + public Runnable poll() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + if(_count.get() > 0) + { + Job job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + _count.decrementAndGet(); + return job; + } + else + { + return null; + } + } + finally + { + takeLock.unlock(); + } + + } + + public Runnable peek() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + Job job = _writeJobQueue.peek(); + if(job == null) + { + job = _readJobQueue.peek(); + } + return job; + } + finally + { + takeLock.unlock(); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 84c9e1f465..1359e56958 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,6 +22,9 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts @@ -111,7 +114,12 @@ public class ReferenceCountingExecutorService { if (_refCount++ == 0) { - _pool = Executors.newFixedThreadPool(_poolSize); +// _pool = Executors.newFixedThreadPool(_poolSize); + + // Use a job queue that biases to writes + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new ReadWriteJobQueue()); } return _pool; |
