summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java70
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java72
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java81
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java28
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java118
5 files changed, 325 insertions, 44 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
new file mode 100644
index 0000000000..9e48914431
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest
+{
+
+
+ protected void setUp() throws Exception
+ {
+
+ System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
+ super.setUp();
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 794fd5c8c1..c9407d8ff6 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -62,7 +62,8 @@ public class MessageListenerMultiConsumerTest extends TestCase
private Connection _clientConnection;
private MessageConsumer _consumer1;
private MessageConsumer _consumer2;
-
+ private Session _clientSession1;
+ private Queue _queue;
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
@@ -76,25 +77,25 @@ public class MessageListenerMultiConsumerTest extends TestCase
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "direct://amq.direct//"+this.getClass().getName());
_context = factory.getInitialContext(env);
- Queue queue = (Queue) _context.lookup("queue");
+ _queue = (Queue) _context.lookup("queue");
//Create Client 1
_clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
_clientConnection.start();
- Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer1 = clientSession1.createConsumer(queue);
+ _consumer1 = _clientSession1.createConsumer(_queue);
//Create Client 2
Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer2 = clientSession2.createConsumer(queue);
+ _consumer2 = clientSession2.createConsumer(_queue);
//Create Producer
Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
@@ -104,7 +105,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ MessageProducer producer = producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
@@ -123,20 +124,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
TransportConnection.killAllVMBrokers();
}
-// public void testRecieveC1thenC2() throws Exception
-// {
-//
-// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-// {
-//
-// assertTrue(_consumer1.receive() != null);
-// }
-//
-// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-// {
-// assertTrue(_consumer2.receive() != null);
-// }
-// }
public void testRecieveInterleaved() throws Exception
{
@@ -206,10 +193,12 @@ public class MessageListenerMultiConsumerTest extends TestCase
assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
- public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception
+ public void testRecieveC2Only() throws Exception
{
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ if (!Boolean.parseBoolean(System.getProperties().
+ getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
{
+ _logger.info("Performing Receive only on C2");
for (int msg = 0; msg < MSG_COUNT; msg++)
{
assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg,
@@ -218,6 +207,43 @@ public class MessageListenerMultiConsumerTest extends TestCase
}
}
+ public void testRecieveBoth() throws Exception
+ {
+ if (!Boolean.parseBoolean(System.getProperties().
+ getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
+ {
+ _logger.info("Performing Receive only with two consumers on one session ");
+
+ MessageConsumer consumer2 = _clientSession1.createConsumer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(consumer2.receive() != null);
+ }
+ }
+ else
+ {
+ _logger.info("Performing Receive only on both C1 and C2");
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+ }
+
public static junit.framework.Test suite()
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java
new file mode 100644
index 0000000000..505af361bc
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.basic.close;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.log4j.Logger;
+
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+public class CloseTests extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(CloseTests.class);
+
+
+ private static final String BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.setUp();
+
+ TransportConnection.killVMBroker(1);
+ }
+
+
+ public void testCloseQueueReceiver() throws AMQException, URLSyntaxException, JMSException
+ {
+ AMQConnection connection = new AMQConnection(BROKER, "guest", "guest", this.getName(), "test");
+
+ Session session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue"));
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue);
+
+ connection.start();
+
+ _logger.info("About to close consumer");
+
+ consumer.close();
+
+ _logger.info("Closed Consumer");
+
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 190b3861f0..15cb9678e4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -36,9 +36,11 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
public class DurableSubscriptionTest extends TestCase
{
+ private static final Logger _logger = Logger.getLogger(DurableSubscriptionTest.class);
protected void setUp() throws Exception
{
@@ -55,41 +57,59 @@ public class DurableSubscriptionTest extends TestCase
public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic");
+ AMQTopic topic = new AMQTopic(con, "MyTopic");
+ _logger.info("Create Session 1");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create Consumer on Session 1");
MessageConsumer consumer1 = session1.createConsumer(topic);
+ _logger.info("Create Producer on Session 1");
MessageProducer producer = session1.createProducer(topic);
+ _logger.info("Create Session 2");
Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create Durable Subscriber on Session 2");
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+ _logger.info("Starting connection");
con.start();
+ _logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
Message msg;
+ _logger.info("Receive message on consumer 1:expecting A");
msg = consumer1.receive();
assertEquals("A", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(1000);
assertEquals(null, msg);
+
+ _logger.info("Receive message on consumer 1:expecting A");
msg = consumer2.receive();
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer2.receive(1000);
+ _logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
+ _logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
+ _logger.info("Producer sending message B");
producer.send(session1.createTextMessage("B"));
+ _logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive();
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(1000);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 2 :expecting null");
msg = consumer2.receive(1000);
assertEquals(null, msg);
+ _logger.info("Close connection");
con.close();
}
@@ -97,7 +117,7 @@ public class DurableSubscriptionTest extends TestCase
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic");
+ AMQTopic topic = new AMQTopic(con, "MyTopic");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -129,13 +149,17 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("B"));
+ _logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(100);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(100);
assertEquals(null, msg);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index fe7efb4e88..a19687b07c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -38,11 +38,11 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
-/**
- * @author Apache Software Foundation
- */
+/** @author Apache Software Foundation */
public class TopicSessionTest extends TestCase
{
+ private static final String BROKER = "vm://:1";
+
protected void setUp() throws Exception
{
super.setUp();
@@ -53,17 +53,16 @@ public class TopicSessionTest extends TestCase
{
super.tearDown();
TransportConnection.killAllVMBrokers();
- //Thread.sleep(2000);
}
public void testTopicSubscriptionUnsubscription() throws Exception
{
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(),"MyTopic");
+ AMQConnection con = new AMQConnection(BROKER+"?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
- TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
+ TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
con.start();
@@ -81,11 +80,11 @@ public class TopicSessionTest extends TestCase
session1.unsubscribe("not a subscription");
fail("expected InvalidDestinationException when unsubscribing from unknown subscription");
}
- catch(InvalidDestinationException e)
+ catch (InvalidDestinationException e)
{
; // PASS
}
- catch(Exception e)
+ catch (Exception e)
{
fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e);
}
@@ -106,8 +105,8 @@ public class TopicSessionTest extends TestCase
private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
{
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic1" + String.valueOf(shutdown));
- AMQTopic topic2 = new AMQTopic(con,"MyOtherTopic1" + String.valueOf(shutdown));
+ AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
+ AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
@@ -145,7 +144,7 @@ public class TopicSessionTest extends TestCase
public void testUnsubscriptionAfterConnectionClose() throws Exception
{
AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con1,"MyTopic3");
+ AMQTopic topic = new AMQTopic(con1, "MyTopic3");
TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
@@ -176,7 +175,7 @@ public class TopicSessionTest extends TestCase
{
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic4");
+ AMQTopic topic = new AMQTopic(con, "MyTopic4");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -226,11 +225,11 @@ public class TopicSessionTest extends TestCase
producer.send(sentMessage);
TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(),receivedMessage.getText());
+ assertEquals(sentMessage.getText(), receivedMessage.getText());
producer.send(sentMessage);
receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(),receivedMessage.getText());
+ assertEquals(sentMessage.getText(), receivedMessage.getText());
conn.close();
@@ -248,14 +247,14 @@ public class TopicSessionTest extends TestCase
producer.send(session.createTextMessage("hello"));
TextMessage tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
- assertEquals("hello",tm.getText());
+ assertEquals("hello", tm.getText());
try
{
topic.delete();
fail("Expected JMSException : should not be able to delete while there are active consumers");
}
- catch(JMSException je)
+ catch (JMSException je)
{
; //pass
}
@@ -266,7 +265,7 @@ public class TopicSessionTest extends TestCase
{
topic.delete();
}
- catch(JMSException je)
+ catch (JMSException je)
{
fail("Unexpected Exception: " + je.getMessage());
}
@@ -283,11 +282,92 @@ public class TopicSessionTest extends TestCase
}
-
conn.close();
}
+ public void testNoLocal() throws Exception
+ {
+
+ AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test");
+
+ AMQTopic topic = new AMQTopic(con, "testNoLocal");
+
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber noLocal = session1.createDurableSubscriber(topic, "noLocal", "", true);
+ TopicSubscriber select = session1.createDurableSubscriber(topic, "select", "Selector = 'select'", false);
+ TopicSubscriber normal = session1.createDurableSubscriber(topic, "normal");
+
+ TopicPublisher publisher = session1.createPublisher(topic);
+
+ con.start();
+ TextMessage m;
+ TextMessage message;
+
+ //send message to all consumers
+ publisher.publish(session1.createTextMessage("hello-new2"));
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber doesn't message
+ m = (TextMessage) select.receive(1000);
+ assertNull(m);
+
+ //test nolocal subscriber doesn't message
+ m = (TextMessage) noLocal.receive(1000);
+ if (m != null)
+ {
+ System.out.println("Message:" + m.getText());
+ }
+ assertNull(m);
+
+ //send message to all consumers
+ message = session1.createTextMessage("hello2");
+ message.setStringProperty("Selector", "select");
+
+ publisher.publish(message);
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber does get message
+ m = (TextMessage) select.receive(100);
+ assertNotNull(m);
+
+ //test nolocal subscriber doesn't message
+ m = (TextMessage) noLocal.receive(100);
+ assertNull(m);
+
+ AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test2", "test");
+ TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicPublisher publisher2 = session2.createPublisher(topic);
+
+
+ message = session2.createTextMessage("hello2");
+ message.setStringProperty("Selector", "select");
+
+ publisher2.publish(message);
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber does get message
+ m = (TextMessage) select.receive(100);
+ assertNotNull(m);
+
+ //test nolocal subscriber does message
+ m = (TextMessage) noLocal.receive(100);
+ assertNotNull(m);
+
+
+ con.close();
+ con2.close();
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(TopicSessionTest.class);