summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java19
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java23
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java135
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java4
5 files changed, 154 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 7b65f279be..93724c9920 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -644,7 +644,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (AMQException e)
{
- throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
index ce8e14506f..f54cb782c8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
@@ -25,9 +25,10 @@ import javax.jms.TemporaryQueue;
import org.apache.qpid.framing.AMQShortString;
-/**
- * AMQ implementation of a TemporaryQueue.
- */
+import java.util.Random;
+import java.util.UUID;
+
+/** AMQ implementation of a TemporaryQueue. */
final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
{
@@ -35,21 +36,17 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor
private final AMQSession _session;
private boolean _deleted;
- /**
- * Create a new instance of an AMQTemporaryQueue
- */
+ /** Create a new instance of an AMQTemporaryQueue */
public AMQTemporaryQueue(AMQSession session)
{
- super(session.getTemporaryQueueExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
+ super(session.getTemporaryQueueExchangeName(), new AMQShortString("TempQueue" + UUID.randomUUID()), true);
_session = session;
}
- /**
- * @see javax.jms.TemporaryQueue#delete()
- */
+ /** @see javax.jms.TemporaryQueue#delete() */
public synchronized void delete() throws JMSException
{
- if(_session.hasConsumer(this))
+ if (_session.hasConsumer(this))
{
throw new JMSException("Temporary Queue has consumers so cannot be deleted");
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
index 42594fff8e..97fbf9876f 100644
--- a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -43,16 +43,13 @@ import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
public class ResetMessageListenerTest extends TestCase
{
@@ -78,6 +75,8 @@ public class ResetMessageListenerTest extends TestCase
super.setUp();
TransportConnection.createVMBroker(1);
+ System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
+
InitialContextFactory factory = new PropertiesFileInitialContextFactory();
Hashtable<String, String> env = new Hashtable<String, String>();
@@ -203,7 +202,7 @@ public class ResetMessageListenerTest extends TestCase
try
{
- _clientConnection.stop();
+ _clientConnection.stop();
}
catch (JMSException e)
{
@@ -226,7 +225,7 @@ public class ResetMessageListenerTest extends TestCase
}
}
});
-
+
_clientConnection.start();
}
catch (javax.jms.IllegalStateException e)
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
index a177cf86d3..2ba5bed0b1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -7,14 +7,20 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
+import javax.jms.Queue;
import junit.framework.TestCase;
+import junit.framework.Assert;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
+import java.util.List;
+import java.util.LinkedList;
+
public class TemporaryQueueTest extends TestCase
{
@@ -35,7 +41,7 @@ public class TemporaryQueueTest extends TestCase
protected Connection createConnection() throws AMQException, URLSyntaxException
{
return new AMQConnection(_broker, "guest", "guest",
- "fred", "test");
+ "fred", "test");
}
public void testTempoaryQueue() throws Exception
@@ -50,14 +56,14 @@ public class TemporaryQueueTest extends TestCase
producer.send(session.createTextMessage("hello"));
TextMessage tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
- assertEquals("hello",tm.getText());
+ assertEquals("hello", tm.getText());
try
{
queue.delete();
fail("Expected JMSException : should not be able to delete while there are active consumers");
}
- catch(JMSException je)
+ catch (JMSException je)
{
; //pass
}
@@ -68,7 +74,7 @@ public class TemporaryQueueTest extends TestCase
{
queue.delete();
}
- catch(JMSException je)
+ catch (JMSException je)
{
fail("Unexpected Exception: " + je.getMessage());
}
@@ -76,6 +82,127 @@ public class TemporaryQueueTest extends TestCase
conn.close();
}
+ public void tUniqueness() throws JMSException, AMQException, URLSyntaxException
+ {
+ int numProcs = Runtime.getRuntime().availableProcessors();
+ final int threadsProc = 5;
+
+ runUniqueness(1, 10);
+ runUniqueness(numProcs * threadsProc, 10);
+ runUniqueness(numProcs * threadsProc, 100);
+ runUniqueness(numProcs * threadsProc, 500);
+ }
+
+ void runUniqueness(int makers, int queues) throws JMSException, AMQException, URLSyntaxException
+ {
+ Connection connection = createConnection();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
+
+ //Create Makers
+ for (int m = 0; m < makers; m++)
+ {
+ tqList.add(new TempQueueMaker(session, queues));
+ }
+
+
+ List<Thread> threadList = new LinkedList<Thread>();
+
+ //Create Makers
+ for (TempQueueMaker maker : tqList)
+ {
+ threadList.add(new Thread(maker));
+ }
+
+ //Start threads
+ for (Thread thread : threadList)
+ {
+ thread.start();
+ }
+
+ // Join Threads
+ for (Thread thread : threadList)
+ {
+ try
+ {
+ thread.join();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Couldn't correctly join threads");
+ }
+ }
+
+
+ List<AMQQueue> list = new LinkedList<AMQQueue>();
+
+ // Test values
+ for (TempQueueMaker maker : tqList)
+ {
+ check(maker, list);
+ }
+
+ Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
+
+ connection.close();
+ }
+
+ private void check(TempQueueMaker tq, List<AMQQueue> list)
+ {
+ for (AMQQueue q : tq.getList())
+ {
+ if (list.contains(q))
+ {
+ fail(q + " already exists.");
+ }
+ else
+ {
+ list.add(q);
+ }
+ }
+ }
+
+
+ class TempQueueMaker implements Runnable
+ {
+ List<AMQQueue> _queues;
+ Session _session;
+ private int _count;
+
+
+ TempQueueMaker(Session session, int queues) throws JMSException
+ {
+ _queues = new LinkedList<AMQQueue>();
+
+ _count = queues;
+
+ _session = session;
+ }
+
+ public void run()
+ {
+ int i = 0;
+ try
+ {
+ for (; i < _count; i++)
+ {
+ _queues.add((AMQQueue) _session.createTemporaryQueue());
+ }
+ }
+ catch (JMSException jmse)
+ {
+ //stop
+ }
+ }
+
+ List<AMQQueue> getList()
+ {
+ return _queues;
+ }
+ }
+
public static junit.framework.Test suite()
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index 62234ad21f..d52707d965 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -181,7 +181,9 @@ public class MessageRequeueTest extends TestCase
passed = true;
}
- /** multiple consumers */
+ /** multiple consumers
+ * Based on code subbmitted by client FT-304
+ */
public void testTwoCompetingConsumers()
{
Consumer c1 = new Consumer();