From 8de38d68d7efe3b286da38c996cd8eabbe67ddc8 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 23 Apr 2007 15:52:43 +0000 Subject: QPID-472 - Creation of TemporaryQueues will not guarantee unique queue names if created rapidly. Updated TemporaryQueueTest.java so that it checks Headers/Queue/Topic for unroutable/mandatory messages beig returned. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@531512 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQTemporaryQueue.java | 19 ++- .../client/temporaryqueue/TemporaryQueueTest.java | 135 ++++++++++++++++++++- 2 files changed, 139 insertions(+), 15 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index ce8e14506f..f54cb782c8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -25,9 +25,10 @@ import javax.jms.TemporaryQueue; import org.apache.qpid.framing.AMQShortString; -/** - * AMQ implementation of a TemporaryQueue. - */ +import java.util.Random; +import java.util.UUID; + +/** AMQ implementation of a TemporaryQueue. */ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination { @@ -35,21 +36,17 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor private final AMQSession _session; private boolean _deleted; - /** - * Create a new instance of an AMQTemporaryQueue - */ + /** Create a new instance of an AMQTemporaryQueue */ public AMQTemporaryQueue(AMQSession session) { - super(session.getTemporaryQueueExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true); + super(session.getTemporaryQueueExchangeName(), new AMQShortString("TempQueue" + UUID.randomUUID()), true); _session = session; } - /** - * @see javax.jms.TemporaryQueue#delete() - */ + /** @see javax.jms.TemporaryQueue#delete() */ public synchronized void delete() throws JMSException { - if(_session.hasConsumer(this)) + if (_session.hasConsumer(this)) { throw new JMSException("Temporary Queue has consumers so cannot be deleted"); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index a177cf86d3..2ba5bed0b1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -7,14 +7,20 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; +import javax.jms.Queue; import junit.framework.TestCase; +import junit.framework.Assert; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; +import java.util.List; +import java.util.LinkedList; + public class TemporaryQueueTest extends TestCase { @@ -35,7 +41,7 @@ public class TemporaryQueueTest extends TestCase protected Connection createConnection() throws AMQException, URLSyntaxException { return new AMQConnection(_broker, "guest", "guest", - "fred", "test"); + "fred", "test"); } public void testTempoaryQueue() throws Exception @@ -50,14 +56,14 @@ public class TemporaryQueueTest extends TestCase producer.send(session.createTextMessage("hello")); TextMessage tm = (TextMessage) consumer.receive(2000); assertNotNull(tm); - assertEquals("hello",tm.getText()); + assertEquals("hello", tm.getText()); try { queue.delete(); fail("Expected JMSException : should not be able to delete while there are active consumers"); } - catch(JMSException je) + catch (JMSException je) { ; //pass } @@ -68,7 +74,7 @@ public class TemporaryQueueTest extends TestCase { queue.delete(); } - catch(JMSException je) + catch (JMSException je) { fail("Unexpected Exception: " + je.getMessage()); } @@ -76,6 +82,127 @@ public class TemporaryQueueTest extends TestCase conn.close(); } + public void tUniqueness() throws JMSException, AMQException, URLSyntaxException + { + int numProcs = Runtime.getRuntime().availableProcessors(); + final int threadsProc = 5; + + runUniqueness(1, 10); + runUniqueness(numProcs * threadsProc, 10); + runUniqueness(numProcs * threadsProc, 100); + runUniqueness(numProcs * threadsProc, 500); + } + + void runUniqueness(int makers, int queues) throws JMSException, AMQException, URLSyntaxException + { + Connection connection = createConnection(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + List tqList = new LinkedList(); + + //Create Makers + for (int m = 0; m < makers; m++) + { + tqList.add(new TempQueueMaker(session, queues)); + } + + + List threadList = new LinkedList(); + + //Create Makers + for (TempQueueMaker maker : tqList) + { + threadList.add(new Thread(maker)); + } + + //Start threads + for (Thread thread : threadList) + { + thread.start(); + } + + // Join Threads + for (Thread thread : threadList) + { + try + { + thread.join(); + } + catch (InterruptedException e) + { + fail("Couldn't correctly join threads"); + } + } + + + List list = new LinkedList(); + + // Test values + for (TempQueueMaker maker : tqList) + { + check(maker, list); + } + + Assert.assertEquals("Not enough queues made.", makers * queues, list.size()); + + connection.close(); + } + + private void check(TempQueueMaker tq, List list) + { + for (AMQQueue q : tq.getList()) + { + if (list.contains(q)) + { + fail(q + " already exists."); + } + else + { + list.add(q); + } + } + } + + + class TempQueueMaker implements Runnable + { + List _queues; + Session _session; + private int _count; + + + TempQueueMaker(Session session, int queues) throws JMSException + { + _queues = new LinkedList(); + + _count = queues; + + _session = session; + } + + public void run() + { + int i = 0; + try + { + for (; i < _count; i++) + { + _queues.add((AMQQueue) _session.createTemporaryQueue()); + } + } + catch (JMSException jmse) + { + //stop + } + } + + List getList() + { + return _queues; + } + } + public static junit.framework.Test suite() { -- cgit v1.2.1