diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-04-10 22:12:57 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-10 22:12:57 +0000 |
| commit | c01b7325c6e4ca251f2443bc92df6bcc0ed4ec25 (patch) | |
| tree | bed95b4adb462872c770d59b2a7ebc45b020bdcd /qpid/java | |
| parent | 3d098fe3d0206d1992baa21987accdbff0fe4ee9 (diff) | |
| download | qpid-python-c01b7325c6e4ca251f2443bc92df6bcc0ed4ec25.tar.gz | |
QPID-1803 : Test that selectors work when a QueueDepth is set.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@764081 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
2 files changed, 132 insertions, 29 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java index 280d897852..eb09bbeb16 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -24,7 +24,6 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.apache.log4j.PropertyConfigurator; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; @@ -53,22 +52,26 @@ import java.util.Hashtable; */ public class QueueDepthWithSelectorTest extends TestCase { - private static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class); + protected static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class); protected final String BROKER = "vm://:1"; protected final String VHOST = "test"; protected final String QUEUE = this.getClass().getName(); - private Context _context; + protected Context _context; - private Connection _clientConnection, _producerConnection; - private Session _clientSession, _producerSession; - private MessageProducer _producer; + protected Connection _clientConnection; + protected Connection _producerConnection; + private Session _clientSession; + protected Session _producerSession; + protected MessageProducer _producer; private MessageConsumer _consumer; - private static final int MSG_COUNT = 50; + protected static int MSG_COUNT = 50; - private Message[] _messages = new Message[MSG_COUNT]; + protected Message[] _messages = new Message[MSG_COUNT]; + + protected Queue _queue; protected void setUp() throws Exception { @@ -96,6 +99,9 @@ public class QueueDepthWithSelectorTest extends TestCase _context = factory.getInitialContext(env); + _messages = new Message[MSG_COUNT]; + _queue = (Queue) _context.lookup("queue"); + init(); } protected void tearDown() throws Exception @@ -120,8 +126,6 @@ public class QueueDepthWithSelectorTest extends TestCase public void test() throws Exception { - - init(); //Send messages _logger.info("Starting to send messages"); for (int msg = 0; msg < MSG_COUNT; msg++) @@ -134,34 +138,32 @@ public class QueueDepthWithSelectorTest extends TestCase //Verify we get all the messages. _logger.info("Verifying messages"); - verifyAllMessagesRecevied(); + verifyAllMessagesRecevied(0); //Close the connection.. .giving the broker time to clean up its state. _clientConnection.close(); //Verify Broker state _logger.info("Verifying broker state"); - verifyBrokerState(); + verifyBrokerState(0); } - private void init() throws NamingException, JMSException + protected void init() throws NamingException, JMSException, AMQException { - _messages = new Message[MSG_COUNT]; - //Create Producer _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); _producerConnection.start(); _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _producer = _producerSession.createProducer((Queue) _context.lookup("queue")); + _producer = _producerSession.createProducer(_queue); // Create consumer _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); _clientConnection.start(); _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _consumer = _clientSession.createConsumer((Queue) _context.lookup("queue"), "key = 23"); + _consumer = _clientSession.createConsumer(_queue, "key = 23"); } - private void verifyBrokerState() + protected void verifyBrokerState(int expectedDepth) { try { @@ -177,17 +179,13 @@ public class QueueDepthWithSelectorTest extends TestCase try { Thread.sleep(2000); - long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue")); - assertEquals("Session reports Queue depth not as expected", 0, queueDepth); + long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue); + assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); } catch (InterruptedException e) { fail(e.getMessage()); } - catch (NamingException e) - { - fail(e.getMessage()); - } catch (AMQException e) { fail(e.getMessage()); @@ -206,7 +204,7 @@ public class QueueDepthWithSelectorTest extends TestCase } - private void verifyAllMessagesRecevied() throws Exception + protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception { boolean[] msgIdRecevied = new boolean[MSG_COUNT]; @@ -216,8 +214,9 @@ public class QueueDepthWithSelectorTest extends TestCase _messages[i] = _consumer.receive(1000); assertNotNull("should have received a message but didn't", _messages[i]); } - long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue")); - assertEquals("Session reports Queue depth not as expected", 0, queueDepth); + + long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue); + assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); //Check received messages int msgId = 0; @@ -246,8 +245,7 @@ public class QueueDepthWithSelectorTest extends TestCase * * @throws JMSException */ - - private Message nextMessage(int msgNo) throws JMSException + protected Message nextMessage(int msgNo) throws JMSException { Message send = _producerSession.createTextMessage("MessageReturnTest"); send.setIntProperty("ID", msgNo); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorUsingFlowToDiskTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorUsingFlowToDiskTest.java new file mode 100644 index 0000000000..67b127eeb8 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorUsingFlowToDiskTest.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.AMQException; + +import javax.jms.Session; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.naming.NamingException; +import java.util.HashMap; +import java.util.Map; + +public class QueueDepthWithSelectorUsingFlowToDiskTest extends QueueDepthWithSelectorTest +{ + + @Override + public void init() throws NamingException, JMSException, AMQException + { + //Incresae the number of messages to send + MSG_COUNT = 100; + + //Resize the array + _messages = new Message[MSG_COUNT]; + + + Map<String, Object> arguments = new HashMap<String, Object>(); + + //Ensure we can call createQueue with a priority int value + arguments.put(AMQQueueFactory.QPID_POLICY_TYPE.toString(), AMQQueueFactory.QPID_FLOW_TO_DISK); + // each message in the QueueDepthWithSelectorTest is 17 bytes each so only give space for half + arguments.put(AMQQueueFactory.QPID_MAX_SIZE.toString(), 8 * MSG_COUNT); + + //Create the FlowToDisk Queue + Connection connection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + AMQSession session = ((AMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); + session.createQueue(new AMQShortString(getName()), false, false, false, arguments); + + // Get a JMS reference to the new queue + _queue = session.createQueue(getName()); + connection.close(); + + super.init(); + } + + public void testOnlyGettingHalf() throws Exception + { + //Send messages + _logger.info("Starting to send messages"); + for (int msg = 0; msg < MSG_COUNT; msg++) + { + //Send a message that matches the selector + _producer.send(nextMessage(msg)); + + //Send one that doesn't + _producer.send(_producerSession.createTextMessage("MessageReturnTest")); + } + + + _logger.info("Closing connection"); + //Close the connection.. .giving the broker time to clean up its state. + _producerConnection.close(); + + //Verify we get all the messages. + _logger.info("Verifying messages"); + // Expecting there to be MSG_COUNT on the queue as we have sent + // MSG_COUNT * (one that matches selector and one that doesn't) + verifyAllMessagesRecevied(MSG_COUNT); + + //Close the connection.. .giving the broker time to clean up its state. + _clientConnection.close(); + + //Verify Broker state + _logger.info("Verifying broker state"); + verifyBrokerState(MSG_COUNT); + } + + + + + +} |
