diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2009-01-13 18:19:00 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2009-01-13 18:19:00 +0000 |
| commit | ebfdd9a2ba48e77bdf0309c4dae20053c96a1b61 (patch) | |
| tree | 536ceb90e78aea530eee53f94bf24cd7bf33ec32 /java/common/src | |
| parent | aa5745b6adde680eb12d6c6c6e94e6118094ed91 (diff) | |
| download | qpid-python-ebfdd9a2ba48e77bdf0309c4dae20053c96a1b61.tar.gz | |
This is related to QPID-1479
This commit contains the core classes for adding the thread abstraction patch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@734205 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
7 files changed, 184 insertions, 16 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java new file mode 100644 index 0000000000..94869ab205 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java @@ -0,0 +1,18 @@ +package org.apache.qpid.thread; + +public class DefaultThreadFactory implements ThreadFactory +{ + + public Thread createThread(Runnable r) + { + return new Thread(r); + } + + public Thread createThread(Runnable r, int priority) + { + Thread t = new Thread(r); + t.setPriority(priority); + return t; + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java new file mode 100644 index 0000000000..b711f749f8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java @@ -0,0 +1,47 @@ +package org.apache.qpid.thread; + +import java.lang.reflect.Constructor; + +public class RealtimeThreadFactory implements ThreadFactory +{ + private Class threadClass; + private Constructor threadConstructor; + private Constructor priorityParameterConstructor; + private int defaultRTThreadPriority = 20; + + public RealtimeThreadFactory() throws Exception + { + defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority",20); + threadClass = Class.forName("javax.realtime.RealtimeThread"); + + Class schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters"); + Class releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters"); + Class memoryParametersClass = Class.forName("javax.realtime.MemoryParameters"); + Class memoryAreaClass = Class.forName("javax.realtime.MemoryArea"); + Class processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters"); + + Class[] paramTypes = new Class[]{schedulingParametersClass, + releaseParametersClass, + memoryParametersClass, + memoryAreaClass, + processingGroupParametersClass, + java.lang.Runnable.class}; + + threadConstructor = threadClass.getConstructor(paramTypes); + + Class priorityParameterClass = Class.forName("javax.realtime.PriorityParameters"); + priorityParameterConstructor = priorityParameterClass.getConstructor(new Class[]{int.class}); + } + + public Thread createThread(Runnable r) throws Exception + { + return createThread(r,defaultRTThreadPriority); + } + + public Thread createThread(Runnable r, int priority) throws Exception + { + Object priorityParams = priorityParameterConstructor.newInstance(priority); + return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java new file mode 100644 index 0000000000..f9bcabfa3d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java @@ -0,0 +1,7 @@ +package org.apache.qpid.thread; + +public interface ThreadFactory +{ + public Thread createThread(Runnable r) throws Exception; + public Thread createThread(Runnable r, int priority) throws Exception; +} diff --git a/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/java/common/src/main/java/org/apache/qpid/thread/Threading.java new file mode 100644 index 0000000000..0fb113d22c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/thread/Threading.java @@ -0,0 +1,26 @@ +package org.apache.qpid.thread; + +public final class Threading +{ + private static ThreadFactory threadFactory; + + static { + try + { + Class threadFactoryClass = + Class.forName(System.getProperty("qpid.thread_factory", + "org.apache.qpid.thread.DefaultThreadFactory")); + + threadFactory = (ThreadFactory)threadFactoryClass.newInstance(); + } + catch(Exception e) + { + throw new Error("Error occured while loading thread factory",e); + } + } + + public static ThreadFactory getThreadFactory() + { + return threadFactory; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 351d8d24e8..60abb326f6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.transport.network.io; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; @@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * */ -final class IoReceiver extends Thread +final class IoReceiver implements Runnable { private static final Logger log = Logger.get(IoReceiver.class); @@ -47,6 +48,7 @@ final class IoReceiver extends Thread private final Socket socket; private final long timeout; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Thread receiverThread; private final boolean shutdownBroken = ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); @@ -58,10 +60,18 @@ final class IoReceiver extends Thread this.bufferSize = bufferSize; this.socket = transport.getSocket(); this.timeout = timeout; - - setDaemon(true); - setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); - start(); + + try + { + receiverThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOReceiver thread",e); + } + receiverThread.setDaemon(true); + receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + receiverThread.start(); } void close(boolean block) @@ -78,10 +88,10 @@ final class IoReceiver extends Thread { socket.shutdownInput(); } - if (block && Thread.currentThread() != this) + if (block && Thread.currentThread() != receiverThread) { - join(timeout); - if (isAlive()) + receiverThread.join(timeout); + if (receiverThread.isAlive()) { throw new TransportException("join timed out"); } @@ -133,6 +143,9 @@ final class IoReceiver extends Thread t.getMessage().equalsIgnoreCase("socket closed") && closed.get())) { + log.error(t, "==========================================================="); + log.error(t, "Exception"); + log.error(t, "==========================================================="); receiver.exception(t); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 36ea14856a..29f0c766fc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,6 +24,7 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -32,7 +33,7 @@ import org.apache.qpid.transport.util.Logger; import static org.apache.qpid.transport.util.Functions.*; -public final class IoSender extends Thread implements Sender<ByteBuffer> +public final class IoSender implements Runnable, Sender<ByteBuffer> { private static final Logger log = Logger.get(IoSender.class); @@ -54,7 +55,8 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> private final Object notFull = new Object(); private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); - + private final Thread senderThread; + private volatile Throwable exception = null; @@ -74,9 +76,18 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> throw new TransportException("Error getting output stream for socket", e); } - setDaemon(true); - setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - start(); + try + { + senderThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOSender thread",e); + } + + senderThread.setDaemon(true); + senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + senderThread.start(); } private static final int pof2(int n) @@ -188,10 +199,10 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> try { - if (Thread.currentThread() != this) + if (Thread.currentThread() != senderThread) { - join(timeout); - if (isAlive()) + senderThread.join(timeout); + if (senderThread.isAlive()) { throw new SenderException("join timed out"); } diff --git a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java new file mode 100644 index 0000000000..9932633cb9 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java @@ -0,0 +1,46 @@ +package org.apache.qpid.thread; + +import junit.framework.TestCase; + +public class ThreadFactoryTest extends TestCase +{ + public void testThreadFactory() + { + Class threadFactoryClass = null; + try + { + threadFactoryClass = Class.forName(System.getProperty("qpid.thread_factory", + "org.apache.qpid.thread.DefaultThreadFactory")); + } + // If the thread factory class was wrong it will flagged way before it gets here. + catch(Exception e) + { + fail("Invalid thread factory class"); + } + + assertEquals(threadFactoryClass, Threading.getThreadFactory().getClass()); + } + + public void testThreadCreate() + { + Runnable r = new Runnable(){ + + public void run(){ + + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r,5); + } + catch(Exception e) + { + fail("Error creating thread using Qpid thread factory"); + } + + assertNotNull(t); + assertEquals(5,t.getPriority()); + } +} |
