diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-08-30 12:19:31 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-08-30 12:19:31 +0000 |
| commit | 61a61c3716e42bf175004049976391407f28704d (patch) | |
| tree | 8ab343c1941a7565532189763dc79473a10beb3c /java/client | |
| parent | e183227707d150b1f42e750df0e90cd7dac8744e (diff) | |
| download | qpid-python-61a61c3716e42bf175004049976391407f28704d.tar.gz | |
Remerge of M2. All tests pass locally
Testing done in Intelij and mvn command line via windows/cygwin.
Python tests removed from auto build pending Jython-siztion. Tested running broker in intelij and python run-tests from cygwin.
All tests pass. (CombinedTest still exhibts a race condition. but that has always been so.)
Additional Race condition identified (around MsgReject/AutoDeleteQueues) during testing patch to follow.
systests are inconsistent Some use TestableMemoryMessageStore some use MemoryMessgaeStore.
Lets not roll back this change if issues are discovered. Lets work together to go forward and address any issues. I have spent a lot of time ensuring the tests work for me so I hope that they work for you.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571129 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
21 files changed, 771 insertions, 229 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java index 69960e54e5..1240284a56 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,13 +19,12 @@ package org.apache.qpid.example.publisher; -import java.io.*; - -import javax.jms.*; - import org.apache.qpid.example.shared.FileUtils; import org.apache.qpid.example.shared.Statics; +import java.io.*; +import javax.jms.*; + public class FileMessageFactory { protected final Session _session; diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 8784d340da..8c51f61522 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -18,18 +18,18 @@ */ package org.apache.qpid.example.publisher; -import org.apache.log4j.Logger; import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; -import javax.jms.*; - -import java.util.Properties; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; /** - * Class that sends heartbeat messages to allow monitoring of message consumption - * Sends regular (currently 20 seconds apart) heartbeat message + * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds + * apart) heartbeat message */ -public class MonitorMessageDispatcher { +public class MonitorMessageDispatcher +{ private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); @@ -39,17 +39,19 @@ public class MonitorMessageDispatcher { /** * Easy entry point for running a message dispatcher for monitoring consumption + * Sends 1000 messages with no delay + * * @param args */ public static void main(String[] args) { - //Switch on logging appropriately for your app BasicConfigurator.configure(); try { - while(true) + int i =0; + while (i < 1000) { try { @@ -62,9 +64,10 @@ public class MonitorMessageDispatcher { } //sleep for twenty seconds and then publish again - change if appropriate - Thread.sleep(20000); + //Thread.sleep(1000); + i++ ; } - catch(UndeliveredMessageException a) + catch (UndeliveredMessageException a) { //trigger application specific failure handling here _logger.error("Problem delivering monitor message"); @@ -72,7 +75,7 @@ public class MonitorMessageDispatcher { } } } - catch(Exception e) + catch (Exception e) { _logger.error("Error trying to dispatch AMS monitor message: " + e); System.exit(1); @@ -81,7 +84,7 @@ public class MonitorMessageDispatcher { { if (getMonitorPublisher() != null) { - getMonitorPublisher().cleanup(); + getMonitorPublisher().cleanup(); } } @@ -90,19 +93,24 @@ public class MonitorMessageDispatcher { /** * Publish heartbeat message + * * @throws JMSException * @throws UndeliveredMessageException */ public static void publish() throws JMSException, UndeliveredMessageException { //Send the message generated from the payload using the _publisher - getMonitorPublisher().sendImmediateMessage - (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); +// getMonitorPublisher().sendImmediateMessage +// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + + getMonitorPublisher().sendMessage + (getMonitorPublisher()._session, + FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()), + DeliveryMode.PERSISTENT, false, true); + } - /** - * Cleanup publishers - */ + /** Cleanup publishers */ public static void cleanup() { if (getMonitorPublisher() != null) @@ -119,16 +127,16 @@ public class MonitorMessageDispatcher { //Returns a _publisher for the monitor queue private static MonitorPublisher getMonitorPublisher() { - if (_monitorPublisher != null) - { - return _monitorPublisher; - } + if (_monitorPublisher != null) + { + return _monitorPublisher; + } - //Create a _publisher using failover details and constant for monitor queue - _monitorPublisher = new MonitorPublisher(); + //Create a _publisher using failover details and constant for monitor queue + _monitorPublisher = new MonitorPublisher(); - _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); - return _monitorPublisher; + _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); + return _monitorPublisher; } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java index 233c3fea0a..a67b602e58 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -18,15 +18,17 @@ */ package org.apache.qpid.example.publisher; -import javax.jms.Message; +import org.apache.log4j.Logger; +import org.apache.qpid.client.BasicMessageProducer; + import javax.jms.DeliveryMode; import javax.jms.JMSException; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.log4j.Logger; +import javax.jms.Message; +import javax.jms.Session; /** - * Subclass of Publisher which uses QPID functionality to send a heartbeat message - * Note immediate flag not available via JMS MessageProducer + * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via + * JMS MessageProducer */ public class MonitorPublisher extends Publisher { @@ -40,14 +42,45 @@ public class MonitorPublisher extends Publisher super(); } - /* - * Publishes a non-persistent message using transacted session - */ + /* + * Publishes a message using given details + */ + public boolean sendMessage(Session session, Message message, int deliveryMode, + boolean immediate, boolean commit) throws UndeliveredMessageException + { + try + { + _producer = (BasicMessageProducer) session.createProducer(_destination); + + _producer.send(message, deliveryMode, immediate); + + if (commit) + { + //commit the message send and close the transaction + _session.commit(); + } + + } + catch (JMSException e) + { + //Have to assume our commit failed but do not rollback here as channel closed + _log.error(e); + e.printStackTrace(); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); + } + + _log.info(_name + " finished sending message: " + message); + return true; + } + + /* + * Publishes a non-persistent message using transacted session + */ public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException { try { - _producer = (BasicMessageProducer)_session.createProducer(_destination); + _producer = (BasicMessageProducer) _session.createProducer(_destination); //Send message via our producer which is not persistent and is immediate //NB: not available via jms interface MessageProducer @@ -62,7 +95,7 @@ public class MonitorPublisher extends Publisher //Have to assume our commit failed but do not rollback here as channel closed _log.error(e); e.printStackTrace(); - throw new UndeliveredMessageException("Cannot deliver immediate message",e); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); } _log.info(_name + " finished sending message: " + message); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java new file mode 100644 index 0000000000..e32ee0ba73 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.NamingException; + +/** + * An abstract base class that wraps up the creation of a JMS client utilising JNDI + */ +public abstract class Client +{ + protected ConnectionSetup _setup; + + protected Connection _connection; + protected Destination _destination; + protected Session _session; + + public Client(String destination) + { + if (destination == null) + { + destination = ConnectionSetup.TOPIC_JNDI_NAME; + } + + try + { + _setup = new ConnectionSetup(); + } + catch (NamingException e) + { + //ignore + } + + if (_setup != null) + { + try + { + _connection = _setup.getConnectionFactory().createConnection(); + _destination = _setup.getDestination(destination); + } + catch (JMSException e) + { + System.err.println(e.getMessage()); + } + } + } + + public abstract void start(); + +}
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java new file mode 100644 index 0000000000..c4edd9034f --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; + +/** + * This ConnectionSetup is a wrapper around JNDI it creates a number of entries. + * + * It is equivalent to a PropertyFile of value: + * + * connectionfactory.local=amqp://guest:guest@clientid/test?brokerlist='localhost' + * connectionfactory.vm=amqp://guest:guest@clientid/test?brokerlist='vm://:1' + * + * queue.queue=example.MyQueue + * topic.topic=example.hierarical.topic + * + */ +public class ConnectionSetup +{ + final static String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + final static String CONNECTION_JNDI_NAME = "local"; + final static String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='localhost'"; + + public static final String QUEUE_JNDI_NAME = "queue"; + final static String QUEUE_NAME = "example.MyQueue"; + + public static final String TOPIC_JNDI_NAME = "topic"; + final static String TOPIC_NAME = "example.hierarical.topic"; + + private Context _ctx; + + public ConnectionSetup() throws NamingException + { + + // Set the properties ... + Properties properties = new Properties(); + properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); + properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); + properties.put("connectionfactory." + "vm", "amqp://guest:guest@clientid/test?brokerlist='vm://:1'"); + + properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); + properties.put("topic." + TOPIC_JNDI_NAME, TOPIC_NAME); + // Create the initial context + _ctx = new InitialContext(properties); + + } + + public ConnectionSetup(Properties properties) throws NamingException + { + _ctx = new InitialContext(properties); + } + + public ConnectionFactory getConnectionFactory() + { + + // Perform the lookups + try + { + return (ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME); + } + catch (NamingException e) + { + //ignore + } + return null; + } + + public Destination getDestination(String jndiName) + { + // Perform the lookups + try + { + return (Destination) _ctx.lookup(jndiName); + } + catch (ClassCastException cce) + { + //ignore + } + catch (NamingException ne) + { + //ignore + } + return null; + } + + + public void close() + { + try + { + _ctx.close(); + } + catch (NamingException e) + { + //ignore + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java new file mode 100644 index 0000000000..dd936e429f --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * A simple Publisher example. + * + * The class can take two arguments. + * java Publisher <destination> <msgCount> + * Where: + * destination is either 'topic' or 'queue' (Default: topic) + * msgCount is the number of messages to send (Default : 100) + * + */ +public class Publisher extends Client +{ + int _msgCount; + + public Publisher(String destination, int msgCount) + { + super(destination); + _msgCount = msgCount; + } + + public void start() + { + try + { + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer _producer = _session.createProducer(_destination); + + for (int msgCount = 0; msgCount < _msgCount; msgCount++) + { + _producer.send(_session.createTextMessage("msg:" + msgCount)); + System.out.println("Sent:" + msgCount); + } + + System.out.println("Done."); + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + + public static void main(String[] args) + { + + String destination = args.length > 2 ? args[1] : null; + + int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; + + new Publisher(destination, msgCount).start(); + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java new file mode 100644 index 0000000000..f2d736701f --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.concurrent.CountDownLatch; + + +/** + * Simple client that listens for the specified number of msgs on the given Destinaton + * + * The class can take two arguments. + * java Subscriber <destination> <msgCount> + * Where: + * destination is either 'topic' or 'queue' (Default: topic) + * msgCount is the number of messages to send (Default : 100) + */ +public class Subscriber extends Client implements MessageListener +{ + + CountDownLatch _count; + + public Subscriber(String destination, int msgCount) + { + super(destination); + _count = new CountDownLatch(msgCount); + } + + + public void start() + { + try + { + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME), + "exampleClient").setMessageListener(this); + _connection.start(); + _count.await(); + + System.out.println("Done"); + + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + public static void main(String[] args) + { + String destination = args.length > 2 ? args[1] : null; + int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; + + new Subscriber(destination, msgCount).start(); + } + + public void onMessage(Message message) + { + try + { + _count.countDown(); + System.out.println("Received msg:" + ((TextMessage) message).getText()); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} diff --git a/java/client/pom.xml b/java/client/pom.xml index 9bb448b631..9509043ae4 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -123,10 +123,54 @@ <build> <plugins> + + <plugin> + <artifactId>minijar-maven-plugin</artifactId> + <groupId>org.codehaus.mojo</groupId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>ueberjar</goal> + </goals> + <configuration> + <stripUnusedClasses>false</stripUnusedClasses> + <name>[artifactId]-[version]-single.jar</name> + <classifier>single</classifier> + <attach>true</attach> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${artifactId}-${version}-single.jar</file> + <type>jar</type> + <classifier>single</classifier> + </artifact> + </artifacts> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-antrun-plugin</artifactId> - </plugin> + </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d59412fdba..1ac43f4388 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -46,11 +46,9 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; import javax.jms.Destination; @@ -68,7 +66,6 @@ import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; - import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; @@ -1106,6 +1103,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { + //Should never get here as all AMQEs are required to have an ErrorCode! je = new JMSException("Exception thrown against " + toString() + ": " + cause); } @@ -1148,7 +1146,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - _logger.info("Not a hard-error connection not closing."); + _logger.info("Not a hard-error connection not closing: " + cause.getMessage()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index b3fbd1f510..eff6360d91 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -20,6 +20,14 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; @@ -35,6 +43,8 @@ import org.apache.qpid.url.URLSyntaxException; public class AMQConnectionURL implements ConnectionURL { + private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionURL.class); + private String _url; private String _failoverMethod; private HashMap<String, String> _failoverOptions; @@ -182,7 +192,7 @@ public class AMQConnectionURL implements ConnectionURL if (colonIndex == -1) { throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), - "Null password in user information not allowed.", _url); + "Null password in user information not allowed.", _url); } else { @@ -387,7 +397,14 @@ public class AMQConnectionURL implements ConnectionURL if (_password != null) { sb.append(':'); - sb.append(_password); + if (_logger.isDebugEnabled()) + { + sb.append(_password); + } + else + { + sb.append("********"); + } } sb.append('@'); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 879578bd6c..af469ee291 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -73,7 +73,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +100,6 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; - import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -293,6 +291,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ private final boolean _strictAMQPFATAL; + private final Object _messageDeliveryLock = new Object(); /** * Creates a new session on a connection. @@ -505,49 +504,53 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) - { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - try + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_connection.getFailoverMutex()) + { + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { + // we pass null since this is not an error case + closeProducersAndConsumers(null); - getProtocolHandler().closeSession(this); + try + { - final AMQFrame frame = - ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + getProtocolHandler().closeSession(this); - getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + final AMQFrame frame = + ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully. - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - // This is ignored because the channel is already marked as closed so the fail-over process will - // not re-open it. - catch (FailoverException e) - { - _logger.debug( - "Got FailoverException during channel close, ignored as channel already marked as closed."); - } - finally - { - _connection.deregisterSession(_channelId); + getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } + finally + { + _connection.deregisterSession(_channelId); + } } } } @@ -560,23 +563,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else + synchronized (_connection.getFailoverMutex()) { - amqe = new AMQException(null, "Closing session forcibly", e); - } + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + _closed.set(true); + AMQException amqe; + if (e instanceof AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException(null, "Closing session forcibly", e); + } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } } } @@ -1279,7 +1285,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public void declareAndBind(AMQDestination amqd) + public void declareAndBind(AMQDestination amqd) throws AMQException { @@ -2664,7 +2670,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _lock.wait(); } - dispatchMessage(message); + synchronized (_messageDeliveryLock) + { + dispatchMessage(message); + } while (connectionStopped()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java index 8f9a84a3a6..862a9be8d4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.client.handler; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java index 81228b4cdc..65060d44d2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.client.handler; diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index a00078b010..2c435aba6c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -25,7 +25,6 @@ import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; - import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -55,7 +54,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -273,10 +271,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_failoverState != FailoverState.IN_PROGRESS) { - _logger.info("sessionClose() not allowed to failover"); - _connection.exceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.", null)); + _logger.debug("sessionClose() not allowed to failover"); + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection " + "not permitted.", null)); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index bef3180041..9f430d76a7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -51,7 +51,6 @@ import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -254,8 +253,10 @@ public class AMQStateManager implements AMQMethodListener if (_currentState != s) { - _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); - throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s, null); + _logger.warn("State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + s); + throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + s, null); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 459579d920..4cda53a6a1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -26,11 +26,9 @@ import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; - import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,40 +88,40 @@ public class TransportConnection switch (transport) { - case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + case TCP: + _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() { - public IoConnector newSocketConnector() + SocketConnector result; + // FIXME - this needs to be sorted to use the new Mina MultiThread SA. + if (Boolean.getBoolean("qpidnio")) { - SocketConnector result; - // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) - { - _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set."); - // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector - } - // else - - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector - } - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); - - return result; + _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set."); + // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector } - }); - break; + // else - case VM: - { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - break; - } + { + _logger.info("Using Mina NIO"); + result = new SocketConnector(); // non-blocking connector + } + + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + result.setWorkerTimeout(0); + + return result; + } + }); + break; + + case VM: + { + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + break; + } } return _instance; @@ -145,7 +143,7 @@ public class TransportConnection } private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) - throws AMQVMBrokerCreationException + throws AMQVMBrokerCreationException { int port = details.getPort(); @@ -158,7 +156,7 @@ public class TransportConnection else { throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); + + " does not exist. Auto create disabled.", null); } } @@ -253,8 +251,8 @@ public class TransportConnection IoHandlerAdapter provider; try { - Class[] cnstr = { Integer.class }; - Object[] params = { port }; + Class[] cnstr = {Integer.class}; + Object[] params = {port}; provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); @@ -273,7 +271,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); amqbce.initCause(e); throw amqbce; } diff --git a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java index 1818132be0..dc0d9b8c78 100644 --- a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java @@ -37,6 +37,16 @@ public class AMQVMBrokerCreationException extends AMQTransportConnectionExceptio { private int _port; + /** + * @param port + * + * @deprecated + */ + public AMQVMBrokerCreationException(int port) + { + this(null, port, "Unable to create vm broker", null); + } + public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause) { super(errorCode, message, cause); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java index d19a6095d5..9600d1e9d3 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.test.unit.client.channelclose; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 588c82221e..56394fee27 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -47,12 +47,12 @@ public class ConnectionTest extends TestCase protected void setUp() throws Exception { super.setUp(); -// TransportConnection.createVMBroker(1); + TransportConnection.createVMBroker(1); } protected void tearDown() throws Exception { -// TransportConnection.killVMBroker(1); + TransportConnection.killVMBroker(1); } public void testSimpleConnection() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java index 9c354ee260..9cde24dd92 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java @@ -21,9 +21,7 @@ package org.apache.qpid.test.unit.client.forwardall; import junit.framework.TestCase; - import org.apache.qpid.testutil.VMBrokerSetup; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +32,7 @@ import org.slf4j.LoggerFactory; public class CombinedTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class); + private int run = 0; protected void setUp() throws Exception { @@ -48,14 +47,18 @@ public class CombinedTest extends TestCase public void testForwardAll() throws Exception { - int services = 2; - ServiceCreator.start("vm://:1", services); + while (run < 10) + { + int services = 2; + ServiceCreator.start("vm://:1", services); + + _logger.info("Starting " + ++run + " client..."); - _logger.info("Starting client..."); + new Client("vm://:1", services).shutdownWhenComplete(); - new Client("vm://:1", services).shutdownWhenComplete(); - _logger.info("Completed successfully!"); + _logger.info("Completed " + run + " successfully!"); + } } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index df2a38d0fc..1a45773907 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -21,12 +21,10 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +55,9 @@ public class CommitRollbackTest extends TestCase private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class); private static final String BROKER = "vm://:1"; + private boolean _gotone = false; + private boolean _gottwo = false; + private boolean _gottwoRedelivered = false; protected void setUp() throws Exception { @@ -340,57 +341,98 @@ public class CommitRollbackTest extends TestCase * * @throws Exception On error */ - /*public void testSend2ThenRollback() throws Exception + public void testSend2ThenRollback() throws Exception { - assertTrue("session is not transacted", _session.getTransacted()); - assertTrue("session is not transacted", _pubSession.getTransacted()); + int run = 0; + while (run < 10) + { + run++; + _logger.info("Run:" + run); + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending two test messages"); - _publisher.send(_pubSession.createTextMessage("1")); - _publisher.send(_pubSession.createTextMessage("2")); - _pubSession.commit(); + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); - _logger.info("getting test message"); - assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); + _logger.info("getting test message"); + assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); - _logger.info("rolling back"); - _session.rollback(); + _logger.info("rolling back"); + _session.rollback(); - _logger.info("receiving result"); - Message result = _consumer.receive(1000); + _logger.info("receiving result"); + Message result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); + assertNotNull("test message was consumed and rolled back, but is gone", result); + // Message Order is: - if (((TextMessage) result).getText().equals("2")) - { - assertTrue("Messasge is marked as redelivered", !result.getJMSRedelivered()); + // Send 1 , 2 + // Retrieve 1 and then rollback + // Receieve 1 (redelivered) , 2 (may or may not be redelivered??) - result = _consumer.receive(1000); - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + verifyMessages(result); + + // Occassionally get message 2 first! +// assertEquals("Should get message one first", "1", ((TextMessage) result).getText()); +// assertTrue("Message is not marked as redelivered", result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// assertEquals("Second message should be message 2", "2", ((TextMessage) result).getText()); +// assertTrue("Message is not marked as redelivered", result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// assertNull("There should be no more messages", result); + + _session.commit(); } - else + } + + private void verifyMessages(Message result) throws JMSException + { + + if (result == null) { - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); - result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + assertTrue("Didn't receive redelivered message one", _gotone); + assertTrue("Didn't receive message two at all", _gottwo | _gottwoRedelivered); + _gotone = false; + _gottwo = false; + _gottwoRedelivered = false; + return; } - result = _consumer.receive(1000); + if (((TextMessage) result).getText().equals("1")) + { + _logger.info("Got 1 redelivered"); + assertTrue("Message is not marked as redelivered", result.getJMSRedelivered()); + assertFalse("Already received message one", _gotone); + _gotone = true; - if (result != null) + } + else { assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); - result = _consumer.receive(1000); + + if (result.getJMSRedelivered()) + { + _logger.info("Got 2 redelivered, message was prefetched"); + assertFalse("Already received message redelivered two", _gottwoRedelivered); + + _gottwoRedelivered = true; + } + else + { + _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); + assertFalse("Already received message two", _gottwo); + + _gottwo = true; + } } - assertNull("test message should be null", result); - }*/ + verifyMessages(_consumer.receive(1000)); + } public void testSend2ThenCloseAfter1andTryAgain() throws Exception { @@ -417,12 +459,12 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); - // NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected. - // Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet. +// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected. +// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet. result = _consumer.receive(1000); assertNotNull("test message was consumed and rolled back, but is gone", result); - // The first message back will be either 1 or 2 being redelivered +// The first message back will be either 1 or 2 being redelivered if (result.getJMSRedelivered()) { assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); |
