summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-09 08:48:18 +0000
committerGordon Sim <gsim@apache.org>2007-05-09 08:48:18 +0000
commit41d3e70cdf4c93aba0589ed7aec61499a86c6119 (patch)
treed4b52849b875b16590241747b899bf3cc1bc86cb /java/systests/src
parent04186f293032c5720f9500eb2a07433572f13d24 (diff)
downloadqpid-python-41d3e70cdf4c93aba0589ed7aec61499a86c6119.tar.gz
Patch from Arnaud Simon (asimon@redhat.com) to fix tests broken by earlier changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java230
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java75
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java23
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java57
6 files changed, 363 insertions, 34 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index e69de29bb2..bca5636440 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -0,0 +1,230 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.messageStore.MemoryMessageStore;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+
+import java.util.*;
+
+public class TxAckTest extends TestCase
+{
+ private Scenario individual;
+ private Scenario multiple;
+ private Scenario combined;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ //ack only 5th msg
+ individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
+ individual.update(5, false);
+
+ //ack all up to and including 5th msg
+ multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
+ multiple.update(5, true);
+
+ //leave only 8th and 9th unacked
+ combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
+ combined.update(3, false);
+ combined.update(5, true);
+ combined.update(7, true);
+ combined.update(2, true);//should be ignored
+ combined.update(1, false);//should be ignored
+ combined.update(10, false);
+ }
+
+ public void testPrepare() throws AMQException
+ {
+ individual.prepare();
+ multiple.prepare();
+ combined.prepare();
+ }
+
+ public void testUndoPrepare() throws AMQException
+ {
+ individual.undoPrepare();
+ multiple.undoPrepare();
+ combined.undoPrepare();
+ }
+
+ public void testCommit() throws AMQException
+ {
+ individual.commit();
+ multiple.commit();
+ combined.commit();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TxAckTest.class);
+ }
+
+ private class Scenario
+ {
+ private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000);
+ private final TxAck _op = new TxAck(_map);
+ private final List<Long> _acked;
+ private final List<Long> _unacked;
+ private StoreContext _storeContext = new StoreContext();
+
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+ {
+ TransactionalContext txnContext = new NonTransactionalContext(new MemoryMessageStore(),
+ _storeContext, null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
+ for (int i = 0; i < messageCount; i++)
+ {
+ long deliveryTag = i + 1;
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
+ _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
+ }
+ _acked = acked;
+ _unacked = unacked;
+ }
+
+ void update(long deliverytag, boolean multiple)
+ {
+ _op.update(deliverytag, multiple);
+ }
+
+ private void assertCount(List<Long> tags, int expected)
+ {
+ for (long tag : tags)
+ {
+ UnacknowledgedMessage u = _map.get(tag);
+ assertTrue("Message not found for tag " + tag, u != null);
+ ((TestMessage) u.message).assertCountEquals(expected);
+ }
+ }
+
+ void prepare() throws AMQException
+ {
+ _op.consolidate();
+ _op.prepare(_storeContext);
+
+ assertCount(_acked, -1);
+ assertCount(_unacked, 0);
+
+ }
+
+ void undoPrepare()
+ {
+ _op.consolidate();
+ _op.undoPrepare();
+
+ assertCount(_acked, 0);
+ assertCount(_unacked, 0);
+ }
+
+ void commit()
+ {
+ _op.consolidate();
+ _op.commit(_storeContext);
+
+ //check acked messages are removed from map
+ Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags());
+ keys.retainAll(_acked);
+ assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
+ //check unacked messages are still in map
+ keys = new HashSet<Long>(_unacked);
+ keys.removeAll(_map.getDeliveryTags());
+ assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
+ }
+ }
+
+ private class TestMessage extends AMQMessage
+ {
+ private final long _tag;
+ private int _count;
+
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
+ {
+ super(messageId, publishBody, txnContext);
+ try
+ {
+ setContentHeaderBody(new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ // won't happen
+ }
+ _tag = tag;
+ }
+
+ public void incrementReference()
+ {
+ _count++;
+ }
+
+ public void decrementReference(StoreContext context)
+ {
+ _count--;
+ }
+
+ void assertCountEquals(int expected)
+ {
+ assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ }
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java b/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
index e69de29bb2..503bb28775 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.management.JMException;
+
+/** Test class to test MBean operations for AMQMinaProtocolSession. */
+public class MaxChannelsTest extends TestCase
+{
+// private MessageStore _messageStore = new SkeletonMessageStore();
+
+ public void testChannels() throws Exception
+ {
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ AMQMinaProtocolSession _protocolSession = new AMQMinaProtocolSession(new MockIoSession(),
+ appRegistry.getVirtualHostRegistry(),
+ new AMQCodecFactory(true),
+ null);
+ _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
+
+ // check the channel count is correct
+ int channelCount = _protocolSession.getChannels().size();
+ assertEquals("Initial channel count wrong", 0, channelCount);
+
+ long maxChannels = 10L;
+ _protocolSession.setMaximumNumberOfChannels(maxChannels);
+ assertEquals("Number of channels not correctly set.", new Long(maxChannels), _protocolSession.getMaximumNumberOfChannels());
+
+
+ try
+ {
+ for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
+ {
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null, null));
+ }
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
+ }
+ assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_protocolSession.getChannels().size()));
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index ca352b2fd7..27ac3474b9 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -3,7 +3,6 @@ package org.apache.qpid.server.exchange;
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
import org.apache.qpid.client.transport.TransportConnection;
@@ -269,7 +268,14 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
public void onException(JMSException jmsException)
{
- Exception linkedException = jmsException.getLinkedException();
+ Exception linkedException = null;
+ try
+ {
+ linkedException = jmsException.getLinkedException();
+ } catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
if (linkedException instanceof AMQNoRouteException)
{
AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
index c6cbac0ba8..0da18ea87f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
@@ -8,7 +8,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.log4j.Logger;
import javax.jms.JMSException;
-import javax.jms.DeliveryMode;
import java.io.IOException;
@@ -17,7 +16,7 @@ public class HeapExhaustion extends TestCase
{
private static final Logger _logger = Logger.getLogger(HeapExhaustion.class);
- protected QpidClientConnection conn;
+ protected QpidClientConnection conn;
protected final String BROKER = "localhost";
protected final String vhost = "/test";
protected final String queue = "direct://amq.direct//queue";
@@ -35,7 +34,13 @@ public class HeapExhaustion extends TestCase
conn = new QpidClientConnection(BROKER);
conn.setVirtualHost(vhost);
- conn.connect();
+ try
+ {
+ conn.connect();
+ } catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
// clear queue
_logger.debug("setup: clearing test queue");
conn.consume(queue, 2000);
@@ -55,7 +60,7 @@ public class HeapExhaustion extends TestCase
*
* @throws Exception on error
*/
- public void testUntilFailureTransient() throws Exception
+ public void testUntilFailure() throws Exception
{
int copies = 0;
int total = 0;
@@ -63,7 +68,7 @@ public class HeapExhaustion extends TestCase
int size = payload.getBytes().length;
while (true)
{
- conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
+ conn.put(queue, payload, 1);
copies++;
total += size;
System.out.println("put copy " + copies + " OK for total bytes: " + total);
@@ -75,7 +80,7 @@ public class HeapExhaustion extends TestCase
*
* @throws Exception on error
*/
- public void testUntilFailureWithDelaysTransient() throws Exception
+ public void testUntilFailureWithDelays() throws Exception
{
int copies = 0;
int total = 0;
@@ -83,7 +88,7 @@ public class HeapExhaustion extends TestCase
int size = payload.getBytes().length;
while (true)
{
- conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
+ conn.put(queue, payload, 1);
copies++;
total += size;
System.out.println("put copy " + copies + " OK for total bytes: " + total);
@@ -110,7 +115,7 @@ public class HeapExhaustion extends TestCase
_logger.info("Running testUntilFailure");
try
{
- he.testUntilFailureTransient();
+ he.testUntilFailure();
}
catch (FailoverException fe)
{
@@ -159,7 +164,7 @@ public class HeapExhaustion extends TestCase
_logger.info("Running testUntilFailure");
try
{
- he.testUntilFailureWithDelaysTransient();
+ he.testUntilFailureWithDelays();
}
catch (FailoverException fe)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
index 068f37574d..6a70fea711 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
@@ -38,7 +38,7 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
{
private final Random random = new Random();
- private final int numMessages = 1000;
+ private final int numMessages = 10;
private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
private final Set<Subscription> _active = new HashSet<Subscription>();
diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
index d3f064293e..6f955ebdab 100644
--- a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -16,7 +16,6 @@ import javax.jms.MessageProducer;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
-import javax.jms.DeliveryMode;
public class QpidClientConnection implements ExceptionListener
{
@@ -42,7 +41,9 @@ public class QpidClientConnection implements ExceptionListener
}
- public void connect() throws JMSException
+ public void connect()
+ throws
+ JMSException
{
if (!connected)
{
@@ -77,7 +78,9 @@ public class QpidClientConnection implements ExceptionListener
}
}
- public void disconnect() throws JMSException
+ public void disconnect()
+ throws
+ JMSException
{
if (connected)
{
@@ -89,7 +92,9 @@ public class QpidClientConnection implements ExceptionListener
}
}
- public void disconnectWithoutCommit() throws JMSException
+ public void disconnectWithoutCommit()
+ throws
+ JMSException
{
if (connected)
{
@@ -126,7 +131,9 @@ public class QpidClientConnection implements ExceptionListener
}
- /** override as necessary */
+ /**
+ * override as necessary
+ */
public void onException(JMSException exception)
{
_logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
@@ -148,10 +155,11 @@ public class QpidClientConnection implements ExceptionListener
* @param queueName The queue name to put to
* @param payload the content of the payload
* @param copies the number of messages to put
- *
* @throws javax.jms.JMSException any exception that occurs
*/
- public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException
+ public void put(String queueName, String payload, int copies)
+ throws
+ JMSException
{
if (!connected)
{
@@ -163,8 +171,6 @@ public class QpidClientConnection implements ExceptionListener
final MessageProducer sender = session.createProducer(queue);
- sender.setDeliveryMode(deliveryMode);
-
for (int i = 0; i < copies; i++)
{
Message m = session.createTextMessage(payload + i);
@@ -172,7 +178,13 @@ public class QpidClientConnection implements ExceptionListener
sender.send(m);
}
- session.commit();
+ try
+ {
+ session.commit();
+ } catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
sender.close();
_logger.info("put " + copies + " copies");
}
@@ -182,12 +194,12 @@ public class QpidClientConnection implements ExceptionListener
*
* @param queueName The quename to get from
* @param readTimeout The timeout to use
- *
* @return the content of the text message if any
- *
* @throws javax.jms.JMSException any exception that occured
*/
- public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ public Message getNextMessage(String queueName, long readTimeout)
+ throws
+ JMSException
{
if (!connected)
{
@@ -208,12 +220,10 @@ public class QpidClientConnection implements ExceptionListener
if (message instanceof TextMessage)
{
result = ((TextMessage) message);
- }
- else if (null == message)
+ } else if (null == message)
{
result = null;
- }
- else
+ } else
{
_logger.info("warning: received non-text message");
result = message;
@@ -226,12 +236,12 @@ public class QpidClientConnection implements ExceptionListener
* GET the top message on a queue. Consumes the message.
*
* @param queueName The Queuename to get from
- *
* @return The string content of the text message, if any received
- *
* @throws javax.jms.JMSException any exception that occurs
*/
- public Message getNextMessage(String queueName) throws JMSException
+ public Message getNextMessage(String queueName)
+ throws
+ JMSException
{
return getNextMessage(queueName, 0);
}
@@ -241,11 +251,13 @@ public class QpidClientConnection implements ExceptionListener
*
* @param queueName The Queue name to consume from
* @param readTimeout The timeout for each consume
- *
* @throws javax.jms.JMSException Any exception that occurs during the consume
* @throws InterruptedException If the consume thread was interrupted during a consume.
*/
- public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ public void consume(String queueName, int readTimeout)
+ throws
+ JMSException,
+ InterruptedException
{
if (!connected)
{
@@ -266,6 +278,7 @@ public class QpidClientConnection implements ExceptionListener
session.commit();
consumer.close();
+
_logger.info("consumed: " + messagesReceived);
}
}