From fbeb3752a902f5cbf225dd9fa4c6f00dbcbc3a68 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 21 Nov 2008 16:32:53 +0000 Subject: This is related to QPID-1479. For starters I have changed the IoSender.java IoReceiver.java and AMQSession.java#Dispatcher to use the Thread factory to create the threads they require. The ThreadFactory has two implimentations, the default being the java.lang.Threads. The other is the RealtimeThreadFactory which uses reflection to create threads with a specific priority. -Dqpid.thread_factory= will decide which thread factory should be loaded. -Dqpid.rt_thread_priority= specifies the gloabl real time thread priority and defaults to 20. You could also set individual thread priorities by adding the nessacery config+code changes. I have also changed the Testkit and QpidBench to use the Thread factory so you could use them for testing/benchmarking work on RT JVMs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719628 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 55 ++++++++++++------ .../apache/qpid/thread/DefaultThreadFactory.java | 18 ++++++ .../apache/qpid/thread/RealtimeThreadFactory.java | 47 ++++++++++++++++ .../java/org/apache/qpid/thread/ThreadFactory.java | 7 +++ .../java/org/apache/qpid/thread/Threading.java | 26 +++++++++ .../qpid/transport/network/io/IoReceiver.java | 26 ++++++--- .../apache/qpid/transport/network/io/IoSender.java | 27 ++++++--- .../org/apache/qpid/testkit/perf/LatencyTest.java | 38 +++++++++---- .../org/apache/qpid/testkit/perf/PerfConsumer.java | 23 +++++++- .../org/apache/qpid/testkit/perf/PerfProducer.java | 22 +++++++- .../qpid/testkit/soak/MultiThreadedConsumer.java | 17 +++++- .../qpid/testkit/soak/MultiThreadedProducer.java | 14 ++++- .../apache/qpid/testkit/soak/ResourceLeakTest.java | 20 ++++++- .../apache/qpid/testkit/soak/SimpleConsumer.java | 23 +++++++- .../apache/qpid/testkit/soak/SimpleProducer.java | 22 +++++++- .../main/java/org/apache/qpid/tools/QpidBench.java | 65 ++++++++++++++++++---- 16 files changed, 379 insertions(+), 71 deletions(-) create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5d12d9520..af0ed3faa3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -67,16 +67,26 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.*; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.CloseConsumerMessage; +import org.apache.qpid.client.message.JMSBytesMessage; +import org.apache.qpid.client.message.JMSMapMessage; +import org.apache.qpid.client.message.JMSObjectMessage; +import org.apache.qpid.client.message.JMSStreamMessage; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.util.FlowControllingBlockingQueue; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -271,6 +281,8 @@ public abstract class AMQSession receiver, int bufferSize, long timeout) @@ -55,10 +57,18 @@ final class IoReceiver extends Thread this.bufferSize = bufferSize; this.socket = transport.getSocket(); this.timeout = timeout; - - setDaemon(true); - setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); - start(); + + try + { + receiverThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOReceiver thread",e); + } + receiverThread.setDaemon(true); + receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + receiverThread.start(); } void close(boolean block) @@ -75,10 +85,10 @@ final class IoReceiver extends Thread { socket.shutdownInput(); } - if (block && Thread.currentThread() != this) + if (block && Thread.currentThread() != receiverThread) { - join(timeout); - if (isAlive()) + receiverThread.join(timeout); + if (receiverThread.isAlive()) { throw new TransportException("join timed out"); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 36ea14856a..29f0c766fc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,6 +24,7 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -32,7 +33,7 @@ import org.apache.qpid.transport.util.Logger; import static org.apache.qpid.transport.util.Functions.*; -public final class IoSender extends Thread implements Sender +public final class IoSender implements Runnable, Sender { private static final Logger log = Logger.get(IoSender.class); @@ -54,7 +55,8 @@ public final class IoSender extends Thread implements Sender private final Object notFull = new Object(); private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); - + private final Thread senderThread; + private volatile Throwable exception = null; @@ -74,9 +76,18 @@ public final class IoSender extends Thread implements Sender throw new TransportException("Error getting output stream for socket", e); } - setDaemon(true); - setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - start(); + try + { + senderThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOSender thread",e); + } + + senderThread.setDaemon(true); + senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + senderThread.start(); } private static final int pof2(int n) @@ -188,10 +199,10 @@ public final class IoSender extends Thread implements Sender try { - if (Thread.currentThread() != this) + if (Thread.currentThread() != senderThread) { - join(timeout); - if (isAlive()) + senderThread.join(timeout); + if (senderThread.isAlive()) { throw new SenderException("join timed out"); } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java index 35a2374fbc..4a4f3d124b 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java @@ -37,6 +37,7 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.qpid.testkit.MessageFactory; +import org.apache.qpid.thread.Threading; /** * Latency test sends an x number of messages in warmup mode and wait for a confirmation @@ -314,19 +315,36 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - LatencyTest latencyTest = new LatencyTest(); - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) + final LatencyTest latencyTest = new LatencyTest(); + Runnable r = new Runnable() { - try + public void run() { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) + { + try + { + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating latency test thread",e); } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java index cd12c7010d..9781a7e839 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java @@ -27,6 +27,8 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * PerfConsumer will receive x no of messages in warmup mode. * Once it receives the Start message it will then signal the PerfProducer. @@ -242,7 +244,24 @@ public class PerfConsumer extends PerfBase implements MessageListener public static void main(String[] args) { - PerfConsumer cons = new PerfConsumer(); - cons.test(); + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() + { + public void run() + { + cons.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index 757b1bfcda..e9421d7f22 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -27,6 +27,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import org.apache.qpid.testkit.MessageFactory; +import org.apache.qpid.thread.Threading; /** * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation @@ -201,7 +202,24 @@ public class PerfProducer extends PerfBase public static void main(String[] args) { - PerfProducer prod = new PerfProducer(); - prod.test(); + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() + { + public void run() + { + prod.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java index a91d9e7e85..d5514873e6 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -29,6 +29,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * Test Description * ================ @@ -67,7 +69,7 @@ public class MultiThreadedConsumer extends BaseTest { final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(new Runnable() + Runnable r = new Runnable() { public void run() { @@ -131,7 +133,18 @@ public class MultiThreadedConsumer extends BaseTest } - }); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.setName("session-" + i); t.start(); } // for loop diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java index 279e5ea0bf..1cf4ee28ca 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java @@ -32,6 +32,7 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -79,7 +80,7 @@ public class MultiThreadedProducer extends SimpleProducer for (int i = 0; i < session_count; i++) { final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(new Runnable() + Runnable r = new Runnable() { private Random gen = new Random(); @@ -142,7 +143,16 @@ public class MultiThreadedProducer extends SimpleProducer } - }); + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } t.setName("session-" + i); t.start(); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index c33f9ffbf2..1ae2c35970 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -30,6 +30,7 @@ import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -131,8 +132,23 @@ public class ResourceLeakTest extends BaseTest public static void main(String[] args) { - ResourceLeakTest test = new ResourceLeakTest(); - test.test(); + final ResourceLeakTest test = new ResourceLeakTest(); + Runnable r = new Runnable(){ + public void run() + { + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating test thread",e); + } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java index b3eb97dafe..cd6d9013f8 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -29,6 +29,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * Test Description * ================ @@ -126,9 +128,24 @@ public class SimpleConsumer extends BaseTest public static void main(String[] args) { - SimpleConsumer test = new SimpleConsumer(); - test.setUp(); - test.test(); + final SimpleConsumer test = new SimpleConsumer(); + Runnable r = new Runnable(){ + public void run() + { + test.setUp(); + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java index 1080092536..805ce7ac29 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java @@ -33,6 +33,7 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -138,9 +139,24 @@ public class SimpleProducer extends BaseTest public static void main(String[] args) { - SimpleProducer test = new SimpleProducer(); - test.setUp(); - test.test(); + final SimpleProducer test = new SimpleProducer(); + Runnable r = new Runnable(){ + public void run() + { + test.setUp(); + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } } } diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 7411e81bd6..4bba7b113d 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -20,23 +20,45 @@ */ package org.apache.qpid.tools; -import java.lang.reflect.InvocationTargetException; +import static org.apache.qpid.tools.QpidBench.Mode.BOTH; +import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; +import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; + import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.UUID; -import javax.jms.*; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; -import static org.apache.qpid.tools.QpidBench.Mode.*; - /** * QpidBench * @@ -412,7 +434,7 @@ public class QpidBench { case CONSUME: case BOTH: - new Thread() + Runnable r = new Runnable() { public void run() { @@ -432,7 +454,18 @@ public class QpidBench throw new RuntimeException(e); } } - }.start(); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); break; } @@ -440,7 +473,7 @@ public class QpidBench { case PUBLISH: case BOTH: - new Thread() + Runnable r = new Runnable() { public void run() { @@ -460,7 +493,17 @@ public class QpidBench throw new RuntimeException(e); } } - }.start(); + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating publisher thread",e); + } + t.start(); break; } } -- cgit v1.2.1