summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-25 22:58:57 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-25 22:58:57 +0000
commitafcf8099695253651c73910a243fb29aa520b008 (patch)
treee514bc51797181c567500a8ddbfc20ea9b89b908 /qpid/java/systests/src
parentf315ac548e346ded9ed1d081db4118e703c362b4 (diff)
downloadqpid-python-afcf8099695253651c73910a243fb29aa520b008.tar.gz
Merged from java-broker-0-10 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@829675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java60
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java165
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java12
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java14
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java30
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java2
11 files changed, 196 insertions, 110 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 784943b404..b2fdf48267 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -113,12 +113,12 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase
for (int loops = 0; (msg < MSG_COUNT) || (loops < MAX_LOOPS); loops++)
{
- if (_consumer1.receive(100) != null)
+ if (_consumer1.receive(1000) != null)
{
msg++;
}
- if (_consumer2.receive(100) != null)
+ if (_consumer2.receive(1000) != null)
{
msg++;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
index ba05dc6b3e..266cb42ad7 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
@@ -215,7 +215,7 @@ public class BindingLoggingTest extends AbstractTestLogging
List<String> results = _monitor.findMatches(BND_PREFIX);
// We will have two binds as we bind all queues to the default exchange
- assertEquals("Result set larger than expected.", 4, results.size());
+ assertEquals("Result not as expected." + results, 4, results.size());
String messageID = "BND-1001";
@@ -241,7 +241,7 @@ public class BindingLoggingTest extends AbstractTestLogging
String subject = fromSubject(log);
- assertTrue("Routing Key does not start with TempQueue:"+AbstractTestLogSubject.getSlice("rk", subject),
+ assertTrue("Routing Key does not start with TempQueue:"+AbstractTestLogSubject.getSlice("rk", subject),
AbstractTestLogSubject.getSlice("rk", subject).startsWith("TempQueue"));
assertEquals("Virtualhost not correct.", "/test",
AbstractTestLogSubject.getSlice("vh", subject));
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 509c027cbf..fe25bf07f0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,6 +34,7 @@ public class SubscriptionTestHelper implements Subscription
private final List<QueueEntry> messages;
private final Object key;
private boolean isSuspended;
+ private AMQQueue.Context _queueContext;
public SubscriptionTestHelper(Object key)
{
@@ -59,6 +60,11 @@ public class SubscriptionTestHelper implements Subscription
public void setQueue(AMQQueue queue, boolean exclusive)
{
+
+ }
+
+ public void setNoLocal(boolean noLocal)
+ {
}
@@ -102,24 +108,34 @@ public class SubscriptionTestHelper implements Subscription
//To change body of implemented methods use File | Settings | File Templates.
}
- public void restoreCredit(final QueueEntry queueEntry)
+ public void onDequeue(final QueueEntry queueEntry)
{
}
+ public void restoreCredit(QueueEntry queueEntry)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void setStateListener(final StateListener listener)
{
//To change body of implemented methods use File | Settings | File Templates.
}
-
+
public State getState()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueContext;
+ }
+
+ public void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
@@ -131,7 +147,7 @@ public class SubscriptionTestHelper implements Subscription
{
return null;
}
-
+
public void start()
{
//no-op
@@ -152,6 +168,21 @@ public class SubscriptionTestHelper implements Subscription
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void confirmAutoClose()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void set(String key, Object value)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object get(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public LogActor getLogActor()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -167,6 +198,11 @@ public class SubscriptionTestHelper implements Subscription
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public QueueEntry.SubscriptionAssignedState getAssignedState()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void queueDeleted(AMQQueue queue)
{
}
@@ -216,6 +252,16 @@ public class SubscriptionTestHelper implements Subscription
return false;
}
+ public boolean acquires()
+ {
+ return true;
+ }
+
+ public boolean seesRequeues()
+ {
+ return true;
+ }
+
public boolean isBrowser()
{
return false;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index dde235f73e..9487f72ac8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -873,7 +873,12 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
{
Throwable cause = e.getLinkedException();
- if (!(cause instanceof AMQAuthenticationException))
+ if (cause == null)
+ {
+ e.printStackTrace(System.out);
+ fail("JMS Exception did not have cause");
+ }
+ else if (!(cause instanceof AMQAuthenticationException))
{
cause.printStackTrace(System.out);
assertEquals("Incorrect exception", IllegalStateException.class, cause.getClass());
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 7d8c81f4d5..b41aa661ea 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -25,15 +25,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.logging.LogSubject;
import java.util.HashMap;
import java.util.Iterator;
+import java.nio.ByteBuffer;
public class SlowMessageStore implements MessageStore
{
@@ -47,14 +46,21 @@ public class SlowMessageStore implements MessageStore
private static final String POST = "post";
private String DEFAULT_DELAY = "default";
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ // ***** MessageStore Interface.
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
{
- _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
- Configuration delays = config.getStoreConfiguration().subset(DELAYS);
+ //To change body of implemented methods use File | Settings | File Templates.
+
+ _logger.info("Starting SlowMessageStore on Virtualhost:" + name);
+ Configuration delays = config.subset(DELAYS);
configureDelays(delays);
- String messageStoreClass = config.getStoreConfiguration().getString("realStore");
+ String messageStoreClass = config.getString("realStore");
if (delays.containsKey(DEFAULT_DELAY))
{
@@ -73,11 +79,11 @@ public class SlowMessageStore implements MessageStore
" does not.");
}
_realStore = (MessageStore) o;
- _realStore.configure(virtualHost, base + ".store", config);
+ _realStore.configureConfigStore(name, recoveryHandler, config, logSubject);
}
else
{
- _realStore.configure(virtualHost, base + ".store", config);
+ _realStore.configureConfigStore(name, recoveryHandler, config, logSubject);
}
}
@@ -133,7 +139,7 @@ public class SlowMessageStore implements MessageStore
}
long slept = (System.nanoTime() - start) / 1000000;
-
+
if (slept >= delay)
{
_logger.info("Done sleep for:" + slept+":"+delay);
@@ -146,7 +152,14 @@ public class SlowMessageStore implements MessageStore
}
}
- // ***** MessageStore Interface.
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
+ {
+ _realStore.configureMessageStore(name, recoveryHandler, config, logSubject);
+ }
public void close() throws Exception
{
@@ -155,13 +168,12 @@ public class SlowMessageStore implements MessageStore
doPostDelay("close");
}
- public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
{
- doPreDelay("removeMessage");
- _realStore.removeMessage(storeContext, messageId);
- doPostDelay("removeMessage");
+ return _realStore.addMessage(metaData);
}
+
public void createExchange(Exchange exchange) throws AMQException
{
doPreDelay("createExchange");
@@ -209,90 +221,93 @@ public class SlowMessageStore implements MessageStore
doPostDelay("removeQueue");
}
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
- {
- doPreDelay("enqueueMessage");
- _realStore.enqueueMessage(context, queue, messageId);
- doPostDelay("enqueueMessage");
- }
-
- public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration, LogSubject logSubject)
+ throws Exception
{
- doPreDelay("dequeueMessage");
- _realStore.dequeueMessage(context, queue, messageId);
- doPostDelay("dequeueMessage");
+ _realStore.configureTransactionLog(name, recoveryHandler, storeConfiguration, logSubject);
}
- public void beginTran(StoreContext context) throws AMQException
+ public Transaction newTransaction()
{
doPreDelay("beginTran");
- _realStore.beginTran(context);
+ Transaction txn = new SlowTransaction(_realStore.newTransaction());
doPostDelay("beginTran");
+ return txn;
}
- public void commitTran(StoreContext context) throws AMQException
- {
- doPreDelay("commitTran");
- _realStore.commitTran(context);
- doPostDelay("commitTran");
- }
- public void abortTran(StoreContext context) throws AMQException
+ public boolean isPersistent()
{
- doPreDelay("abortTran");
- _realStore.abortTran(context);
- doPostDelay("abortTran");
+ return _realStore.isPersistent();
}
- public boolean inTran(StoreContext context)
+ public void storeMessageHeader(Long messageNumber, ServerMessage message)
{
- doPreDelay("inTran");
- boolean b = _realStore.inTran(context);
- doPostDelay("inTran");
- return b;
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public Long getNewMessageId()
+ public void storeContent(Long messageNumber, long offset, ByteBuffer body)
{
- doPreDelay("getNewMessageId");
- Long l = _realStore.getNewMessageId();
- doPostDelay("getNewMessageId");
- return l;
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ public ServerMessage getMessage(Long messageNumber)
{
- doPreDelay("storeContentBodyChunk");
- _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
- doPostDelay("storeContentBodyChunk");
+ return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ private class SlowTransaction implements Transaction
{
- doPreDelay("storeMessageMetaData");
- _realStore.storeMessageMetaData(context, messageId, messageMetaData);
- doPostDelay("storeMessageMetaData");
- }
+ private final Transaction _underlying;
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
- {
- doPreDelay("getMessageMetaData");
- MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId);
- doPostDelay("getMessageMetaData");
- return mmd;
- }
+ private SlowTransaction(Transaction underlying)
+ {
+ _underlying = underlying;
+ }
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
- {
- doPreDelay("getContentBodyChunk");
- ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index);
- doPostDelay("getContentBodyChunk");
- return c;
- }
+ public void enqueueMessage(TransactionLogResource queue, Long messageId)
+ throws AMQException
+ {
+ doPreDelay("enqueueMessage");
+ _underlying.enqueueMessage(queue, messageId);
+ doPostDelay("enqueueMessage");
+ }
- public boolean isPersistent()
- {
- return _realStore.isPersistent();
+ public void dequeueMessage(TransactionLogResource queue, Long messageId)
+ throws AMQException
+ {
+ doPreDelay("dequeueMessage");
+ _underlying.dequeueMessage(queue, messageId);
+ doPostDelay("dequeueMessage");
+ }
+
+ public void commitTran()
+ throws AMQException
+ {
+ doPreDelay("commitTran");
+ _underlying.commitTran();
+ doPostDelay("commitTran");
+ }
+
+ public StoreFuture commitTranAsync()
+ throws AMQException
+ {
+ doPreDelay("commitTran");
+ StoreFuture future = _underlying.commitTranAsync();
+ doPostDelay("commitTran");
+ return future;
+ }
+
+ public void abortTran()
+ throws AMQException
+ {
+ doPreDelay("abortTran");
+ _underlying.abortTran();
+ doPostDelay("abortTran");
+ }
}
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
index 25b9b0ba14..5ea203bda3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
@@ -81,9 +81,14 @@ public class CancelTest extends QpidTestCase
assertTrue(e.hasMoreElements());
+ int i = 0;
while (e.hasMoreElements())
{
e.nextElement();
+ if(++i > 1)
+ {
+ fail("Two many elemnts to browse!");
+ }
}
browser.close();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
index d1bcaa1bb8..eb0c539a6e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
@@ -79,7 +79,7 @@ public class DupsOkTest extends QpidTestCase
* This test sends x messages and receives them with an async consumer.
* Waits for all messages to be received or for 60 s
* and checks whether the queue is empty.
- *
+ *
* @throws Exception
*/
public void testDupsOK() throws Exception
@@ -93,7 +93,7 @@ public class DupsOkTest extends QpidTestCase
assertEquals("The queue should have msgs at start", MSG_COUNT, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue));
- clientConnection.start();
+ clientConnection.start();
consumer.setMessageListener(new MessageListener()
{
@@ -110,7 +110,7 @@ public class DupsOkTest extends QpidTestCase
if (message instanceof TextMessage)
{
try
- {
+ {
if (message.getIntProperty("count") == MSG_COUNT)
{
try
@@ -156,7 +156,11 @@ public class DupsOkTest extends QpidTestCase
// before the dispatcher has sent the ack back to the broker.
consumer.close();
- assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue));
+ clientSession.close();
+
+ final Session clientSession2 = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession2).getQueueDepth((AMQDestination) _queue));
clientConnection.close();
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
index e9aed4de01..9558f23b89 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
@@ -79,16 +79,22 @@ public class LargeMessageTest extends QpidTestCase
}
// Test boundary of 1 packet to 2 packets
- public void test64kminus1()
+ public void test64kminus9()
{
- checkLargeMessage((64 * 1024) - 1);
+ checkLargeMessage((64 * 1024) - 9);
}
- public void test64k()
+ public void test64kminus8()
{
- checkLargeMessage(64 * 1024);
+ checkLargeMessage((64 * 1024)-8);
}
+ public void test64kminus7()
+ {
+ checkLargeMessage((64 * 1024)-7);
+ }
+
+
public void test64kplus1()
{
checkLargeMessage((64 * 1024) + 1);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index ec23256f38..f1cac22f08 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -50,7 +50,8 @@ public class MessageRequeueTest extends QpidTestCase
protected final String queue = "direct://amq.direct//message-requeue-test-queue";
protected String payload = "Message:";
- protected final String BROKER = "vm://:1";
+ //protected final String BROKER = "vm://:1";
+ protected final String BROKER = "tcp://127.0.0.1:5672";
private boolean testReception = true;
private long[] receieved = new long[numTestMessages + 1];
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index f8ba7060a9..742e2ac518 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -311,11 +311,13 @@ public class TopicSessionTest extends QpidTestCase
AMQTopic topic = new AMQTopic(con, "testNoLocal");
- TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber noLocal = session1.createSubscriber(topic, "", true);
+
TopicSubscriber select = session1.createSubscriber(topic, "Selector = 'select'", false);
TopicSubscriber normal = session1.createSubscriber(topic);
+
TopicPublisher publisher = session1.createPublisher(topic);
con.start();
@@ -329,12 +331,12 @@ public class TopicSessionTest extends QpidTestCase
m = (TextMessage) normal.receive(1000);
assertNotNull(m);
session1.commit();
-
+
//test selector subscriber doesn't message
m = (TextMessage) select.receive(1000);
assertNull(m);
session1.commit();
-
+
//test nolocal subscriber doesn't message
m = (TextMessage) noLocal.receive(1000);
if (m != null)
@@ -349,12 +351,12 @@ public class TopicSessionTest extends QpidTestCase
publisher.publish(message);
session1.commit();
-
+
//test normal subscriber gets message
m = (TextMessage) normal.receive(1000);
assertNotNull(m);
session1.commit();
-
+
//test selector subscriber does get message
m = (TextMessage) select.receive(1000);
assertNotNull(m);
@@ -365,7 +367,7 @@ public class TopicSessionTest extends QpidTestCase
assertNull(m);
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "foo");
- TopicSession session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session2 = con2.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicPublisher publisher2 = session2.createPublisher(topic);
@@ -386,18 +388,18 @@ public class TopicSessionTest extends QpidTestCase
session1.commit();
//test nolocal subscriber does message
- m = (TextMessage) noLocal.receive(100);
+ m = (TextMessage) noLocal.receive(1000);
assertNotNull(m);
con.close();
con2.close();
}
-
+
/**
* This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
* due to a selector can be leaked.
- * @throws Exception
+ * @throws Exception
*/
public void testNonMatchingMessagesDoNotFillQueue() throws Exception
{
@@ -420,27 +422,27 @@ public class TopicSessionTest extends QpidTestCase
message = session.createTextMessage("non-matching 1");
publisher.publish(message);
session.commit();
-
+
// Send and consume matching message
message = session.createTextMessage("hello");
message.setStringProperty("Selector", "select");
publisher.publish(message);
session.commit();
-
+
m = (TextMessage) selector.receive(1000);
assertNotNull("should have received message", m);
assertEquals("Message contents were wrong", "hello", m.getText());
-
+
// Send non-matching message
message = session.createTextMessage("non-matching 2");
publisher.publish(message);
session.commit();
-
+
// Assert queue count is 0
long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
assertEquals("Queue depth was wrong", 0, depth);
-
+
}
public static junit.framework.Test suite()
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 0426c4f45f..b1d14721bd 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -23,6 +23,8 @@ package org.apache.qpid.test.utils;
import org.apache.qpid.util.FileUtils;
import javax.naming.NamingException;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
import org.apache.qpid.client.AMQConnectionFactory;
import org.slf4j.Logger;