summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-01-09 12:30:54 +0000
committerRobert Gemmell <robbie@apache.org>2012-01-09 12:30:54 +0000
commit1d5212bd70b7e4e9837b88f39be06e40a0fc224d (patch)
treeaa926663a4c84eceade95739c9e5ac0e7dfa3cea /java
parent592c256cd4f97e7aa2f05676c48e27a3f274054e (diff)
downloadqpid-python-1d5212bd70b7e4e9837b88f39be06e40a0fc224d.tar.gz
QPID-3730: remove ReadWriteJobQueue, it is no longer necessary as the only things now using the thread pool are write jobs, so there is no need/benefit to request it be write-biased.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229112 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java3
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java23
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java432
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java24
8 files changed, 12 insertions, 528 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 4b42e39aa1..d3b89649c7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -89,7 +89,6 @@ public class ServerConfiguration extends ConfigurationPlugin
envVarMap.put("QPID_PORT", "connector.port");
envVarMap.put("QPID_ENABLEDIRECTBUFFERS", "advanced.enableDirectBuffers");
envVarMap.put("QPID_SSLPORT", "connector.ssl.port");
- envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool");
envVarMap.put("QPID_JMXPORT_REGISTRYSERVER", MGMT_JMXPORT_REGISTRYSERVER);
envVarMap.put("QPID_JMXPORT_CONNECTORSERVER", MGMT_JMXPORT_CONNECTORSERVER);
envVarMap.put("QPID_FRAMESIZE", "advanced.framesize");
@@ -736,11 +735,6 @@ public class ServerConfiguration extends ConfigurationPlugin
return getStringValue("connector.ssl.certType", "SunX509");
}
- public boolean getUseBiasedWrites()
- {
- return getBooleanValue("advanced.useWriteBiasedPool");
- }
-
public String getDefaultVirtualHost()
{
return getStringValue("virtualhosts.default");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index a092145958..0d44fe7cf3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.queue.QueueRunner;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -38,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
* when straight-through delivery of a message to a subscription isn't
* possible during the enqueue operation.
*/
-public class QueueRunner implements ReadWriteRunnable
+public class QueueRunner implements Runnable
{
private static final Logger _logger = Logger.getLogger(QueueRunner.class);
@@ -99,16 +98,6 @@ public class QueueRunner implements ReadWriteRunnable
}
}
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
public String toString()
{
return "QueueRunner-" + _queue.getLogActor();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d48445930a..f1efbc575a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -23,7 +23,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -288,7 +287,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// ------ Getters and Setters
- public void execute(ReadWriteRunnable runnable)
+ public void execute(Runnable runnable)
{
_asyncDelivery.execute(runnable);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index fbef23dca1..48f2efb342 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -21,18 +21,17 @@ package org.apache.qpid.server.queue;
*/
-import org.apache.qpid.pool.ReadWriteRunnable;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.subscription.Subscription;
+
-class SubFlushRunner implements ReadWriteRunnable
+class SubFlushRunner implements Runnable
{
private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
@@ -90,16 +89,6 @@ class SubFlushRunner implements ReadWriteRunnable
return (SimpleAMQQueue) _sub.getQueue();
}
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
public String toString()
{
return "SubFlushRunner-" + _sub.getLogActor();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 7739f9976e..eb4a90d9f3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -600,19 +600,6 @@ public class ServerConfigurationTest extends QpidTestCase
assertEquals("a", _serverConfig.getConnectorCertType());
}
- public void testGetUseBiasedWrites() throws ConfigurationException
- {
- // Check default
- _serverConfig.initialise();
- assertEquals(false, _serverConfig.getUseBiasedWrites());
-
- // Check value we set
- _config.setProperty("advanced.useWriteBiasedPool", true);
- _serverConfig = new ServerConfiguration(_config);
- _serverConfig.initialise();
- assertEquals(true, _serverConfig.getUseBiasedWrites());
- }
-
public void testGetHousekeepingCheckPeriod() throws ConfigurationException
{
// Check default
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
deleted file mode 100644
index 8de0f93ce9..0000000000
--- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
+++ /dev/null
@@ -1,432 +0,0 @@
-package org.apache.qpid.pool;
-
-import java.util.AbstractQueue;
-import java.util.Iterator;
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
-{
-
- private final AtomicInteger _count = new AtomicInteger(0);
-
- private final ReentrantLock _takeLock = new ReentrantLock();
-
- private final Condition _notEmpty = _takeLock.newCondition();
-
- private final ReentrantLock _putLock = new ReentrantLock();
-
- private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
-
- private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
-
-
- private class ReadWriteJobIterator implements Iterator<Runnable>
- {
-
- private boolean _onReads;
- private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
-
- public boolean hasNext()
- {
- if(!_iter.hasNext())
- {
- if(_onReads)
- {
- _iter = _readJobQueue.iterator();
- _onReads = true;
- return _iter.hasNext();
- }
- else
- {
- return false;
- }
- }
- else
- {
- return true;
- }
- }
-
- public Runnable next()
- {
- if(_iter.hasNext())
- {
- return _iter.next();
- }
- else
- {
- return null;
- }
- }
-
- public void remove()
- {
- _takeLock.lock();
- try
- {
- _iter.remove();
- _count.decrementAndGet();
- }
- finally
- {
- _takeLock.unlock();
- }
- }
- }
-
- public Iterator<Runnable> iterator()
- {
- return new ReadWriteJobIterator();
- }
-
- public int size()
- {
- return _count.get();
- }
-
- public boolean offer(final Runnable runnable)
- {
- final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
- final ReentrantLock putLock = _putLock;
- putLock.lock();
- try
- {
- if(job.isRead())
- {
- _readJobQueue.offer(job);
- }
- else
- {
- _writeJobQueue.offer(job);
- }
- if(_count.getAndIncrement() == 0)
- {
- _takeLock.lock();
- try
- {
- _notEmpty.signal();
- }
- finally
- {
- _takeLock.unlock();
- }
- }
- return true;
- }
- finally
- {
- putLock.unlock();
- }
- }
-
- public void put(final Runnable runnable) throws InterruptedException
- {
- final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
- final ReentrantLock putLock = _putLock;
- putLock.lock();
-
- try
- {
- if(job.isRead())
- {
- _readJobQueue.offer(job);
- }
- else
- {
- _writeJobQueue.offer(job);
- }
- if(_count.getAndIncrement() == 0)
- {
- _takeLock.lock();
- try
- {
- _notEmpty.signal();
- }
- finally
- {
- _takeLock.unlock();
- }
- }
-
- }
- finally
- {
- putLock.unlock();
- }
- }
-
-
-
- public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
- {
- final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
- final ReentrantLock putLock = _putLock;
- putLock.lock();
-
- try
- {
- if(job.isRead())
- {
- _readJobQueue.offer(job);
- }
- else
- {
- _writeJobQueue.offer(job);
- }
- if(_count.getAndIncrement() == 0)
- {
- _takeLock.lock();
- try
- {
- _notEmpty.signal();
- }
- finally
- {
- _takeLock.unlock();
- }
- }
-
- return true;
- }
- finally
- {
- putLock.unlock();
- }
-
- }
-
- public Runnable take() throws InterruptedException
- {
- final ReentrantLock takeLock = _takeLock;
- takeLock.lockInterruptibly();
- try
- {
- try
- {
- while (_count.get() == 0)
- {
- _notEmpty.await();
- }
- }
- catch (InterruptedException ie)
- {
- _notEmpty.signal();
- throw ie;
- }
-
- ReadWriteRunnable job = _writeJobQueue.poll();
- if(job == null)
- {
- job = _readJobQueue.poll();
- }
- int c = _count.getAndDecrement();
- if (c > 1)
- {
- _notEmpty.signal();
- }
- return job;
- }
- finally
- {
- takeLock.unlock();
- }
-
-
- }
-
- public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
- {
- final ReentrantLock takeLock = _takeLock;
- final AtomicInteger count = _count;
- long nanos = unit.toNanos(timeout);
- takeLock.lockInterruptibly();
- ReadWriteRunnable job = null;
- try
- {
-
- for (;;)
- {
- if (count.get() > 0)
- {
- job = _writeJobQueue.poll();
- if(job == null)
- {
- job = _readJobQueue.poll();
- }
- int c = count.getAndDecrement();
- if (c > 1)
- {
- _notEmpty.signal();
- }
- break;
- }
- if (nanos <= 0)
- {
- return null;
- }
- try
- {
- nanos = _notEmpty.awaitNanos(nanos);
- }
- catch (InterruptedException ie)
- {
- _notEmpty.signal();
- throw ie;
- }
- }
- }
- finally
- {
- takeLock.unlock();
- }
-
- return job;
- }
-
- public int remainingCapacity()
- {
- return Integer.MAX_VALUE;
- }
-
- public int drainTo(final Collection<? super Runnable> c)
- {
- int total = 0;
-
- _putLock.lock();
- _takeLock.lock();
- try
- {
- ReadWriteRunnable job;
- while((job = _writeJobQueue.peek())!= null)
- {
- c.add(job);
- _writeJobQueue.poll();
- _count.decrementAndGet();
- total++;
- }
-
- while((job = _readJobQueue.peek())!= null)
- {
- c.add(job);
- _readJobQueue.poll();
- _count.decrementAndGet();
- total++;
- }
-
- }
- finally
- {
- _takeLock.unlock();
- _putLock.unlock();
- }
- return total;
- }
-
- public int drainTo(final Collection<? super Runnable> c, final int maxElements)
- {
- int total = 0;
-
- _putLock.lock();
- _takeLock.lock();
- try
- {
- ReadWriteRunnable job;
- while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
- {
- c.add(job);
- _writeJobQueue.poll();
- _count.decrementAndGet();
- total++;
- }
-
- while(total<=maxElements && (job = _readJobQueue.peek())!= null)
- {
- c.add(job);
- _readJobQueue.poll();
- _count.decrementAndGet();
- total++;
- }
-
- }
- finally
- {
- _takeLock.unlock();
- _putLock.unlock();
- }
- return total;
-
- }
-
- public Runnable poll()
- {
- final ReentrantLock takeLock = _takeLock;
- takeLock.lock();
- try
- {
- if(_count.get() > 0)
- {
- ReadWriteRunnable job = _writeJobQueue.poll();
- if(job == null)
- {
- job = _readJobQueue.poll();
- }
- _count.decrementAndGet();
- return job;
- }
- else
- {
- return null;
- }
- }
- finally
- {
- takeLock.unlock();
- }
-
- }
-
- public Runnable peek()
- {
- final ReentrantLock takeLock = _takeLock;
- takeLock.lock();
- try
- {
- ReadWriteRunnable job = _writeJobQueue.peek();
- if(job == null)
- {
- job = _readJobQueue.peek();
- }
- return job;
- }
- finally
- {
- takeLock.unlock();
- }
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
deleted file mode 100644
index 140c93ca8d..0000000000
--- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.qpid.pool;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public interface ReadWriteRunnable extends Runnable
-{
- boolean isRead();
-}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index 8152a1f5e9..3e99b244c4 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -96,8 +96,6 @@ public class ReferenceCountingExecutorService
*/
private ThreadFactory _threadFactory = Executors.defaultThreadFactory();
- private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
-
/**
* Retrieves the singleton instance of this reference counter.
*
@@ -125,26 +123,12 @@ public class ReferenceCountingExecutorService
{
if (_refCount++ == 0)
{
- // Use a job queue that biases to writes
- if(_useBiasedPool)
- {
- _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
- 0L, TimeUnit.MILLISECONDS,
- new ReadWriteJobQueue(),
- _threadFactory);
-
- }
- else
- {
- _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- _threadFactory);
- }
-
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ _threadFactory);
}
-
return _pool;
}
}