summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java55
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java47
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java27
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java38
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java23
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java22
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java17
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java14
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java20
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java23
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java22
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java65
16 files changed, 379 insertions, 71 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b5d12d9520..af0ed3faa3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -67,16 +67,26 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.CloseConsumerMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.JMSObjectMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,6 +281,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
+
+ protected Thread _dispatcherThread;
/** Holds the message factory factory for this session. */
protected MessageFactoryRegistry _messageFactoryRegistry;
@@ -668,7 +680,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcher != null)
{
// Failover failed and ain't coming back. Knife the dispatcher.
- _dispatcher.interrupt();
+ _dispatcherThread.interrupt();
}
}
@@ -1852,7 +1864,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void startDispatcherIfNecessary()
{
//If we are the dispatcher then we don't need to check we are started
- if (Thread.currentThread() == _dispatcher)
+ if (Thread.currentThread() == _dispatcherThread)
{
return;
}
@@ -1883,9 +1895,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
- _dispatcher.setDaemon(true);
+ try
+ {
+ _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
+
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating Dispatcher thread",e);
+ }
+ _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
+ _dispatcherThread.setDaemon(true);
_dispatcher.setConnectionStopped(initiallyStopped);
- _dispatcher.start();
+ _dispatcherThread.start();
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(_dispatcherThread.getName() + " created");
+ }
}
else
{
@@ -2606,7 +2632,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
- class Dispatcher extends Thread
+ class Dispatcher implements Runnable
{
/** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
@@ -2615,21 +2641,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final Object _lock = new Object();
private String dispatcherID = "" + System.identityHashCode(this);
-
-
public Dispatcher()
{
- super("Dispatcher-Channel-" + _channelId);
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " created");
- }
}
public void close()
{
_closed.set(true);
- interrupt();
+ _dispatcherThread.interrupt();
// fixme awaitTermination
@@ -2708,7 +2727,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(getName() + " started");
+ _dispatcherLogger.info(_dispatcherThread.getName() + " started");
}
UnprocessedMessage message;
@@ -2771,7 +2790,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
+ _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
new file mode 100644
index 0000000000..94869ab205
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
@@ -0,0 +1,18 @@
+package org.apache.qpid.thread;
+
+public class DefaultThreadFactory implements ThreadFactory
+{
+
+ public Thread createThread(Runnable r)
+ {
+ return new Thread(r);
+ }
+
+ public Thread createThread(Runnable r, int priority)
+ {
+ Thread t = new Thread(r);
+ t.setPriority(priority);
+ return t;
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
new file mode 100644
index 0000000000..b711f749f8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
@@ -0,0 +1,47 @@
+package org.apache.qpid.thread;
+
+import java.lang.reflect.Constructor;
+
+public class RealtimeThreadFactory implements ThreadFactory
+{
+ private Class threadClass;
+ private Constructor threadConstructor;
+ private Constructor priorityParameterConstructor;
+ private int defaultRTThreadPriority = 20;
+
+ public RealtimeThreadFactory() throws Exception
+ {
+ defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority",20);
+ threadClass = Class.forName("javax.realtime.RealtimeThread");
+
+ Class schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters");
+ Class releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters");
+ Class memoryParametersClass = Class.forName("javax.realtime.MemoryParameters");
+ Class memoryAreaClass = Class.forName("javax.realtime.MemoryArea");
+ Class processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters");
+
+ Class[] paramTypes = new Class[]{schedulingParametersClass,
+ releaseParametersClass,
+ memoryParametersClass,
+ memoryAreaClass,
+ processingGroupParametersClass,
+ java.lang.Runnable.class};
+
+ threadConstructor = threadClass.getConstructor(paramTypes);
+
+ Class priorityParameterClass = Class.forName("javax.realtime.PriorityParameters");
+ priorityParameterConstructor = priorityParameterClass.getConstructor(new Class[]{int.class});
+ }
+
+ public Thread createThread(Runnable r) throws Exception
+ {
+ return createThread(r,defaultRTThreadPriority);
+ }
+
+ public Thread createThread(Runnable r, int priority) throws Exception
+ {
+ Object priorityParams = priorityParameterConstructor.newInstance(priority);
+ return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
new file mode 100644
index 0000000000..f9bcabfa3d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
@@ -0,0 +1,7 @@
+package org.apache.qpid.thread;
+
+public interface ThreadFactory
+{
+ public Thread createThread(Runnable r) throws Exception;
+ public Thread createThread(Runnable r, int priority) throws Exception;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
new file mode 100644
index 0000000000..0fb113d22c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
@@ -0,0 +1,26 @@
+package org.apache.qpid.thread;
+
+public final class Threading
+{
+ private static ThreadFactory threadFactory;
+
+ static {
+ try
+ {
+ Class threadFactoryClass =
+ Class.forName(System.getProperty("qpid.thread_factory",
+ "org.apache.qpid.thread.DefaultThreadFactory"));
+
+ threadFactory = (ThreadFactory)threadFactoryClass.newInstance();
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error occured while loading thread factory",e);
+ }
+ }
+
+ public static ThreadFactory getThreadFactory()
+ {
+ return threadFactory;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 5efd51d5db..b245e47336 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -35,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*/
-final class IoReceiver extends Thread
+final class IoReceiver implements Runnable
{
private static final Logger log = Logger.get(IoReceiver.class);
@@ -46,6 +47,7 @@ final class IoReceiver extends Thread
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Thread receiverThread;
public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
int bufferSize, long timeout)
@@ -55,10 +57,18 @@ final class IoReceiver extends Thread
this.bufferSize = bufferSize;
this.socket = transport.getSocket();
this.timeout = timeout;
-
- setDaemon(true);
- setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
- start();
+
+ try
+ {
+ receiverThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOReceiver thread",e);
+ }
+ receiverThread.setDaemon(true);
+ receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ receiverThread.start();
}
void close(boolean block)
@@ -75,10 +85,10 @@ final class IoReceiver extends Thread
{
socket.shutdownInput();
}
- if (block && Thread.currentThread() != this)
+ if (block && Thread.currentThread() != receiverThread)
{
- join(timeout);
- if (isAlive())
+ receiverThread.join(timeout);
+ if (receiverThread.isAlive())
{
throw new TransportException("join timed out");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 36ea14856a..29f0c766fc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -24,6 +24,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -32,7 +33,7 @@ import org.apache.qpid.transport.util.Logger;
import static org.apache.qpid.transport.util.Functions.*;
-public final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender implements Runnable, Sender<ByteBuffer>
{
private static final Logger log = Logger.get(IoSender.class);
@@ -54,7 +55,8 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
private final Object notFull = new Object();
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
-
+ private final Thread senderThread;
+
private volatile Throwable exception = null;
@@ -74,9 +76,18 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
throw new TransportException("Error getting output stream for socket", e);
}
- setDaemon(true);
- setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
- start();
+ try
+ {
+ senderThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOSender thread",e);
+ }
+
+ senderThread.setDaemon(true);
+ senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ senderThread.start();
}
private static final int pof2(int n)
@@ -188,10 +199,10 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
try
{
- if (Thread.currentThread() != this)
+ if (Thread.currentThread() != senderThread)
{
- join(timeout);
- if (isAlive())
+ senderThread.join(timeout);
+ if (senderThread.isAlive())
{
throw new SenderException("join timed out");
}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
index 35a2374fbc..4a4f3d124b 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
@@ -37,6 +37,7 @@ import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.apache.qpid.testkit.MessageFactory;
+import org.apache.qpid.thread.Threading;
/**
* Latency test sends an x number of messages in warmup mode and wait for a confirmation
@@ -314,19 +315,36 @@ public class LatencyTest extends PerfBase implements MessageListener
public static void main(String[] args)
{
- LatencyTest latencyTest = new LatencyTest();
- latencyTest.test();
- latencyTest.printToConsole();
- if (System.getProperty("file") != null)
+ final LatencyTest latencyTest = new LatencyTest();
+ Runnable r = new Runnable()
{
- try
+ public void run()
{
- latencyTest.writeToFile();
- }
- catch(Exception e)
- {
- e.printStackTrace();
+ latencyTest.test();
+ latencyTest.printToConsole();
+ if (System.getProperty("file") != null)
+ {
+ try
+ {
+ latencyTest.writeToFile();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
}
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating latency test thread",e);
}
+ t.start();
}
} \ No newline at end of file
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
index cd12c7010d..9781a7e839 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
@@ -27,6 +27,8 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* PerfConsumer will receive x no of messages in warmup mode.
* Once it receives the Start message it will then signal the PerfProducer.
@@ -242,7 +244,24 @@ public class PerfConsumer extends PerfBase implements MessageListener
public static void main(String[] args)
{
- PerfConsumer cons = new PerfConsumer();
- cons.test();
+ final PerfConsumer cons = new PerfConsumer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ cons.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
}
} \ No newline at end of file
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
index 757b1bfcda..e9421d7f22 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
@@ -27,6 +27,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import org.apache.qpid.testkit.MessageFactory;
+import org.apache.qpid.thread.Threading;
/**
* PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
@@ -201,7 +202,24 @@ public class PerfProducer extends PerfBase
public static void main(String[] args)
{
- PerfProducer prod = new PerfProducer();
- prod.test();
+ final PerfProducer prod = new PerfProducer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ prod.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
}
} \ No newline at end of file
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
index a91d9e7e85..d5514873e6 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
@@ -29,6 +29,8 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* Test Description
* ================
@@ -67,7 +69,7 @@ public class MultiThreadedConsumer extends BaseTest
{
final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(new Runnable()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -131,7 +133,18 @@ public class MultiThreadedConsumer extends BaseTest
}
- });
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+
t.setName("session-" + i);
t.start();
} // for loop
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
index 279e5ea0bf..1cf4ee28ca 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
@@ -32,6 +32,7 @@ import javax.jms.TextMessage;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -79,7 +80,7 @@ public class MultiThreadedProducer extends SimpleProducer
for (int i = 0; i < session_count; i++)
{
final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(new Runnable()
+ Runnable r = new Runnable()
{
private Random gen = new Random();
@@ -142,7 +143,16 @@ public class MultiThreadedProducer extends SimpleProducer
}
- });
+ };
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
t.setName("session-" + i);
t.start();
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
index c33f9ffbf2..1ae2c35970 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
@@ -30,6 +30,7 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -131,8 +132,23 @@ public class ResourceLeakTest extends BaseTest
public static void main(String[] args)
{
- ResourceLeakTest test = new ResourceLeakTest();
- test.test();
+ final ResourceLeakTest test = new ResourceLeakTest();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating test thread",e);
+ }
}
}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
index b3eb97dafe..cd6d9013f8 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
@@ -29,6 +29,8 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* Test Description
* ================
@@ -126,9 +128,24 @@ public class SimpleConsumer extends BaseTest
public static void main(String[] args)
{
- SimpleConsumer test = new SimpleConsumer();
- test.setUp();
- test.test();
+ final SimpleConsumer test = new SimpleConsumer();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.setUp();
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
}
}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
index 1080092536..805ce7ac29 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
@@ -33,6 +33,7 @@ import javax.jms.TextMessage;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -138,9 +139,24 @@ public class SimpleProducer extends BaseTest
public static void main(String[] args)
{
- SimpleProducer test = new SimpleProducer();
- test.setUp();
- test.test();
+ final SimpleProducer test = new SimpleProducer();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.setUp();
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
}
}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
index 7411e81bd6..4bba7b113d 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
@@ -20,23 +20,45 @@
*/
package org.apache.qpid.tools;
-import java.lang.reflect.InvocationTargetException;
+import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
+import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
+import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
+
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
-import javax.jms.*;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeBind;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageSubscribe;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.QueueDeclare;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
-import static org.apache.qpid.tools.QpidBench.Mode.*;
-
/**
* QpidBench
*
@@ -412,7 +434,7 @@ public class QpidBench
{
case CONSUME:
case BOTH:
- new Thread()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -432,7 +454,18 @@ public class QpidBench
throw new RuntimeException(e);
}
}
- }.start();
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
break;
}
@@ -440,7 +473,7 @@ public class QpidBench
{
case PUBLISH:
case BOTH:
- new Thread()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -460,7 +493,17 @@ public class QpidBench
throw new RuntimeException(e);
}
}
- }.start();
+ };
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating publisher thread",e);
+ }
+ t.start();
break;
}
}