diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-03-13 10:35:42 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-03-13 10:35:42 +0000 |
| commit | c4f018f7c10d2169ced4c59e776844ee5cf52d33 (patch) | |
| tree | 421d85cf41bf382bab618587298d9d6bef825bdc /java/systests | |
| parent | 685ad5615e73f02f76f69841162fb9aa126892d2 (diff) | |
| download | qpid-python-c4f018f7c10d2169ced4c59e776844ee5cf52d33.tar.gz | |
QPID-346,QPID-386,QPID-403, QPID-410 Rollback, Basic-Reject, QueueBrowser NO_ACK.
QPID-346 Message loss after rollback\recover
QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state.
QPID-403 Implement Basic.Reject
QPID-410 Queue Browsers should use not acknowledge messages.
-------------------------------------
Broker
TxAck - Added comment and fixed white space
UnacknowledgedMessage - Added comment for messageDecrement
AMQChannel - Added extra debugging.
+ Created a NonTransactionalContext for requeuing messages as using txContext will tie the requeue to any runing transaction.
+ Updated message reference counting. So it is in terms of queues don't increment when giving to client.
BasicCancelMethodHandler - Added Debug log.
BasicConsumeMethodHandler - Reverted to directly writes frames to the session, throwing ChannelException caused problems. Added Trace and debug logging.
BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging
AMQPFastProtocolHandler - moved error log to before session.write
AMQMessage - Added additional debug via debugIdentity() and comments
AMQQueue - Decoupled reference counting from dequeue operation.
ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging
SubscriptionImpl - Disabled use of acks for browsers. For now put setDeliveredToConsumer back in the finally block. commented that I'm not sure this is correct as even an error writing to client will cause msg to be marked delivered to consumer.
+ On Close ensured that it is only called once.
+ Had problem where closing browser was causing two CancelOk frames to be sent back to client.
RequiredDeliveryException - Added comment to explain incrementReference
LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here.
NonTransactionalContext - Removed incrementReference on deliver
+ - Fixed bug where browsers - acks would cause messages to be discarded. new JIRA this needs tidied up.
TxnBuffer - Added debug logging.
Client
------
AMQQueueBrowser - Added comments
AMQSession - Added comments and debug
+ Updated to cause closed consumer to reject messages rather than receive them.
+ Prevented NoConsumer's from rollingback and rejecting.. they simply clear their SyncQueue - JIRA to ensure clean state with rollback
BasicMessageConsumer - Added trace level debuging on close calls
+ Forced noConsume-rs to use NO_ACK
+ added more logging
Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first.
ChannelCloseOkMethodHandler - updated comment
AMQProtocolSession - Update comments,whitespace
TransportConnection - removed static block
FlowControllingBlockingQueue - Added isEmpty() Method
PropertyValueTest - Added VM Broker setup
+ Updated test to run once and 50 times to pull out delivery tag problems that were occuring.
+ Adjusted logging level to be more helpful. moved some info down to trace and debug.
MessageRequeueTest - Moved QpidClientConnection its own file.
+ Fixed it so it actually runs more than one consumer, concurrently.Now 3 was 1.
ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator().
Added QueueBrowserTest to system tests to test QueueBrowsering.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java | 117 | ||||
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java | 150 |
2 files changed, 267 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java new file mode 100644 index 0000000000..bbac06382d --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test; + +import junit.extensions.TestSetup; +import junit.framework.Test; +import junit.framework.TestCase; + +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import javax.jms.Queue; +import javax.jms.ConnectionFactory; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import java.util.Hashtable; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.HashMap; + +public class VMTestCase extends TestCase +{ + protected long RECEIVE_TIMEOUT = 1000l; // 1 sec + protected long CLOSE_TIMEOUT = 10000l; // 10 secs + + protected Context _context; + protected String _clientID; + protected String _virtualhost; + protected String _brokerlist; + + protected final Map<String, String> _connections = new HashMap<String, String>(); + protected final Map<String, String> _queues = new HashMap<String, String>(); + protected final Map<String, String> _topics = new HashMap<String, String>(); + + protected void setUp() throws Exception + { + super.setUp(); + try + { + TransportConnection.createVMBroker(1); + } + catch (Exception e) + { + fail("Unable to create broker: " + e); + } + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + if (_clientID == null) + { + _clientID = this.getClass().getName(); + } + + if (_virtualhost == null) + { + _virtualhost = "/test"; + } + + if (_brokerlist == null) + { + _brokerlist = "vm://:1"; + } + + env.put("connectionfactory.connection", "amqp://client:client@" + + _clientID + _virtualhost + "?brokerlist='" + _brokerlist + "'"); + + for (Map.Entry<String, String> c : _connections.entrySet()) + { + env.put("connectionfactory." + c.getKey(), c.getValue()); + } + + env.put("queue.queue", "queue"); + + for (Map.Entry<String, String> q : _queues.entrySet()) + { + env.put("queue." + q.getKey(), q.getValue()); + } + + env.put("topic.topic", "topic"); + + for (Map.Entry<String, String> t : _topics.entrySet()) + { + env.put("topic." + t.getKey(), t.getValue()); + } + + _context = factory.getInitialContext(env); + } + + protected void tearDown() throws Exception + { + TransportConnection.killVMBroker(1); + super.tearDown(); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java new file mode 100644 index 0000000000..ac65eec979 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.client; + +import org.apache.qpid.test.VMTestCase; +import org.apache.log4j.Logger; + +import javax.jms.Queue; +import javax.jms.ConnectionFactory; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.QueueBrowser; +import javax.jms.TextMessage; +import javax.jms.JMSException; +import javax.jms.QueueReceiver; +import javax.jms.Message; +import java.util.Enumeration; + +public class QueueBrowserTest extends VMTestCase +{ + private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class); + + private static final int MSG_COUNT = 10; + + private Connection _clientConnection; + private Session _clientSession; + private Queue _queue; + + public void setUp() throws Exception + { + + super.setUp(); + + _queue = (Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + //Create Producer put some messages on the queue + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + } + + /* + * Test Messages Remain on Queue + * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there + * + */ + + public void queueBrowserMsgsRemainOnQueueTest() throws JMSException + { + + // create QueueBrowser + _logger.info("Creating Queue Browser"); + + QueueBrowser queueBrowser = _clientSession.createBrowser(_queue); + + // check for messages + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for " + MSG_COUNT + " messages with QueueBrowser"); + } + + int msgCount = 0; + Enumeration msgs = queueBrowser.getEnumeration(); + + while (msgs.hasMoreElements()) + { + msgs.nextElement(); + msgCount++; + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Found " + msgCount + " messages total in browser"); + } + + // check to see if all messages found +// assertEquals("browser did not find all messages", MSG_COUNT, msgCount); + if (msgCount != MSG_COUNT) + { + _logger.warn(msgCount + "/" + MSG_COUNT + " messages received."); + } + + //Close browser + queueBrowser.close(); + + // VERIFY + + // continue and try to receive all messages + MessageConsumer consumer = _clientSession.createConsumer(_queue); + + _logger.info("Verify messages are still on the queue"); + + Message tempMsg; + + for (msgCount = 0; msgCount < MSG_COUNT; msgCount++) + { + tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT); + if (tempMsg == null) + { + fail("Message " + msgCount + " not retrieved from queue"); + } + } + + _logger.info("All messages recevied from queue"); + } + + +} |
