From 9930676624d4c27f866c9d40227fd7c282f4dac2 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 21 Apr 2008 14:03:35 +0000 Subject: Initial checkpoint of queue refactoring work git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@650148 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 12 +- .../apache/qpid/client/BasicMessageConsumer.java | 6 +- .../client/handler/BasicDeliverMethodHandler.java | 2 +- .../qpid/client/protocol/AMQProtocolSession.java | 15 +- .../qpid/client/AMQQueueDeferredOrderingTest.java | 2 +- .../client/MessageListenerMultiConsumerTest.java | 16 +- .../qpid/client/ResetMessageListenerTest.java | 11 +- .../org/apache/qpid/test/unit/ack/RecoverTest.java | 1 + .../qpid/test/unit/basic/SessionStartTest.java | 4 + .../client/channelclose/ChannelCloseOkTest.java | 24 ++- .../qpid/test/unit/client/forwardall/Client.java | 6 +- .../test/unit/client/forwardall/CombinedTest.java | 11 +- .../qpid/test/unit/client/forwardall/Service.java | 10 +- .../unit/client/forwardall/ServiceCreator.java | 9 +- .../qpid/test/unit/close/MessageRequeueTest.java | 73 +++++-- .../test/unit/topic/DurableSubscriptionTest.java | 2 +- .../qpid/test/unit/topic/TopicSessionTest.java | 225 ++++++++++++++++++++- 17 files changed, 355 insertions(+), 74 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c3219e6564..bba39403a5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -182,6 +182,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _fastAccessConsumers[i] = null; } } + + + + public String toString() + { + return "{ Fast: " + Arrays.asList(_fastAccessConsumers) + " ; Slow: " + _slowAccessConsumers + "}"; + } } @@ -299,9 +306,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final IdToConsumerMap _consumers = new IdToConsumerMap(); - //Map _consumers = - //new ConcurrentHashMap(); - /** * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes @@ -1419,7 +1423,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (message.isDeliverMessage()) { _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag()); + _queue.add(message); + } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index efbce6033b..5b1c1aeeee 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -809,9 +809,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeDelivered() { - while (!_receivedDeliveryTags.isEmpty()) + while (!_receivedDeliveryTags.isEmpty()) { - _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); + _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); } } @@ -1017,7 +1017,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); rollback(); - } + } clearReceiveQueue(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index d05e99d210..aa7599f355 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -46,7 +46,7 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener received, int count) @@ -199,15 +199,17 @@ public class ChannelCloseOkTest extends TestCase producer1.send(session.createTextMessage(message)); } - private void waitFor(List received, int count) throws InterruptedException + private void waitFor(List received, final int count) throws InterruptedException { + int lastSeen = -1; synchronized (received) { - while (received.size() < count) + while ((lastSeen != received.size()) && (lastSeen = received.size()) < count) { + try { - received.wait(); + received.wait(2000L); } catch (InterruptedException e) { @@ -216,6 +218,10 @@ public class ChannelCloseOkTest extends TestCase } } } + if(received.size() < count) + { + throw new RuntimeException("Expected: " + count + " got: " + received.size()); + } } private static String randomize(String in) diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java index 19ef612bcc..2ee29e3da4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -75,7 +75,9 @@ public class Client implements MessageListener public synchronized void onMessage(Message response) { + _logger.info("Received " + (++_count) + " of " + _expected + " responses."); + if (_count == _expected) { @@ -89,10 +91,10 @@ public class Client implements MessageListener if (_count < _expected) { - wait(10000L); + wait(1000L); } - if (_count < _expected) + if (_count != _expected) { throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java index 9cde24dd92..81227b9540 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java @@ -22,9 +22,13 @@ package org.apache.qpid.test.unit.client.forwardall; import junit.framework.TestCase; import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.server.queue.SimpleAMQQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; +import java.util.HashSet; + /** * Runs the Service's and Client parts of the test in the same process * as the broker @@ -34,6 +38,7 @@ public class CombinedTest extends TestCase private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class); private int run = 0; + protected void setUp() throws Exception { super.setUp(); @@ -47,16 +52,16 @@ public class CombinedTest extends TestCase public void testForwardAll() throws Exception { - while (run < 10) + while (run < 100) { int services = 2; ServiceCreator.start("vm://:1", services); - + Thread.sleep(100); _logger.info("Starting " + ++run + " client..."); new Client("vm://:1", services).shutdownWhenComplete(); - + ServiceCreator.closeAll(); _logger.info("Completed " + run + " successfully!"); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java index 6593f7d86a..bf03ce6899 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java @@ -37,14 +37,16 @@ public class Service implements MessageListener { private final AMQConnection _connection; private final AMQSession _session; + private final int _id; - Service(String broker) throws Exception + Service(String broker, int id) throws Exception { - this(connect(broker)); + this(connect(broker), id); } - Service(AMQConnection connection) throws Exception + Service(AMQConnection connection, int id) throws Exception { + _id = id; _connection = connection; AMQQueue queue = new SpecialQueue(connection, "ServiceQueue"); _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -56,7 +58,7 @@ public class Service implements MessageListener { try { - Message response = _session.createTextMessage("Response!"); + Message response = _session.createTextMessage("Response! " + _id); Destination replyTo = request.getJMSReplyTo(); _session.createProducer(replyTo).send(response); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java index be16f6b7ae..310a0993bc 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java @@ -34,17 +34,19 @@ public class ServiceCreator implements Runnable private final String broker; private Service service; + private final int id; - ServiceCreator(String broker) + ServiceCreator(String broker, final int id) { this.broker = broker; + this.id = id; } public void run() { try { - service = new Service(broker); + service = new Service(broker, id); } catch (Exception e) { @@ -76,11 +78,12 @@ public class ServiceCreator implements Runnable { threads = new Thread[services]; _services = new ServiceCreator[services]; - ServiceCreator runner = new ServiceCreator(broker); + //ServiceCreator runner = new ServiceCreator(broker); // start services _logger.info("Starting " + services + " services..."); for (int i = 0; i < services; i++) { + ServiceCreator runner = new ServiceCreator(broker,i); threads[i] = new Thread(runner); _services[i] = runner; threads[i].start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index 8d7645c1fd..56904f20de 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -40,30 +40,40 @@ import javax.jms.Queue; import javax.jms.Session; import java.util.concurrent.atomic.AtomicInteger; +import java.util.Random; +import java.util.UUID; public class MessageRequeueTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class); + + protected static AtomicInteger consumerIds = new AtomicInteger(0); protected final Integer numTestMessages = 150; protected final int consumeTimeout = 3000; - protected final String queue = "direct://amq.direct//queue"; + //protected final String queue = "direct://amq.direct//queue"; protected String payload = "Message:"; protected final String BROKER = "vm://:1"; private boolean testReception = true; private long[] receieved = new long[numTestMessages + 1]; - private boolean passed = false; + //private boolean passed = false; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); + + } + + private void putMessagesOnQueueThenClose(String queue) + throws JMSException, InterruptedException + { QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -76,20 +86,25 @@ public class MessageRequeueTest extends TestCase conn.disconnect(); } - protected void tearDown() throws Exception + + + private void tearDownQueue(String queue) + throws JMSException, InterruptedException + { - super.tearDown(); + QpidClientConnection conn = new QpidClientConnection(BROKER); - if (!passed) // clean up - { - QpidClientConnection conn = new QpidClientConnection(BROKER); + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); - conn.connect(); - // clear queue - conn.consume(queue, consumeTimeout); + conn.disconnect(); + } - conn.disconnect(); - } + + protected void tearDown() throws Exception + { + super.tearDown(); TransportConnection.killVMBroker(1); } @@ -102,6 +117,11 @@ public class MessageRequeueTest extends TestCase */ public void testDrain() throws JMSException, InterruptedException { + + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + + putMessagesOnQueueThenClose(queue); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -172,18 +192,22 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); - passed = true; + tearDownQueue(queue); } /** multiple consumers * Based on code subbmitted by client FT-304 */ - public void testCompetingConsumers() + public void testCompetingConsumers() throws JMSException, InterruptedException { - Consumer c1 = new Consumer(); - Consumer c2 = new Consumer(); - Consumer c3 = new Consumer(); - Consumer c4 = new Consumer(); + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + + putMessagesOnQueueThenClose(queue); + + Consumer c1 = new Consumer(queue); + Consumer c2 = new Consumer(queue); + Consumer c3 = new Consumer(queue); + Consumer c4 = new Consumer(queue); Thread t1 = new Thread(c1); Thread t2 = new Thread(c2); @@ -237,16 +261,18 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertTrue("number of consumed messages does not match initial data: " + totalConsumed, numTestMessages <= totalConsumed); - passed = true; + tearDownQueue(queue); } class Consumer implements Runnable { private Integer count = 0; private Integer id; + private final String _queue; - public Consumer() + public Consumer(String queue) { + _queue = queue; id = consumerIds.addAndGet(1); } @@ -263,7 +289,7 @@ public class MessageRequeueTest extends TestCase Message result; do { - result = conn.getNextMessage(queue, consumeTimeout); + result = conn.getNextMessage(_queue, consumeTimeout); if (result != null) { @@ -322,8 +348,11 @@ public class MessageRequeueTest extends TestCase } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException + public void testRequeue() throws JMSException, AMQException, URLSyntaxException, InterruptedException { + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + putMessagesOnQueueThenClose(queue); + int run = 0; // while (run < 10) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 101cba2352..98c0225096 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -88,7 +88,7 @@ public class DurableSubscriptionTest extends TestCase Message msg; _logger.info("Receive message on consumer 1:expecting A"); - msg = consumer1.receive(); + msg = consumer1.receive(1000); assertEquals("A", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(1000); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 065b06a87d..39730ef3ac 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,28 +20,33 @@ */ package org.apache.qpid.test.unit.topic; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; +import javax.jms.*; import javax.jms.MessageConsumer; import javax.jms.Session; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; +import javax.jms.Message; import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.qpid.jms.*; + +import java.util.UUID; /** @author Apache Software Foundation */ public class TopicSessionTest extends TestCase { private static final String BROKER = "vm://:1"; + private static final int THREADS = 20; + private static final int MESSAGE_COUNT = 10000; + private static final int MESSAGE_SIZE = 128; protected void setUp() throws Exception { @@ -102,6 +107,60 @@ public class TopicSessionTest extends TestCase subscriptionNameReuseForDifferentTopic(true); } + public void notestSilly() throws Exception + { + + + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException jmsException) + { + //To change body of implemented methods use File | Settings | File Templates. + } + }; + + + Thread[] threads = new Thread[100]; + + for(int j = 0; j < 20; j++) + { + threads[j] = new Thread(new Runnable() { + public void run() + { + try + { + AMQConnection con = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con, "MyTopic1" + UUID.randomUUID()); + + + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + + con.setExceptionListener(listener); + + TopicPublisher publisher = session1.createPublisher(topic); + + con.start(); + + while(true) + { + publisher.publish(session1.createTextMessage("hello")); + Thread.sleep(THREADS); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + }); + threads[j].run(); + } + + threads[0].join(); + + } + + private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception { AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); @@ -368,8 +427,160 @@ public class TopicSessionTest extends TestCase con2.close(); } + + public void noTestPublishToManyConsumers() throws Exception + { + + + final ExceptionListener exceptionListener = new ExceptionListener() + { + public void onException(JMSException jmsException) + { + jmsException.printStackTrace(); + } + }; + + + + SubscribingThread[] threads = new SubscribingThread[100]; + + final String topicName = "MyTopic1" + UUID.randomUUID(); + for(int j = 0; j < 21; j++) + { + final int threadId = j; + threads[threadId] = new SubscribingThread(threadId, topicName, exceptionListener); + threads[j].start(); + Thread.sleep(100); + } + + + threads[1].join(); + + int totalMessages = 0; + + for(int j = 1; j < 21; j++) + { + + System.err.println("Thread " + j + ": " + threads[j].msgId); + totalMessages += threads[j].msgId; + } + + System.err.println("****** Total: " + totalMessages); + + + } + + + + public static junit.framework.Test suite() { return new junit.framework.TestSuite(TopicSessionTest.class); } + + private static class SubscribingThread extends Thread + { + private final int _threadId; + private final String _topicName; + private final ExceptionListener _exceptionListener; + int msgId = 0; + + public SubscribingThread(final int threadId, final String topicName, final ExceptionListener exceptionListener) + { + _threadId = threadId; + _topicName = topicName; + _exceptionListener = exceptionListener; + } + + public void run() + { + try + { + System.err.println("Thread: " + _threadId); + + + if(_threadId >0) + { + + AMQConnection con2 = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test"); + //AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic2 = new AMQTopic(con2, _topicName); + TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber sub = session2.createSubscriber(topic2); + con2.setExceptionListener(_exceptionListener); + + + + final MessageListener messageListener = new MessageListener() + { + + public void onMessage(Message message) + { + try + { + msgId = message.getIntProperty("MessageId"); + if(msgId % 1000 == 0) + { + System.err.println("Thread: " + _threadId + ": " + msgId + "messages"); + } + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + }; + + + sub.setMessageListener(messageListener); + con2.start(); + + Thread.sleep(125000); + + +// Thread.sleep(1200000); + } + else + { + int messageId = 0; + + AMQConnection con = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test"); + //AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test"); + + + AMQTopic topic = new AMQTopic(con, _topicName); + + + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + + con.setExceptionListener(_exceptionListener); + + TopicPublisher publisher = session1.createPublisher(topic); + publisher.setDisableMessageID(true); + publisher.setDisableMessageTimestamp(true); + con.start(); + + Thread.sleep(5000); + + publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + while(messageId <= 240000) + //while(messageId <= 10000) + { + final TextMessage textMessage = session1.createTextMessage("hello"); + textMessage.setIntProperty("MessageId", messageId++); + + + publisher.publish(textMessage); + + } + } + + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } } -- cgit v1.2.1