summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-02-27 12:47:15 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-02-27 12:47:15 +0000
commit499cea57dbf11e4c73cdcd56ac268889ff36b880 (patch)
tree8b2cf69d9c98ea5a79244bc1d3bc85482fe4618a /java
parente7d09b6a5e3293021857541bab7aa4ae163ab05c (diff)
downloadqpid-python-499cea57dbf11e4c73cdcd56ac268889ff36b880.tar.gz
QPID-509 : [Java Client] Messages sent to topics should not default to mandatory
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1294135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java205
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java98
7 files changed, 239 insertions, 138 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index e7e937b689..30f1dcf8b7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -120,18 +120,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
/**
- * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
- * not need to be attached to a queue.
- */
- private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
-
- /**
- * The default value for mandatory flag used by producers created by this session is true. That is, server will not
- * silently drop messages where no queue is connected to the exchange for the message.
- */
- private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
-
- /**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
*/
@@ -1198,12 +1186,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public P createProducer(Destination destination) throws JMSException
{
- return createProducerImpl(destination, _defaultMandatoryValue, _defaultImmediateValue);
+ return createProducerImpl(destination, null, null);
}
public P createProducer(Destination destination, boolean immediate) throws JMSException
{
- return createProducerImpl(destination, _defaultMandatoryValue, immediate);
+ return createProducerImpl(destination, null, immediate);
}
public P createProducer(Destination destination, boolean mandatory, boolean immediate)
@@ -2613,7 +2601,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void sendConsume(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
- private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
+ private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
throws JMSException
{
return new FailoverRetrySupport<P, JMSException>(
@@ -2642,8 +2630,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}, _connection).execute();
}
- public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final long producerId) throws JMSException;
+ public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, final long producerId) throws JMSException;
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 816ad1f222..36dbad1928 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -17,9 +17,20 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -27,7 +38,6 @@ import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
@@ -42,28 +52,14 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
-import org.apache.qpid.util.Serial;
-import org.apache.qpid.util.Strings;
-
import static org.apache.qpid.transport.Option.BATCH;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.UNRELIABLE;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.qpid.util.Serial;
+import org.apache.qpid.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a 0.10 Session
@@ -654,8 +650,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Create an 0_10 message producer
*/
- public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final long producerId) throws JMSException
+ public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, final long producerId) throws JMSException
{
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 29f1925cbc..b56da5c2ec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -441,8 +441,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
- public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, long producerId) throws JMSException
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, long producerId) throws JMSException
{
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 75f198e1fa..56739f17f9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -20,18 +20,8 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.util.UUIDGen;
-import org.apache.qpid.util.UUIDs;
-
+import java.io.UnsupportedEncodingException;
+import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -42,74 +32,19 @@ import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-import java.io.UnsupportedEncodingException;
-import java.util.UUID;
+import javax.jms.Topic;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
- /**
- * If true, messages will not get a timestamp.
- */
- protected boolean isDisableTimestamps()
- {
- return _disableTimestamps;
- }
-
- protected void setDisableTimestamps(boolean disableTimestamps)
- {
- _disableTimestamps = disableTimestamps;
- }
-
- protected void setDestination(AMQDestination destination)
- {
- _destination = destination;
- }
-
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _protocolHandler;
- }
-
- protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
- {
- _protocolHandler = protocolHandler;
- }
-
- protected int getChannelId()
- {
- return _channelId;
- }
-
- protected void setChannelId(int channelId)
- {
- _channelId = channelId;
- }
-
- protected void setSession(AMQSession session)
- {
- _session = session;
- }
-
- protected String getUserID()
- {
- return _userID;
- }
-
- protected void setUserID(String userID)
- {
- _userID = userID;
- }
-
- protected PublishMode getPublishMode()
- {
- return publishMode;
- }
-
- protected void setPublishMode(PublishMode publishMode)
- {
- this.publishMode = publishMode;
- }
-
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
private final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -166,7 +101,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private final boolean _immediate;
- private final boolean _mandatory;
+ private final Boolean _mandatory;
private boolean _disableMessageId;
@@ -174,12 +109,34 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private String _userID; // ref user id used in the connection.
- private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+
+ /**
+ * The default value for immediate flag used this producer is false. That is, a consumer does
+ * not need to be attached to a queue.
+ */
+ private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+
+ /**
+ * The default value for mandatory flag used by this producer is true. That is, server will not
+ * silently drop messages where no queue is connected to the exchange for the message.
+ */
+ private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ /**
+ * The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server
+ * will silently drop messages where no queue is connected to the exchange for the message.
+ */
+ private final boolean _defaultMandatoryTopicValue =
+ Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic",
+ System.getProperties().containsKey("qpid.default_mandatory")
+ ? System.getProperty("qpid.default_mandatory")
+ : "false"));
private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+ Boolean immediate, Boolean mandatory) throws AMQException
{
_connection = connection;
_destination = destination;
@@ -193,8 +150,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
declareDestination(destination);
}
- _immediate = immediate;
- _mandatory = mandatory;
+ _immediate = immediate == null ? _defaultImmediateValue : immediate;
+ _mandatory = mandatory == null
+ ? destination == null ? null
+ : destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : mandatory;
+
_userID = connection.getUsername();
setPublishMode();
}
@@ -381,7 +344,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
+ sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
+ _mandatory == null
+ ? destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : _mandatory,
_immediate);
}
}
@@ -394,7 +362,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
+ _mandatory == null
+ ? destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : _mandatory,
+ _immediate);
}
}
@@ -646,6 +620,69 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
+ /**
+ * If true, messages will not get a timestamp.
+ */
+ protected boolean isDisableTimestamps()
+ {
+ return _disableTimestamps;
+ }
+
+ protected void setDisableTimestamps(boolean disableTimestamps)
+ {
+ _disableTimestamps = disableTimestamps;
+ }
+
+ protected void setDestination(AMQDestination destination)
+ {
+ _destination = destination;
+ }
+
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
+ protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
+ {
+ _protocolHandler = protocolHandler;
+ }
+
+ protected int getChannelId()
+ {
+ return _channelId;
+ }
+
+ protected void setChannelId(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ protected void setSession(AMQSession session)
+ {
+ _session = session;
+ }
+
+ protected String getUserID()
+ {
+ return _userID;
+ }
+
+ protected void setUserID(String userID)
+ {
+ _userID = userID;
+ }
+
+ protected PublishMode getPublishMode()
+ {
+ return publishMode;
+ }
+
+ protected void setPublishMode(PublishMode publishMode)
+ {
+ this.publishMode = publishMode;
+ }
+
Logger getLogger()
{
return _logger;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 024219cfd6..db5baed586 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -61,7 +61,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- boolean immediate, boolean mandatory) throws AMQException
+ Boolean immediate, Boolean mandatory) throws AMQException
{
super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 3b5e361f97..7fd8feef54 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -44,7 +44,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
index 484146b690..b746a5b09e 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
@@ -18,11 +18,9 @@
*/
package org.apache.qpid.test.client;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
@@ -32,9 +30,10 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase implements ExceptionListener
{
@@ -202,7 +201,88 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
return message;
}
- @Override
+ public void testMandatoryAndImmediateDefaults() throws JMSException, InterruptedException
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // publish to non-existent queue - should get mandatory failure
+ MessageProducer producer = session.createProducer(session.createQueue(getTestQueueName()));
+ Message message = session.createMessage();
+ producer.send(message);
+
+ JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
+ assertNotNull("JMSException is expected", exception);
+ AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
+ assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
+ Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
+ assertNotNull("Bounced Message is expected", bounceMessage);
+ assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+
+ producer = session.createProducer(null);
+ message = session.createMessage();
+ producer.send(session.createQueue(getTestQueueName()), message);
+
+ exception = _exceptions.poll(10, TimeUnit.SECONDS);
+ assertNotNull("JMSException is expected", exception);
+ noRouteException = (AMQNoRouteException) exception.getLinkedException();
+ assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
+ bounceMessage = (Message) noRouteException.getUndeliveredMessage();
+ assertNotNull("Bounced Message is expected", bounceMessage);
+ assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+
+
+ // publish to non-existent topic - should get no failure
+ producer = session.createProducer(session.createTopic(getTestQueueName()));
+ message = session.createMessage();
+ producer.send(message);
+
+ exception = _exceptions.poll(1, TimeUnit.SECONDS);
+ assertNull("Unexpected JMSException", exception);
+
+ producer = session.createProducer(null);
+ message = session.createMessage();
+ producer.send(session.createTopic(getTestQueueName()), message);
+
+ exception = _exceptions.poll(1, TimeUnit.SECONDS);
+ assertNull("Unexpected JMSException", exception);
+
+ session.close();
+ }
+
+ public void testMandatoryAndImmediateSystemProperties() throws JMSException, InterruptedException
+ {
+ setTestClientSystemProperty("qpid.default_mandatory","true");
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // publish to non-existent topic - should get mandatory failure
+
+ MessageProducer producer = session.createProducer(session.createTopic(getTestQueueName()));
+ Message message = session.createMessage();
+ producer.send(message);
+
+ JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
+ assertNotNull("JMSException is expected", exception);
+ AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
+ assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
+ Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
+ assertNotNull("Bounced Message is expected", bounceMessage);
+ assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+
+ // now set topic specific system property to false - should no longer get mandatory failure on new producer
+ setTestClientSystemProperty("qpid.default_mandatory_topic","false");
+ producer = session.createProducer(null);
+ message = session.createMessage();
+ producer.send(session.createTopic(getTestQueueName()), message);
+
+ exception = _exceptions.poll(1, TimeUnit.SECONDS);
+ if(exception != null)
+ {
+ exception.printStackTrace();
+ }
+ assertNull("Unexpected JMSException", exception);
+
+ }
+
public void onException(JMSException exception)
{
_exceptions.add(exception);