summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-11-21 16:32:53 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-11-21 16:32:53 +0000
commitfbeb3752a902f5cbf225dd9fa4c6f00dbcbc3a68 (patch)
treee6b60960e326e941970b04ff45b00abb59d74986 /qpid/java/common/src
parentdd41ba8fdfecb5e60e41b714945a7ef852d7b876 (diff)
downloadqpid-python-fbeb3752a902f5cbf225dd9fa4c6f00dbcbc3a68.tar.gz
This is related to QPID-1479.
For starters I have changed the IoSender.java IoReceiver.java and AMQSession.java#Dispatcher to use the Thread factory to create the threads they require. The ThreadFactory has two implimentations, the default being the java.lang.Threads. The other is the RealtimeThreadFactory which uses reflection to create threads with a specific priority. -Dqpid.thread_factory=<thread_factory_class> will decide which thread factory should be loaded. -Dqpid.rt_thread_priority=<int> specifies the gloabl real time thread priority and defaults to 20. You could also set individual thread priorities by adding the nessacery config+code changes. I have also changed the Testkit and QpidBench to use the Thread factory so you could use them for testing/benchmarking work on RT JVMs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java47
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java27
6 files changed, 135 insertions, 16 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
new file mode 100644
index 0000000000..94869ab205
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
@@ -0,0 +1,18 @@
+package org.apache.qpid.thread;
+
+public class DefaultThreadFactory implements ThreadFactory
+{
+
+ public Thread createThread(Runnable r)
+ {
+ return new Thread(r);
+ }
+
+ public Thread createThread(Runnable r, int priority)
+ {
+ Thread t = new Thread(r);
+ t.setPriority(priority);
+ return t;
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
new file mode 100644
index 0000000000..b711f749f8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
@@ -0,0 +1,47 @@
+package org.apache.qpid.thread;
+
+import java.lang.reflect.Constructor;
+
+public class RealtimeThreadFactory implements ThreadFactory
+{
+ private Class threadClass;
+ private Constructor threadConstructor;
+ private Constructor priorityParameterConstructor;
+ private int defaultRTThreadPriority = 20;
+
+ public RealtimeThreadFactory() throws Exception
+ {
+ defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority",20);
+ threadClass = Class.forName("javax.realtime.RealtimeThread");
+
+ Class schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters");
+ Class releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters");
+ Class memoryParametersClass = Class.forName("javax.realtime.MemoryParameters");
+ Class memoryAreaClass = Class.forName("javax.realtime.MemoryArea");
+ Class processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters");
+
+ Class[] paramTypes = new Class[]{schedulingParametersClass,
+ releaseParametersClass,
+ memoryParametersClass,
+ memoryAreaClass,
+ processingGroupParametersClass,
+ java.lang.Runnable.class};
+
+ threadConstructor = threadClass.getConstructor(paramTypes);
+
+ Class priorityParameterClass = Class.forName("javax.realtime.PriorityParameters");
+ priorityParameterConstructor = priorityParameterClass.getConstructor(new Class[]{int.class});
+ }
+
+ public Thread createThread(Runnable r) throws Exception
+ {
+ return createThread(r,defaultRTThreadPriority);
+ }
+
+ public Thread createThread(Runnable r, int priority) throws Exception
+ {
+ Object priorityParams = priorityParameterConstructor.newInstance(priority);
+ return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
new file mode 100644
index 0000000000..f9bcabfa3d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
@@ -0,0 +1,7 @@
+package org.apache.qpid.thread;
+
+public interface ThreadFactory
+{
+ public Thread createThread(Runnable r) throws Exception;
+ public Thread createThread(Runnable r, int priority) throws Exception;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
new file mode 100644
index 0000000000..0fb113d22c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
@@ -0,0 +1,26 @@
+package org.apache.qpid.thread;
+
+public final class Threading
+{
+ private static ThreadFactory threadFactory;
+
+ static {
+ try
+ {
+ Class threadFactoryClass =
+ Class.forName(System.getProperty("qpid.thread_factory",
+ "org.apache.qpid.thread.DefaultThreadFactory"));
+
+ threadFactory = (ThreadFactory)threadFactoryClass.newInstance();
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error occured while loading thread factory",e);
+ }
+ }
+
+ public static ThreadFactory getThreadFactory()
+ {
+ return threadFactory;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 5efd51d5db..b245e47336 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -35,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*/
-final class IoReceiver extends Thread
+final class IoReceiver implements Runnable
{
private static final Logger log = Logger.get(IoReceiver.class);
@@ -46,6 +47,7 @@ final class IoReceiver extends Thread
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Thread receiverThread;
public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
int bufferSize, long timeout)
@@ -55,10 +57,18 @@ final class IoReceiver extends Thread
this.bufferSize = bufferSize;
this.socket = transport.getSocket();
this.timeout = timeout;
-
- setDaemon(true);
- setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
- start();
+
+ try
+ {
+ receiverThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOReceiver thread",e);
+ }
+ receiverThread.setDaemon(true);
+ receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ receiverThread.start();
}
void close(boolean block)
@@ -75,10 +85,10 @@ final class IoReceiver extends Thread
{
socket.shutdownInput();
}
- if (block && Thread.currentThread() != this)
+ if (block && Thread.currentThread() != receiverThread)
{
- join(timeout);
- if (isAlive())
+ receiverThread.join(timeout);
+ if (receiverThread.isAlive())
{
throw new TransportException("join timed out");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 36ea14856a..29f0c766fc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -24,6 +24,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -32,7 +33,7 @@ import org.apache.qpid.transport.util.Logger;
import static org.apache.qpid.transport.util.Functions.*;
-public final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender implements Runnable, Sender<ByteBuffer>
{
private static final Logger log = Logger.get(IoSender.class);
@@ -54,7 +55,8 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
private final Object notFull = new Object();
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
-
+ private final Thread senderThread;
+
private volatile Throwable exception = null;
@@ -74,9 +76,18 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
throw new TransportException("Error getting output stream for socket", e);
}
- setDaemon(true);
- setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
- start();
+ try
+ {
+ senderThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOSender thread",e);
+ }
+
+ senderThread.setDaemon(true);
+ senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ senderThread.start();
}
private static final int pof2(int n)
@@ -188,10 +199,10 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
try
{
- if (Thread.currentThread() != this)
+ if (Thread.currentThread() != senderThread)
{
- join(timeout);
- if (isAlive())
+ senderThread.join(timeout);
+ if (senderThread.isAlive())
{
throw new SenderException("join timed out");
}