diff options
16 files changed, 379 insertions, 71 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5d12d9520..af0ed3faa3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -67,16 +67,26 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.*; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.CloseConsumerMessage; +import org.apache.qpid.client.message.JMSBytesMessage; +import org.apache.qpid.client.message.JMSMapMessage; +import org.apache.qpid.client.message.JMSObjectMessage; +import org.apache.qpid.client.message.JMSStreamMessage; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.util.FlowControllingBlockingQueue; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -271,6 +281,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the dispatcher thread for this session. */ protected Dispatcher _dispatcher; + + protected Thread _dispatcherThread; /** Holds the message factory factory for this session. */ protected MessageFactoryRegistry _messageFactoryRegistry; @@ -668,7 +680,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcher != null) { // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcher.interrupt(); + _dispatcherThread.interrupt(); } } @@ -1852,7 +1864,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void startDispatcherIfNecessary() { //If we are the dispatcher then we don't need to check we are started - if (Thread.currentThread() == _dispatcher) + if (Thread.currentThread() == _dispatcherThread) { return; } @@ -1883,9 +1895,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcher == null) { _dispatcher = new Dispatcher(); - _dispatcher.setDaemon(true); + try + { + _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); + + } + catch(Exception e) + { + throw new Error("Error creating Dispatcher thread",e); + } + _dispatcherThread.setName("Dispatcher-Channel-" + _channelId); + _dispatcherThread.setDaemon(true); _dispatcher.setConnectionStopped(initiallyStopped); - _dispatcher.start(); + _dispatcherThread.start(); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(_dispatcherThread.getName() + " created"); + } } else { @@ -2606,7 +2632,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - class Dispatcher extends Thread + class Dispatcher implements Runnable { /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ @@ -2615,21 +2641,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final Object _lock = new Object(); private String dispatcherID = "" + System.identityHashCode(this); - - public Dispatcher() { - super("Dispatcher-Channel-" + _channelId); - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " created"); - } } public void close() { _closed.set(true); - interrupt(); + _dispatcherThread.interrupt(); // fixme awaitTermination @@ -2708,7 +2727,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info(getName() + " started"); + _dispatcherLogger.info(_dispatcherThread.getName() + " started"); } UnprocessedMessage message; @@ -2771,7 +2790,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); + _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java new file mode 100644 index 0000000000..94869ab205 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java @@ -0,0 +1,18 @@ +package org.apache.qpid.thread; + +public class DefaultThreadFactory implements ThreadFactory +{ + + public Thread createThread(Runnable r) + { + return new Thread(r); + } + + public Thread createThread(Runnable r, int priority) + { + Thread t = new Thread(r); + t.setPriority(priority); + return t; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java new file mode 100644 index 0000000000..b711f749f8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java @@ -0,0 +1,47 @@ +package org.apache.qpid.thread; + +import java.lang.reflect.Constructor; + +public class RealtimeThreadFactory implements ThreadFactory +{ + private Class threadClass; + private Constructor threadConstructor; + private Constructor priorityParameterConstructor; + private int defaultRTThreadPriority = 20; + + public RealtimeThreadFactory() throws Exception + { + defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority",20); + threadClass = Class.forName("javax.realtime.RealtimeThread"); + + Class schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters"); + Class releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters"); + Class memoryParametersClass = Class.forName("javax.realtime.MemoryParameters"); + Class memoryAreaClass = Class.forName("javax.realtime.MemoryArea"); + Class processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters"); + + Class[] paramTypes = new Class[]{schedulingParametersClass, + releaseParametersClass, + memoryParametersClass, + memoryAreaClass, + processingGroupParametersClass, + java.lang.Runnable.class}; + + threadConstructor = threadClass.getConstructor(paramTypes); + + Class priorityParameterClass = Class.forName("javax.realtime.PriorityParameters"); + priorityParameterConstructor = priorityParameterClass.getConstructor(new Class[]{int.class}); + } + + public Thread createThread(Runnable r) throws Exception + { + return createThread(r,defaultRTThreadPriority); + } + + public Thread createThread(Runnable r, int priority) throws Exception + { + Object priorityParams = priorityParameterConstructor.newInstance(priority); + return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java new file mode 100644 index 0000000000..f9bcabfa3d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java @@ -0,0 +1,7 @@ +package org.apache.qpid.thread; + +public interface ThreadFactory +{ + public Thread createThread(Runnable r) throws Exception; + public Thread createThread(Runnable r, int priority) throws Exception; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java new file mode 100644 index 0000000000..0fb113d22c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java @@ -0,0 +1,26 @@ +package org.apache.qpid.thread; + +public final class Threading +{ + private static ThreadFactory threadFactory; + + static { + try + { + Class threadFactoryClass = + Class.forName(System.getProperty("qpid.thread_factory", + "org.apache.qpid.thread.DefaultThreadFactory")); + + threadFactory = (ThreadFactory)threadFactoryClass.newInstance(); + } + catch(Exception e) + { + throw new Error("Error occured while loading thread factory",e); + } + } + + public static ThreadFactory getThreadFactory() + { + return threadFactory; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 5efd51d5db..b245e47336 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.transport.network.io; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; @@ -35,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * */ -final class IoReceiver extends Thread +final class IoReceiver implements Runnable { private static final Logger log = Logger.get(IoReceiver.class); @@ -46,6 +47,7 @@ final class IoReceiver extends Thread private final Socket socket; private final long timeout; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Thread receiverThread; public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) @@ -55,10 +57,18 @@ final class IoReceiver extends Thread this.bufferSize = bufferSize; this.socket = transport.getSocket(); this.timeout = timeout; - - setDaemon(true); - setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); - start(); + + try + { + receiverThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOReceiver thread",e); + } + receiverThread.setDaemon(true); + receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + receiverThread.start(); } void close(boolean block) @@ -75,10 +85,10 @@ final class IoReceiver extends Thread { socket.shutdownInput(); } - if (block && Thread.currentThread() != this) + if (block && Thread.currentThread() != receiverThread) { - join(timeout); - if (isAlive()) + receiverThread.join(timeout); + if (receiverThread.isAlive()) { throw new TransportException("join timed out"); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 36ea14856a..29f0c766fc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,6 +24,7 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -32,7 +33,7 @@ import org.apache.qpid.transport.util.Logger; import static org.apache.qpid.transport.util.Functions.*; -public final class IoSender extends Thread implements Sender<ByteBuffer> +public final class IoSender implements Runnable, Sender<ByteBuffer> { private static final Logger log = Logger.get(IoSender.class); @@ -54,7 +55,8 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> private final Object notFull = new Object(); private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); - + private final Thread senderThread; + private volatile Throwable exception = null; @@ -74,9 +76,18 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> throw new TransportException("Error getting output stream for socket", e); } - setDaemon(true); - setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - start(); + try + { + senderThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOSender thread",e); + } + + senderThread.setDaemon(true); + senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + senderThread.start(); } private static final int pof2(int n) @@ -188,10 +199,10 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> try { - if (Thread.currentThread() != this) + if (Thread.currentThread() != senderThread) { - join(timeout); - if (isAlive()) + senderThread.join(timeout); + if (senderThread.isAlive()) { throw new SenderException("join timed out"); } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java index 35a2374fbc..4a4f3d124b 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java @@ -37,6 +37,7 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.qpid.testkit.MessageFactory; +import org.apache.qpid.thread.Threading; /** * Latency test sends an x number of messages in warmup mode and wait for a confirmation @@ -314,19 +315,36 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - LatencyTest latencyTest = new LatencyTest(); - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) + final LatencyTest latencyTest = new LatencyTest(); + Runnable r = new Runnable() { - try + public void run() { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) + { + try + { + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating latency test thread",e); } + t.start(); } }
\ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java index cd12c7010d..9781a7e839 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java @@ -27,6 +27,8 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * PerfConsumer will receive x no of messages in warmup mode. * Once it receives the Start message it will then signal the PerfProducer. @@ -242,7 +244,24 @@ public class PerfConsumer extends PerfBase implements MessageListener public static void main(String[] args) { - PerfConsumer cons = new PerfConsumer(); - cons.test(); + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() + { + public void run() + { + cons.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); } }
\ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index 757b1bfcda..e9421d7f22 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -27,6 +27,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import org.apache.qpid.testkit.MessageFactory; +import org.apache.qpid.thread.Threading; /** * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation @@ -201,7 +202,24 @@ public class PerfProducer extends PerfBase public static void main(String[] args) { - PerfProducer prod = new PerfProducer(); - prod.test(); + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() + { + public void run() + { + prod.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); } }
\ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java index a91d9e7e85..d5514873e6 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -29,6 +29,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * Test Description * ================ @@ -67,7 +69,7 @@ public class MultiThreadedConsumer extends BaseTest { final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(new Runnable() + Runnable r = new Runnable() { public void run() { @@ -131,7 +133,18 @@ public class MultiThreadedConsumer extends BaseTest } - }); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.setName("session-" + i); t.start(); } // for loop diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java index 279e5ea0bf..1cf4ee28ca 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java @@ -32,6 +32,7 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -79,7 +80,7 @@ public class MultiThreadedProducer extends SimpleProducer for (int i = 0; i < session_count; i++) { final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(new Runnable() + Runnable r = new Runnable() { private Random gen = new Random(); @@ -142,7 +143,16 @@ public class MultiThreadedProducer extends SimpleProducer } - }); + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } t.setName("session-" + i); t.start(); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index c33f9ffbf2..1ae2c35970 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -30,6 +30,7 @@ import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -131,8 +132,23 @@ public class ResourceLeakTest extends BaseTest public static void main(String[] args) { - ResourceLeakTest test = new ResourceLeakTest(); - test.test(); + final ResourceLeakTest test = new ResourceLeakTest(); + Runnable r = new Runnable(){ + public void run() + { + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating test thread",e); + } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java index b3eb97dafe..cd6d9013f8 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -29,6 +29,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * Test Description * ================ @@ -126,9 +128,24 @@ public class SimpleConsumer extends BaseTest public static void main(String[] args) { - SimpleConsumer test = new SimpleConsumer(); - test.setUp(); - test.test(); + final SimpleConsumer test = new SimpleConsumer(); + Runnable r = new Runnable(){ + public void run() + { + test.setUp(); + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java index 1080092536..805ce7ac29 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java @@ -33,6 +33,7 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -138,9 +139,24 @@ public class SimpleProducer extends BaseTest public static void main(String[] args) { - SimpleProducer test = new SimpleProducer(); - test.setUp(); - test.test(); + final SimpleProducer test = new SimpleProducer(); + Runnable r = new Runnable(){ + public void run() + { + test.setUp(); + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } } } diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 7411e81bd6..4bba7b113d 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -20,23 +20,45 @@ */ package org.apache.qpid.tools; -import java.lang.reflect.InvocationTargetException; +import static org.apache.qpid.tools.QpidBench.Mode.BOTH; +import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; +import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; + import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.UUID; -import javax.jms.*; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; -import static org.apache.qpid.tools.QpidBench.Mode.*; - /** * QpidBench * @@ -412,7 +434,7 @@ public class QpidBench { case CONSUME: case BOTH: - new Thread() + Runnable r = new Runnable() { public void run() { @@ -432,7 +454,18 @@ public class QpidBench throw new RuntimeException(e); } } - }.start(); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); break; } @@ -440,7 +473,7 @@ public class QpidBench { case PUBLISH: case BOTH: - new Thread() + Runnable r = new Runnable() { public void run() { @@ -460,7 +493,17 @@ public class QpidBench throw new RuntimeException(e); } } - }.start(); + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating publisher thread",e); + } + t.start(); break; } } |
