From 1d5212bd70b7e4e9837b88f39be06e40a0fc224d Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Mon, 9 Jan 2012 12:30:54 +0000 Subject: QPID-3730: remove ReadWriteJobQueue, it is no longer necessary as the only things now using the thread pool are write jobs, so there is no need/benefit to request it be write-biased. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229112 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/configuration/ServerConfiguration.java | 6 - .../org/apache/qpid/server/queue/QueueRunner.java | 13 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 3 +- .../apache/qpid/server/queue/SubFlushRunner.java | 23 +- .../configuration/ServerConfigurationTest.java | 13 - .../org/apache/qpid/pool/ReadWriteJobQueue.java | 432 --------------------- .../org/apache/qpid/pool/ReadWriteRunnable.java | 26 -- .../pool/ReferenceCountingExecutorService.java | 24 +- 8 files changed, 12 insertions(+), 528 deletions(-) delete mode 100644 java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java delete mode 100644 java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 4b42e39aa1..d3b89649c7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -89,7 +89,6 @@ public class ServerConfiguration extends ConfigurationPlugin envVarMap.put("QPID_PORT", "connector.port"); envVarMap.put("QPID_ENABLEDIRECTBUFFERS", "advanced.enableDirectBuffers"); envVarMap.put("QPID_SSLPORT", "connector.ssl.port"); - envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool"); envVarMap.put("QPID_JMXPORT_REGISTRYSERVER", MGMT_JMXPORT_REGISTRYSERVER); envVarMap.put("QPID_JMXPORT_CONNECTORSERVER", MGMT_JMXPORT_CONNECTORSERVER); envVarMap.put("QPID_FRAMESIZE", "advanced.framesize"); @@ -736,11 +735,6 @@ public class ServerConfiguration extends ConfigurationPlugin return getStringValue("connector.ssl.certType", "SunX509"); } - public boolean getUseBiasedWrites() - { - return getBooleanValue("advanced.useWriteBiasedPool"); - } - public String getDefaultVirtualHost() { return getStringValue("virtualhosts.default"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index a092145958..0d44fe7cf3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.queue.QueueRunner; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -38,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; * when straight-through delivery of a message to a subscription isn't * possible during the enqueue operation. */ -public class QueueRunner implements ReadWriteRunnable +public class QueueRunner implements Runnable { private static final Logger _logger = Logger.getLogger(QueueRunner.class); @@ -99,16 +98,6 @@ public class QueueRunner implements ReadWriteRunnable } } - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - public String toString() { return "QueueRunner-" + _queue.getLogActor(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d48445930a..f1efbc575a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -23,7 +23,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -288,7 +287,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // ------ Getters and Setters - public void execute(ReadWriteRunnable runnable) + public void execute(Runnable runnable) { _asyncDelivery.execute(runnable); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index fbef23dca1..48f2efb342 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -21,18 +21,17 @@ package org.apache.qpid.server.queue; */ -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; - import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.subscription.Subscription; + -class SubFlushRunner implements ReadWriteRunnable +class SubFlushRunner implements Runnable { private static final Logger _logger = Logger.getLogger(SubFlushRunner.class); @@ -90,16 +89,6 @@ class SubFlushRunner implements ReadWriteRunnable return (SimpleAMQQueue) _sub.getQueue(); } - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - public String toString() { return "SubFlushRunner-" + _sub.getLogActor(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 7739f9976e..eb4a90d9f3 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -600,19 +600,6 @@ public class ServerConfigurationTest extends QpidTestCase assertEquals("a", _serverConfig.getConnectorCertType()); } - public void testGetUseBiasedWrites() throws ConfigurationException - { - // Check default - _serverConfig.initialise(); - assertEquals(false, _serverConfig.getUseBiasedWrites()); - - // Check value we set - _config.setProperty("advanced.useWriteBiasedPool", true); - _serverConfig = new ServerConfiguration(_config); - _serverConfig.initialise(); - assertEquals(true, _serverConfig.getUseBiasedWrites()); - } - public void testGetHousekeepingCheckPeriod() throws ConfigurationException { // Check default diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java deleted file mode 100644 index 8de0f93ce9..0000000000 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java +++ /dev/null @@ -1,432 +0,0 @@ -package org.apache.qpid.pool; - -import java.util.AbstractQueue; -import java.util.Iterator; -import java.util.Collection; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.atomic.AtomicInteger; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class ReadWriteJobQueue extends AbstractQueue implements BlockingQueue -{ - - private final AtomicInteger _count = new AtomicInteger(0); - - private final ReentrantLock _takeLock = new ReentrantLock(); - - private final Condition _notEmpty = _takeLock.newCondition(); - - private final ReentrantLock _putLock = new ReentrantLock(); - - private final ConcurrentLinkedQueue _readJobQueue = new ConcurrentLinkedQueue(); - - private final ConcurrentLinkedQueue _writeJobQueue = new ConcurrentLinkedQueue(); - - - private class ReadWriteJobIterator implements Iterator - { - - private boolean _onReads; - private Iterator _iter = _writeJobQueue.iterator(); - - public boolean hasNext() - { - if(!_iter.hasNext()) - { - if(_onReads) - { - _iter = _readJobQueue.iterator(); - _onReads = true; - return _iter.hasNext(); - } - else - { - return false; - } - } - else - { - return true; - } - } - - public Runnable next() - { - if(_iter.hasNext()) - { - return _iter.next(); - } - else - { - return null; - } - } - - public void remove() - { - _takeLock.lock(); - try - { - _iter.remove(); - _count.decrementAndGet(); - } - finally - { - _takeLock.unlock(); - } - } - } - - public Iterator iterator() - { - return new ReadWriteJobIterator(); - } - - public int size() - { - return _count.get(); - } - - public boolean offer(final Runnable runnable) - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - return true; - } - finally - { - putLock.unlock(); - } - } - - public void put(final Runnable runnable) throws InterruptedException - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - - } - finally - { - putLock.unlock(); - } - } - - - - public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException - { - final ReadWriteRunnable job = (ReadWriteRunnable) runnable; - final ReentrantLock putLock = _putLock; - putLock.lock(); - - try - { - if(job.isRead()) - { - _readJobQueue.offer(job); - } - else - { - _writeJobQueue.offer(job); - } - if(_count.getAndIncrement() == 0) - { - _takeLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _takeLock.unlock(); - } - } - - return true; - } - finally - { - putLock.unlock(); - } - - } - - public Runnable take() throws InterruptedException - { - final ReentrantLock takeLock = _takeLock; - takeLock.lockInterruptibly(); - try - { - try - { - while (_count.get() == 0) - { - _notEmpty.await(); - } - } - catch (InterruptedException ie) - { - _notEmpty.signal(); - throw ie; - } - - ReadWriteRunnable job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - int c = _count.getAndDecrement(); - if (c > 1) - { - _notEmpty.signal(); - } - return job; - } - finally - { - takeLock.unlock(); - } - - - } - - public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException - { - final ReentrantLock takeLock = _takeLock; - final AtomicInteger count = _count; - long nanos = unit.toNanos(timeout); - takeLock.lockInterruptibly(); - ReadWriteRunnable job = null; - try - { - - for (;;) - { - if (count.get() > 0) - { - job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - int c = count.getAndDecrement(); - if (c > 1) - { - _notEmpty.signal(); - } - break; - } - if (nanos <= 0) - { - return null; - } - try - { - nanos = _notEmpty.awaitNanos(nanos); - } - catch (InterruptedException ie) - { - _notEmpty.signal(); - throw ie; - } - } - } - finally - { - takeLock.unlock(); - } - - return job; - } - - public int remainingCapacity() - { - return Integer.MAX_VALUE; - } - - public int drainTo(final Collection c) - { - int total = 0; - - _putLock.lock(); - _takeLock.lock(); - try - { - ReadWriteRunnable job; - while((job = _writeJobQueue.peek())!= null) - { - c.add(job); - _writeJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - while((job = _readJobQueue.peek())!= null) - { - c.add(job); - _readJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - } - finally - { - _takeLock.unlock(); - _putLock.unlock(); - } - return total; - } - - public int drainTo(final Collection c, final int maxElements) - { - int total = 0; - - _putLock.lock(); - _takeLock.lock(); - try - { - ReadWriteRunnable job; - while(total<=maxElements && (job = _writeJobQueue.peek())!= null) - { - c.add(job); - _writeJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - while(total<=maxElements && (job = _readJobQueue.peek())!= null) - { - c.add(job); - _readJobQueue.poll(); - _count.decrementAndGet(); - total++; - } - - } - finally - { - _takeLock.unlock(); - _putLock.unlock(); - } - return total; - - } - - public Runnable poll() - { - final ReentrantLock takeLock = _takeLock; - takeLock.lock(); - try - { - if(_count.get() > 0) - { - ReadWriteRunnable job = _writeJobQueue.poll(); - if(job == null) - { - job = _readJobQueue.poll(); - } - _count.decrementAndGet(); - return job; - } - else - { - return null; - } - } - finally - { - takeLock.unlock(); - } - - } - - public Runnable peek() - { - final ReentrantLock takeLock = _takeLock; - takeLock.lock(); - try - { - ReadWriteRunnable job = _writeJobQueue.peek(); - if(job == null) - { - job = _readJobQueue.peek(); - } - return job; - } - finally - { - takeLock.unlock(); - } - } -} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java deleted file mode 100644 index 140c93ca8d..0000000000 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.qpid.pool; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public interface ReadWriteRunnable extends Runnable -{ - boolean isRead(); -} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 8152a1f5e9..3e99b244c4 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -96,8 +96,6 @@ public class ReferenceCountingExecutorService */ private ThreadFactory _threadFactory = Executors.defaultThreadFactory(); - private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); - /** * Retrieves the singleton instance of this reference counter. * @@ -125,26 +123,12 @@ public class ReferenceCountingExecutorService { if (_refCount++ == 0) { - // Use a job queue that biases to writes - if(_useBiasedPool) - { - _pool = new ThreadPoolExecutor(_poolSize, _poolSize, - 0L, TimeUnit.MILLISECONDS, - new ReadWriteJobQueue(), - _threadFactory); - - } - else - { - _pool = new ThreadPoolExecutor(_poolSize, _poolSize, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), - _threadFactory); - } - + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + _threadFactory); } - return _pool; } } -- cgit v1.2.1