summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-09-13 13:55:42 +0000
committerRobert Gemmell <robbie@apache.org>2011-09-13 13:55:42 +0000
commita1aec7ac41539186a093ac750ce925ca5d9aca81 (patch)
treea6928a1926147cce613738cefbe6e68d6616e784 /java/client/src
parent15e875512cf43b6e8bdeb65740554aed9841afe7 (diff)
downloadqpid-python-a1aec7ac41539186a093ac750ce925ca5d9aca81.tar.gz
QPID-3448: catch exceptions from the underlying Transport/Session/Connection and rethrow as a JMSException like users are expecting
Applied patch by Oleksandr Rudyy <orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1170182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java121
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java765
8 files changed, 953 insertions, 48 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 25562cfff7..e0da1ef41f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -97,7 +97,10 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -606,8 +609,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
*
* @throws IllegalStateException If the session is closed.
+ * @throws JMSException if there is a problem during acknowledge process.
*/
- public void acknowledge() throws IllegalStateException
+ public void acknowledge() throws IllegalStateException, JMSException
{
if (isClosed())
{
@@ -625,7 +629,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
break;
}
- acknowledgeMessage(tag, false);
+
+ try
+ {
+ acknowledgeMessage(tag, false);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
+ }
}
}
@@ -763,6 +775,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.debug(
"Got FailoverException during channel close, ignored as channel already marked as closed.");
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Error closing session:" + e.getMessage(), e);
+ }
finally
{
_connection.deregisterSession(_channelId);
@@ -874,6 +890,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+ }
}
public abstract void sendCommit() throws AMQException, FailoverException;
@@ -1071,6 +1091,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ex.setLinkedException(e);
throw ex;
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Error when verifying destination", e);
+ }
}
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1156,6 +1180,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return subscriber;
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
+ }
finally
{
_subscriberDetails.unlock();
@@ -1405,7 +1433,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkNotClosed();
- // return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
@@ -1442,7 +1469,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
Topic dest = checkValidTopic(topic);
- // AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
}
@@ -1727,13 +1753,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Ensure that the session is not transacted.
checkNotTransacted();
- // flush any acks we are holding in the buffer.
- flushAcknowledgments();
-
- // this is set only here, and the before the consumer's onMessage is called it is set to false
- _inRecovery = true;
+
try
{
+ // flush any acks we are holding in the buffer.
+ flushAcknowledgments();
+
+ // this is set only here, and the before the consumer's onMessage is called it is set to false
+ _inRecovery = true;
boolean isSuspended = isSuspended();
@@ -1769,7 +1796,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
}
-
+ catch(TransportException e)
+ {
+ throw toJMSException("Recover failed: " + e.getMessage(), e);
+ }
}
protected abstract void sendRecover() throws AMQException, FailoverException;
@@ -1854,6 +1884,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Failure to rollback:" + e.getMessage(), e);
+ }
}
}
@@ -1900,7 +1934,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public void unsubscribe(String name) throws JMSException
{
- unsubscribe(name, false);
+ try
+ {
+ unsubscribe(name, false);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
+ }
}
/**
@@ -2021,8 +2062,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// argument, as specifying null for the arguments when querying means they should not be checked at all
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
- C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ C consumer;
+ try
+ {
+ consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ }
+ catch(TransportException e)
+ {
+ throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
+ }
if (_messageListener != null)
{
@@ -2059,7 +2108,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ex.initCause(e);
throw ex;
}
-
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
+ }
return consumer;
}
}, _connection).execute();
@@ -2601,8 +2653,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkNotClosed();
long producerId = getNextProducerId();
- P producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+
+ P producer;
+ try
+ {
+ producer = createMessageProducer(destination, mandatory,
+ immediate, waitUntilSent, producerId);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while creating producer:" + e.getMessage(), e);
+ }
+
registerProducer(producerId, producer);
return producer;
@@ -3009,6 +3071,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
}
+ catch (TransportException e)
+ {
+ throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+ }
}
}
@@ -3486,4 +3552,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
return DECLARE_EXCHANGES;
}
+
+ JMSException toJMSException(String message, TransportException e)
+ {
+ int code = getErrorCode(e);
+ JMSException jmse = new JMSException(message, Integer.toString(code));
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ return jmse;
+ }
+
+ private int getErrorCode(TransportException e)
+ {
+ int code = AMQConstant.INTERNAL_ERROR.getCode();
+ if (e instanceof SessionException)
+ {
+ SessionException se = (SessionException) e;
+ if(se.getException() != null && se.getException().getErrorCode() != null)
+ {
+ code = se.getException().getErrorCode().getValue();
+ }
+ }
+ return code;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index b5868cd235..bfbb9f7148 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -72,6 +72,7 @@ import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -548,7 +549,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
- throws JMSException
{
boolean res;
ExchangeBoundResult bindingQueryResult =
@@ -692,6 +692,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
throw ex;
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Exception while creating message producer:" + e.getMessage(), e);
+ }
}
@@ -994,7 +998,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- @Override public void commit() throws JMSException
+ @Override
+ public void commit() throws JMSException
{
checkTransacted();
try
@@ -1007,12 +1012,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
sendCommit();
}
- catch(SessionException e)
+ catch(TransportException e)
{
- JMSException ex = new JMSException("Session exception occured while trying to commit");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
+ throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
}
catch (AMQException e)
{
@@ -1383,5 +1385,5 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sb.append(">");
return sb.toString();
}
-
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 5d32863f2f..754055ad98 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -419,6 +420,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return null;
}
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
+ }
finally
{
releaseReceiving();
@@ -489,6 +494,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return null;
}
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
+ }
finally
{
releaseReceiving();
@@ -582,6 +591,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
+ catch (TransportException e)
+ {
+ throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
+ }
}
}
else
@@ -775,7 +788,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
- void postDeliver(AbstractJMSMessage msg) throws JMSException
+ void postDeliver(AbstractJMSMessage msg)
{
switch (_acknowledgeMode)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 964c238946..47da59724c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -23,9 +23,7 @@ import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
@@ -365,21 +363,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener != null && capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
- }
- if (messageListener != null && !_synchronousQueue.isEmpty())
+ try
{
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
+ if (messageListener != null && capacity == 0)
{
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
+ if (messageListener != null && !_synchronousQueue.isEmpty())
+ {
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
+ }
+ }
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
}
}
@@ -443,7 +448,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return o;
}
- void postDeliver(AbstractJMSMessage msg) throws JMSException
+ void postDeliver(AbstractJMSMessage msg)
{
super.postDeliver(msg);
if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 8756ac4d05..2bfca025b2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -39,6 +39,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
@@ -266,7 +267,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
return _destination;
}
- public void close()
+ public void close() throws JMSException
{
_closed.set(true);
_session.deregisterProducer(_producerId);
@@ -498,7 +499,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
message.setJMSMessageID(messageId);
}
- sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ try
+ {
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ }
+ catch (TransportException e)
+ {
+ throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
+ }
if (message != origMessage)
{
@@ -596,6 +604,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
+ try
+ {
+ return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
+ }
+ catch (TransportException e)
+ {
+ throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index d739903ee6..1fa5c1003f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -37,7 +37,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -47,6 +46,7 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -246,14 +246,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
}
-
+ @Override
public boolean isBound(AMQDestination destination) throws JMSException
{
return _session.isQueueBound(destination);
}
@Override
- public void close()
+ public void close() throws JMSException
{
super.close();
AMQDestination dest = _destination;
@@ -262,10 +262,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.SENDER )
{
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ try
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
_destination.getQueueName());
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
}
}
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 1c2c46cf51..43b3b85641 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -47,7 +47,6 @@ import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Message;
-import org.apache.qpid.messaging.Address;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.Future;
@@ -56,6 +55,7 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -342,6 +342,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
e.setLinkedException(ex);
throw e;
}
+ catch (TransportException e)
+ {
+ JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
+ jmse.initCause(e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+
}
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
new file mode 100644
index 0000000000..ea55419144
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -0,0 +1,765 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Connection.SessionFactory;
+import org.apache.qpid.transport.Connection.State;
+import org.apache.qpid.transport.ExchangeBound;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeDeclare;
+import org.apache.qpid.transport.ExchangeDelete;
+import org.apache.qpid.transport.ExchangeQuery;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.ExecutionResult;
+import org.apache.qpid.transport.ExecutionSync;
+import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.MessageCancel;
+import org.apache.qpid.transport.MessageFlow;
+import org.apache.qpid.transport.MessageRelease;
+import org.apache.qpid.transport.MessageSubscribe;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.QueueDelete;
+import org.apache.qpid.transport.QueueQuery;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionAttach;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionRequestTimeout;
+import org.apache.qpid.transport.TxCommit;
+import org.apache.qpid.transport.TxRollback;
+import org.apache.qpid.transport.TxSelect;
+
+/**
+ * Tests AMQSession_0_10 methods.
+ * <p>
+ * The main purpose of the tests in this test suite is to check that
+ * {@link SessionException} is not thrown from methods of
+ * {@link AMQSession_0_10}.
+ */
+public class AMQSession_0_10Test extends TestCase
+{
+
+ public void testExceptionOnCommit()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ try
+ {
+ session.commit();
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testExceptionOnCreateMessageProducer()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ try
+ {
+ session.createMessageProducer(createDestination(), true, true, true, 1l);
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected but got:" + e, e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testExceptionOnRollback()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ try
+ {
+ session.rollback();
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ }
+ }
+
+ public void testExceptionOnRecover()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(javax.jms.Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ session.recover();
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ }
+ }
+
+ public void testExceptionOnCreateBrowser()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ AMQQueue destination = createQueue();
+ try
+ {
+ session.createBrowser(destination);
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testExceptionOnCreateConsumer()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ AMQAnyDestination destination = createDestination();
+ try
+ {
+ session.createConsumer(destination);
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testExceptionOnCreateSubscriber()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ AMQAnyDestination destination = createDestination();
+ try
+ {
+ session.createSubscriber(destination);
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testExceptionOnUnsubscribe()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ try
+ {
+ session.unsubscribe("whatever");
+ fail("JMSExceptiuon should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testCommit()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, TxCommit.class, false);
+ assertNotNull("TxCommit was not sent", event);
+ }
+
+ public void testRollback()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.rollback();
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, TxRollback.class, false);
+ assertNotNull("TxRollback was not sent", event);
+ }
+
+ public void testRecover()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10(javax.jms.Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ session.recover();
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
+ assertNotNull("MessageRelease was not sent", event);
+ }
+
+ public void testCreateProducer()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.createProducer(createQueue());
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false);
+ assertNotNull("ExchangeDeclare was not sent", event);
+ }
+
+ public void testCreateConsumer()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.createConsumer(createQueue());
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageSubscribe.class, false);
+ assertNotNull("MessageSubscribe was not sent", event);
+ }
+
+ public void testSync()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.sync();
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, ExecutionSync.class, false);
+ assertNotNull("ExecutionSync was not sent", event);
+ }
+
+ public void testRejectMessage()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ session.rejectMessage(1l, true);
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
+ assertNotNull("MessageRelease event was not sent", event);
+ }
+
+ public void testReleaseForRollback()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.releaseForRollback();
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
+ assertNotNull("MessageRelease event was not sent", event);
+ }
+
+ public void testSendQueueDelete()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.sendQueueDelete(new AMQShortString("test"));
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, QueueDelete.class, false);
+ assertNotNull("QueueDelete event was not sent", event);
+ QueueDelete exchangeDelete = (QueueDelete) event;
+ assertEquals("test", exchangeDelete.getQueue());
+ }
+
+ public void testSendConsume()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
+ null, new FieldTable(), false, true);
+ session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1);
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageSubscribe.class, false);
+ assertNotNull("MessageSubscribe event was not sent", event);
+ }
+
+ public void testCreateMessageProducer()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.createMessageProducer(createDestination(), true, true, true, 1l);
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false);
+ assertNotNull("ExchangeDeclare event was not sent", event);
+ }
+
+ public void testSendExchangeDelete()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ session.sendExchangeDelete("test", true);
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDelete.class, false);
+ assertNotNull("ExchangeDelete event was not sent", event);
+ ExchangeDelete exchangeDelete = (ExchangeDelete) event;
+ assertEquals("test", exchangeDelete.getExchange());
+ }
+
+ public void testExceptionOnMessageConsumerReceive()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ try
+ {
+ BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
+ null, new FieldTable(), false, true);
+ session.start();
+ consumer.receive(1);
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testMessageConsumerReceive()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
+ null, new FieldTable(), false, true);
+ session.start();
+ consumer.receive(1);
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageFlow.class, false);
+ assertNotNull("MessageFlow event was not sent", event);
+ }
+
+ public void testExceptionOnMessageConsumerReceiveNoWait()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ try
+ {
+ BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
+ null, new FieldTable(), false, true);
+ session.start();
+ consumer.receiveNoWait();
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testExceptionOnMessageConsumerSetMessageListener()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ try
+ {
+ BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
+ null, new FieldTable(), false, true);
+ consumer.setMessageListener(new MockMessageListener());
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testMessageConsumerSetMessageListener()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
+ null, new FieldTable(), false, true);
+ consumer.setMessageListener(new MockMessageListener());
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageFlow.class, false);
+ assertNotNull("MessageFlow event was not sent", event);
+ }
+
+ public void testMessageConsumerClose()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
+ null, new FieldTable(), false, true);
+ consumer.close();
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageCancel.class, false);
+ assertNotNull("MessageCancel event was not sent", event);
+ }
+
+ public void testExceptionOnMessageConsumerClose()
+ {
+ AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
+ try
+ {
+ BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
+ null, new FieldTable(), false, true);
+ consumer.close();
+ fail("JMSException should be thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("JMSException is expected", e instanceof JMSException);
+ assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
+ }
+ }
+
+ public void testMessageProducerSend()
+ {
+ AMQSession_0_10 session = createAMQSession_0_10();
+ try
+ {
+ MessageProducer producer = session.createProducer(createQueue());
+ producer.send(session.createTextMessage("Test"));
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent event = findSentProtocolEventOfClass(session, MessageTransfer.class, false);
+ assertNotNull("MessageTransfer event was not sent", event);
+ event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false);
+ assertNotNull("ExchangeDeclare event was not sent", event);
+ }
+
+ private AMQAnyDestination createDestination()
+ {
+ AMQAnyDestination destination = null;
+ try
+ {
+ destination = new AMQAnyDestination(new AMQShortString("amq.direct"), new AMQShortString("direct"),
+ new AMQShortString("test"), false, true, new AMQShortString("test"), true, null);
+ }
+ catch (Exception e)
+ {
+ fail("Failued to create destination:" + e.getMessage());
+ }
+ return destination;
+ }
+
+ private AMQQueue createQueue()
+ {
+ AMQQueue destination = null;
+ try
+ {
+ destination = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString("test"),
+ new AMQShortString("test"));
+ }
+ catch (Exception e)
+ {
+ fail("Failued to create destination:" + e.getMessage());
+ }
+ return destination;
+ }
+
+ private AMQSession_0_10 createThrowingExceptionAMQSession_0_10()
+ {
+ return createAMQSession_0_10(true, javax.jms.Session.SESSION_TRANSACTED);
+ }
+
+ private AMQSession_0_10 createThrowingExceptionAMQSession_0_10(int akcnowledgeMode)
+ {
+ return createAMQSession_0_10(true, akcnowledgeMode);
+ }
+
+ private ProtocolEvent findSentProtocolEventOfClass(AMQSession_0_10 session, Class<? extends ProtocolEvent> class1,
+ boolean isLast)
+ {
+ ProtocolEvent found = null;
+ List<ProtocolEvent> events = ((MockSession) session.getQpidSession()).getSender().getSendEvents();
+ assertNotNull("Events list should not be null", events);
+ assertFalse("Events list should not be empty", events.isEmpty());
+ if (isLast)
+ {
+ ProtocolEvent event = events.get(events.size() - 1);
+ if (event.getClass().isAssignableFrom(class1))
+ {
+ found = event;
+ }
+ }
+ else
+ {
+ for (ProtocolEvent protocolEvent : events)
+ {
+ if (protocolEvent.getClass().isAssignableFrom(class1))
+ {
+ found = protocolEvent;
+ break;
+ }
+ }
+
+ }
+ return found;
+ }
+
+ private AMQSession_0_10 createAMQSession_0_10()
+ {
+ return createAMQSession_0_10(false, javax.jms.Session.SESSION_TRANSACTED);
+ }
+
+ private AMQSession_0_10 createAMQSession_0_10(int acknowledgeMode)
+ {
+ return createAMQSession_0_10(false, acknowledgeMode);
+ }
+
+ private AMQSession_0_10 createAMQSession_0_10(boolean throwException, int acknowledgeMode)
+ {
+ AMQConnection amqConnection = null;
+ try
+ {
+ amqConnection = new MockAMQConnection(
+ "amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'&maxprefetch='0'");
+ }
+ catch (Exception e)
+ {
+ fail("Failure to create a mock connection:" + e.getMessage());
+ }
+ boolean isTransacted = acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED ? true : false;
+ AMQSession_0_10 session = new AMQSession_0_10(createConnection(throwException), amqConnection, 1, isTransacted, acknowledgeMode,
+ 1, 1, "test");
+ return session;
+ }
+
+ private Connection createConnection(final boolean throwException)
+ {
+ MockTransportConnection connection = new MockTransportConnection();
+ connection.setState(State.OPEN);
+ connection.setSender(new MockSender());
+ connection.setSessionFactory(new SessionFactory()
+ {
+
+ @Override
+ public Session newSession(Connection conn, Binary name, long expiry)
+ {
+ return new MockSession(conn, new SessionDelegate(), name, expiry, throwException);
+ }
+ });
+ return connection;
+ }
+
+ private final class MockMessageListener implements MessageListener
+ {
+ @Override
+ public void onMessage(Message arg0)
+ {
+ }
+ }
+
+ class MockSession extends Session
+ {
+ private final boolean _throwException;
+ private final Connection _connection;
+ private final SessionDelegate _delegate;
+
+ protected MockSession(Connection connection, SessionDelegate delegate, Binary name, long expiry,
+ boolean throwException)
+ {
+ super(connection, delegate, name, expiry);
+ _throwException = throwException;
+ setState(State.OPEN);
+ _connection = connection;
+ _delegate = delegate;
+ }
+
+ public void invoke(Method m, Runnable postIdSettingAction)
+ {
+ if (_throwException)
+ {
+ if (m instanceof SessionAttach || m instanceof SessionRequestTimeout || m instanceof TxSelect)
+ {
+ // do not throw exception for SessionAttach,
+ // SessionRequestTimeout and TxSelect
+ // session needs to be instantiated
+ return;
+ }
+ ExecutionException e = new ExecutionException();
+ e.setErrorCode(ExecutionErrorCode.INTERNAL_ERROR);
+ throw new SessionException(e);
+ }
+ else
+ {
+ super.invoke(m, postIdSettingAction);
+ if (m instanceof SessionDetach)
+ {
+ setState(State.CLOSED);
+ }
+ }
+ }
+
+ public void sync()
+ {
+ // to avoid recursive calls
+ setAutoSync(false);
+ // simply send sync command
+ super.executionSync(Option.SYNC);
+ }
+
+ protected <T> Future<T> invoke(Method m, Class<T> klass)
+ {
+ int commandId = getCommandsOut();
+ Future<T> future = super.invoke(m, klass);
+ ExecutionResult result = new ExecutionResult();
+ result.setCommandId(commandId);
+ if (m instanceof ExchangeBound)
+ {
+ ExchangeBoundResult struc = new ExchangeBoundResult();
+ struc.setQueueNotFound(true);
+ result.setValue(struc);
+ }
+ else if (m instanceof ExchangeQuery)
+ {
+ ExchangeQueryResult struc = new ExchangeQueryResult();
+ result.setValue(struc);
+ }
+ else if (m instanceof QueueQuery)
+ {
+ QueueQueryResult struc = new QueueQueryResult();
+ result.setValue(struc);
+ }
+ _delegate.executionResult(this, result);
+ return future;
+ }
+
+ public MockSender getSender()
+ {
+ return (MockSender) _connection.getSender();
+ }
+ }
+
+ class MockTransportConnection extends Connection
+ {
+ public void setState(State state)
+ {
+ super.setState(state);
+ }
+ }
+
+ class MockSender implements Sender<ProtocolEvent>
+ {
+ private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
+
+ @Override
+ public void setIdleTimeout(int i)
+ {
+ }
+
+ @Override
+ public void send(ProtocolEvent msg)
+ {
+ _sendEvents.add(msg);
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ public List<ProtocolEvent> getSendEvents()
+ {
+ return _sendEvents;
+ }
+
+ }
+
+}