diff options
| author | Gordon Sim <gsim@apache.org> | 2007-05-09 08:48:18 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-05-09 08:48:18 +0000 |
| commit | 41d3e70cdf4c93aba0589ed7aec61499a86c6119 (patch) | |
| tree | d4b52849b875b16590241747b899bf3cc1bc86cb /java/systests/src | |
| parent | 04186f293032c5720f9500eb2a07433572f13d24 (diff) | |
| download | qpid-python-41d3e70cdf4c93aba0589ed7aec61499a86c6119.tar.gz | |
Patch from Arnaud Simon (asimon@redhat.com) to fix tests broken by earlier changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
6 files changed, 363 insertions, 34 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index e69de29bb2..bca5636440 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -0,0 +1,230 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.ack; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.messageStore.MemoryMessageStore; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; + +import java.util.*; + +public class TxAckTest extends TestCase +{ + private Scenario individual; + private Scenario multiple; + private Scenario combined; + + protected void setUp() throws Exception + { + super.setUp(); + + //ack only 5th msg + individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l)); + individual.update(5, false); + + //ack all up to and including 5th msg + multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l)); + multiple.update(5, true); + + //leave only 8th and 9th unacked + combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l)); + combined.update(3, false); + combined.update(5, true); + combined.update(7, true); + combined.update(2, true);//should be ignored + combined.update(1, false);//should be ignored + combined.update(10, false); + } + + public void testPrepare() throws AMQException + { + individual.prepare(); + multiple.prepare(); + combined.prepare(); + } + + public void testUndoPrepare() throws AMQException + { + individual.undoPrepare(); + multiple.undoPrepare(); + combined.undoPrepare(); + } + + public void testCommit() throws AMQException + { + individual.commit(); + multiple.commit(); + combined.commit(); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TxAckTest.class); + } + + private class Scenario + { + private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000); + private final TxAck _op = new TxAck(_map); + private final List<Long> _acked; + private final List<Long> _unacked; + private StoreContext _storeContext = new StoreContext(); + + Scenario(int messageCount, List<Long> acked, List<Long> unacked) + { + TransactionalContext txnContext = new NonTransactionalContext(new MemoryMessageStore(), + _storeContext, null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + for (int i = 0; i < messageCount; i++) + { + long deliveryTag = i + 1; + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); + _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); + } + _acked = acked; + _unacked = unacked; + } + + void update(long deliverytag, boolean multiple) + { + _op.update(deliverytag, multiple); + } + + private void assertCount(List<Long> tags, int expected) + { + for (long tag : tags) + { + UnacknowledgedMessage u = _map.get(tag); + assertTrue("Message not found for tag " + tag, u != null); + ((TestMessage) u.message).assertCountEquals(expected); + } + } + + void prepare() throws AMQException + { + _op.consolidate(); + _op.prepare(_storeContext); + + assertCount(_acked, -1); + assertCount(_unacked, 0); + + } + + void undoPrepare() + { + _op.consolidate(); + _op.undoPrepare(); + + assertCount(_acked, 0); + assertCount(_unacked, 0); + } + + void commit() + { + _op.consolidate(); + _op.commit(_storeContext); + + //check acked messages are removed from map + Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags()); + keys.retainAll(_acked); + assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty()); + //check unacked messages are still in map + keys = new HashSet<Long>(_unacked); + keys.removeAll(_map.getDeliveryTags()); + assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty()); + } + } + + private class TestMessage extends AMQMessage + { + private final long _tag; + private int _count; + + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext) + { + super(messageId, publishBody, txnContext); + try + { + setContentHeaderBody(new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); + } + catch (AMQException e) + { + // won't happen + } + _tag = tag; + } + + public void incrementReference() + { + _count++; + } + + public void decrementReference(StoreContext context) + { + _count--; + } + + void assertCountEquals(int expected) + { + assertEquals("Wrong count for message with tag " + _tag, expected, _count); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java b/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java index e69de29bb2..503bb28775 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java @@ -0,0 +1,75 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.protocol; + +import junit.framework.TestCase; +import org.apache.mina.common.IoSession; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.framing.AMQShortString; + +import javax.management.JMException; + +/** Test class to test MBean operations for AMQMinaProtocolSession. */ +public class MaxChannelsTest extends TestCase +{ +// private MessageStore _messageStore = new SkeletonMessageStore(); + + public void testChannels() throws Exception + { + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + AMQMinaProtocolSession _protocolSession = new AMQMinaProtocolSession(new MockIoSession(), + appRegistry.getVirtualHostRegistry(), + new AMQCodecFactory(true), + null); + _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test")); + + // check the channel count is correct + int channelCount = _protocolSession.getChannels().size(); + assertEquals("Initial channel count wrong", 0, channelCount); + + long maxChannels = 10L; + _protocolSession.setMaximumNumberOfChannels(maxChannels); + assertEquals("Number of channels not correctly set.", new Long(maxChannels), _protocolSession.getMaximumNumberOfChannels()); + + + try + { + for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++) + { + _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null, null)); + } + } + catch (AMQException e) + { + assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED); + } + assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_protocolSession.getChannels().size())); + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index ca352b2fd7..27ac3474b9 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -3,7 +3,6 @@ package org.apache.qpid.server.exchange; import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
import org.apache.qpid.client.transport.TransportConnection;
@@ -269,7 +268,14 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex public void onException(JMSException jmsException)
{
- Exception linkedException = jmsException.getLinkedException();
+ Exception linkedException = null;
+ try
+ {
+ linkedException = jmsException.getLinkedException();
+ } catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
if (linkedException instanceof AMQNoRouteException)
{
AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java index c6cbac0ba8..0da18ea87f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -8,7 +8,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.log4j.Logger; import javax.jms.JMSException; -import javax.jms.DeliveryMode; import java.io.IOException; @@ -17,7 +16,7 @@ public class HeapExhaustion extends TestCase { private static final Logger _logger = Logger.getLogger(HeapExhaustion.class); - protected QpidClientConnection conn; + protected QpidClientConnection conn; protected final String BROKER = "localhost"; protected final String vhost = "/test"; protected final String queue = "direct://amq.direct//queue"; @@ -35,7 +34,13 @@ public class HeapExhaustion extends TestCase conn = new QpidClientConnection(BROKER); conn.setVirtualHost(vhost); - conn.connect(); + try + { + conn.connect(); + } catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } // clear queue _logger.debug("setup: clearing test queue"); conn.consume(queue, 2000); @@ -55,7 +60,7 @@ public class HeapExhaustion extends TestCase * * @throws Exception on error */ - public void testUntilFailureTransient() throws Exception + public void testUntilFailure() throws Exception { int copies = 0; int total = 0; @@ -63,7 +68,7 @@ public class HeapExhaustion extends TestCase int size = payload.getBytes().length; while (true) { - conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT); + conn.put(queue, payload, 1); copies++; total += size; System.out.println("put copy " + copies + " OK for total bytes: " + total); @@ -75,7 +80,7 @@ public class HeapExhaustion extends TestCase * * @throws Exception on error */ - public void testUntilFailureWithDelaysTransient() throws Exception + public void testUntilFailureWithDelays() throws Exception { int copies = 0; int total = 0; @@ -83,7 +88,7 @@ public class HeapExhaustion extends TestCase int size = payload.getBytes().length; while (true) { - conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT); + conn.put(queue, payload, 1); copies++; total += size; System.out.println("put copy " + copies + " OK for total bytes: " + total); @@ -110,7 +115,7 @@ public class HeapExhaustion extends TestCase _logger.info("Running testUntilFailure"); try { - he.testUntilFailureTransient(); + he.testUntilFailure(); } catch (FailoverException fe) { @@ -159,7 +164,7 @@ public class HeapExhaustion extends TestCase _logger.info("Running testUntilFailure"); try { - he.testUntilFailureWithDelaysTransient(); + he.testUntilFailureWithDelays(); } catch (FailoverException fe) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java index 068f37574d..6a70fea711 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java @@ -38,7 +38,7 @@ public class ConcurrencyTestDisabled extends MessageTestHelper { private final Random random = new Random(); - private final int numMessages = 1000; + private final int numMessages = 10; private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>(); private final Set<Subscription> _active = new HashSet<Subscription>(); diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java index d3f064293e..6f955ebdab 100644 --- a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -16,7 +16,6 @@ import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.TextMessage; -import javax.jms.DeliveryMode; public class QpidClientConnection implements ExceptionListener { @@ -42,7 +41,9 @@ public class QpidClientConnection implements ExceptionListener } - public void connect() throws JMSException + public void connect() + throws + JMSException { if (!connected) { @@ -77,7 +78,9 @@ public class QpidClientConnection implements ExceptionListener } } - public void disconnect() throws JMSException + public void disconnect() + throws + JMSException { if (connected) { @@ -89,7 +92,9 @@ public class QpidClientConnection implements ExceptionListener } } - public void disconnectWithoutCommit() throws JMSException + public void disconnectWithoutCommit() + throws + JMSException { if (connected) { @@ -126,7 +131,9 @@ public class QpidClientConnection implements ExceptionListener } - /** override as necessary */ + /** + * override as necessary + */ public void onException(JMSException exception) { _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); @@ -148,10 +155,11 @@ public class QpidClientConnection implements ExceptionListener * @param queueName The queue name to put to * @param payload the content of the payload * @param copies the number of messages to put - * * @throws javax.jms.JMSException any exception that occurs */ - public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException + public void put(String queueName, String payload, int copies) + throws + JMSException { if (!connected) { @@ -163,8 +171,6 @@ public class QpidClientConnection implements ExceptionListener final MessageProducer sender = session.createProducer(queue); - sender.setDeliveryMode(deliveryMode); - for (int i = 0; i < copies; i++) { Message m = session.createTextMessage(payload + i); @@ -172,7 +178,13 @@ public class QpidClientConnection implements ExceptionListener sender.send(m); } - session.commit(); + try + { + session.commit(); + } catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } sender.close(); _logger.info("put " + copies + " copies"); } @@ -182,12 +194,12 @@ public class QpidClientConnection implements ExceptionListener * * @param queueName The quename to get from * @param readTimeout The timeout to use - * * @return the content of the text message if any - * * @throws javax.jms.JMSException any exception that occured */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException + public Message getNextMessage(String queueName, long readTimeout) + throws + JMSException { if (!connected) { @@ -208,12 +220,10 @@ public class QpidClientConnection implements ExceptionListener if (message instanceof TextMessage) { result = ((TextMessage) message); - } - else if (null == message) + } else if (null == message) { result = null; - } - else + } else { _logger.info("warning: received non-text message"); result = message; @@ -226,12 +236,12 @@ public class QpidClientConnection implements ExceptionListener * GET the top message on a queue. Consumes the message. * * @param queueName The Queuename to get from - * * @return The string content of the text message, if any received - * * @throws javax.jms.JMSException any exception that occurs */ - public Message getNextMessage(String queueName) throws JMSException + public Message getNextMessage(String queueName) + throws + JMSException { return getNextMessage(queueName, 0); } @@ -241,11 +251,13 @@ public class QpidClientConnection implements ExceptionListener * * @param queueName The Queue name to consume from * @param readTimeout The timeout for each consume - * * @throws javax.jms.JMSException Any exception that occurs during the consume * @throws InterruptedException If the consume thread was interrupted during a consume. */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + public void consume(String queueName, int readTimeout) + throws + JMSException, + InterruptedException { if (!connected) { @@ -266,6 +278,7 @@ public class QpidClientConnection implements ExceptionListener session.commit(); consumer.close(); + _logger.info("consumed: " + messagesReceived); } } |
