From dea98703539c212fbb2ac0db304e107f48d237ac Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 13 Jan 2009 18:19:00 +0000 Subject: This is related to QPID-1479 This commit contains the core classes for adding the thread abstraction patch git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@734205 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 55 +++++++++++++++------- .../apache/qpid/thread/DefaultThreadFactory.java | 18 +++++++ .../apache/qpid/thread/RealtimeThreadFactory.java | 47 ++++++++++++++++++ .../java/org/apache/qpid/thread/ThreadFactory.java | 7 +++ .../java/org/apache/qpid/thread/Threading.java | 26 ++++++++++ .../qpid/transport/network/io/IoReceiver.java | 29 ++++++++---- .../apache/qpid/transport/network/io/IoSender.java | 27 +++++++---- .../org/apache/qpid/thread/ThreadFactoryTest.java | 46 ++++++++++++++++++ 8 files changed, 221 insertions(+), 34 deletions(-) create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java create mode 100644 qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5d12d9520..af0ed3faa3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -67,16 +67,26 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.*; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.CloseConsumerMessage; +import org.apache.qpid.client.message.JMSBytesMessage; +import org.apache.qpid.client.message.JMSMapMessage; +import org.apache.qpid.client.message.JMSObjectMessage; +import org.apache.qpid.client.message.JMSStreamMessage; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.util.FlowControllingBlockingQueue; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -271,6 +281,8 @@ public abstract class AMQSession +public final class IoSender implements Runnable, Sender { private static final Logger log = Logger.get(IoSender.class); @@ -54,7 +55,8 @@ public final class IoSender extends Thread implements Sender private final Object notFull = new Object(); private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); - + private final Thread senderThread; + private volatile Throwable exception = null; @@ -74,9 +76,18 @@ public final class IoSender extends Thread implements Sender throw new TransportException("Error getting output stream for socket", e); } - setDaemon(true); - setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - start(); + try + { + senderThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOSender thread",e); + } + + senderThread.setDaemon(true); + senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + senderThread.start(); } private static final int pof2(int n) @@ -188,10 +199,10 @@ public final class IoSender extends Thread implements Sender try { - if (Thread.currentThread() != this) + if (Thread.currentThread() != senderThread) { - join(timeout); - if (isAlive()) + senderThread.join(timeout); + if (senderThread.isAlive()) { throw new SenderException("join timed out"); } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java b/qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java new file mode 100644 index 0000000000..9932633cb9 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java @@ -0,0 +1,46 @@ +package org.apache.qpid.thread; + +import junit.framework.TestCase; + +public class ThreadFactoryTest extends TestCase +{ + public void testThreadFactory() + { + Class threadFactoryClass = null; + try + { + threadFactoryClass = Class.forName(System.getProperty("qpid.thread_factory", + "org.apache.qpid.thread.DefaultThreadFactory")); + } + // If the thread factory class was wrong it will flagged way before it gets here. + catch(Exception e) + { + fail("Invalid thread factory class"); + } + + assertEquals(threadFactoryClass, Threading.getThreadFactory().getClass()); + } + + public void testThreadCreate() + { + Runnable r = new Runnable(){ + + public void run(){ + + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r,5); + } + catch(Exception e) + { + fail("Error creating thread using Qpid thread factory"); + } + + assertNotNull(t); + assertEquals(5,t.getPriority()); + } +} -- cgit v1.2.1