diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-09-13 13:55:42 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-09-13 13:55:42 +0000 |
| commit | a1aec7ac41539186a093ac750ce925ca5d9aca81 (patch) | |
| tree | a6928a1926147cce613738cefbe6e68d6616e784 /java/client/src | |
| parent | 15e875512cf43b6e8bdeb65740554aed9841afe7 (diff) | |
| download | qpid-python-a1aec7ac41539186a093ac750ce925ca5d9aca81.tar.gz | |
QPID-3448: catch exceptions from the underlying Transport/Session/Connection and rethrow as a JMSException like users are expecting
Applied patch by Oleksandr Rudyy <orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1170182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
8 files changed, 953 insertions, 48 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 25562cfff7..e0da1ef41f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -97,7 +97,10 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -606,8 +609,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. * * @throws IllegalStateException If the session is closed. + * @throws JMSException if there is a problem during acknowledge process. */ - public void acknowledge() throws IllegalStateException + public void acknowledge() throws IllegalStateException, JMSException { if (isClosed()) { @@ -625,7 +629,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { break; } - acknowledgeMessage(tag, false); + + try + { + acknowledgeMessage(tag, false); + } + catch (TransportException e) + { + throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e); + } } } @@ -763,6 +775,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug( "Got FailoverException during channel close, ignored as channel already marked as closed."); } + catch (TransportException e) + { + throw toJMSException("Error closing session:" + e.getMessage(), e); + } finally { _connection.deregisterSession(_channelId); @@ -874,6 +890,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); } + catch(TransportException e) + { + throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); + } } public abstract void sendCommit() throws AMQException, FailoverException; @@ -1071,6 +1091,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ex.setLinkedException(e); throw ex; } + catch(TransportException e) + { + throw toJMSException("Error when verifying destination", e); + } } String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1156,6 +1180,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return subscriber; } + catch (TransportException e) + { + throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e); + } finally { _subscriberDetails.unlock(); @@ -1405,7 +1433,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkNotClosed(); - // return (QueueSender) createProducer(queue); return new QueueSenderAdapter(createProducer(queue), queue); } @@ -1442,7 +1469,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); Topic dest = checkValidTopic(topic); - // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); } @@ -1727,13 +1753,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Ensure that the session is not transacted. checkNotTransacted(); - // flush any acks we are holding in the buffer. - flushAcknowledgments(); - - // this is set only here, and the before the consumer's onMessage is called it is set to false - _inRecovery = true; + try { + // flush any acks we are holding in the buffer. + flushAcknowledgments(); + + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; boolean isSuspended = isSuspended(); @@ -1769,7 +1796,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e); } - + catch(TransportException e) + { + throw toJMSException("Recover failed: " + e.getMessage(), e); + } } protected abstract void sendRecover() throws AMQException, FailoverException; @@ -1854,6 +1884,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e); } + catch (TransportException e) + { + throw toJMSException("Failure to rollback:" + e.getMessage(), e); + } } } @@ -1900,7 +1934,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public void unsubscribe(String name) throws JMSException { - unsubscribe(name, false); + try + { + unsubscribe(name, false); + } + catch (TransportException e) + { + throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e); + } } /** @@ -2021,8 +2062,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // argument, as specifying null for the arguments when querying means they should not be checked at all ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); - C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, - noLocal, exclusive, messageSelector, ft, noConsume, autoClose); + C consumer; + try + { + consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + noLocal, exclusive, messageSelector, ft, noConsume, autoClose); + } + catch(TransportException e) + { + throw toJMSException("Exception while creating consumer: " + e.getMessage(), e); + } if (_messageListener != null) { @@ -2059,7 +2108,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ex.initCause(e); throw ex; } - + catch (TransportException e) + { + throw toJMSException("Exception while registering consumer:" + e.getMessage(), e); + } return consumer; } }, _connection).execute(); @@ -2601,8 +2653,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkNotClosed(); long producerId = getNextProducerId(); - P producer = createMessageProducer(destination, mandatory, - immediate, waitUntilSent, producerId); + + P producer; + try + { + producer = createMessageProducer(destination, mandatory, + immediate, waitUntilSent, producerId); + } + catch (TransportException e) + { + throw toJMSException("Exception while creating producer:" + e.getMessage(), e); + } + registerProducer(producerId, producer); return producer; @@ -3009,6 +3071,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e); } + catch (TransportException e) + { + throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e); + } } } @@ -3486,4 +3552,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return DECLARE_EXCHANGES; } + + JMSException toJMSException(String message, TransportException e) + { + int code = getErrorCode(e); + JMSException jmse = new JMSException(message, Integer.toString(code)); + jmse.setLinkedException(e); + jmse.initCause(e); + return jmse; + } + + private int getErrorCode(TransportException e) + { + int code = AMQConstant.INTERNAL_ERROR.getCode(); + if (e instanceof SessionException) + { + SessionException se = (SessionException) e; + if(se.getException() != null && se.getException().getErrorCode() != null) + { + code = se.getException().getErrorCode().getValue(); + } + } + return code; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index b5868cd235..bfbb9f7148 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -72,6 +72,7 @@ import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -548,7 +549,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) - throws JMSException { boolean res; ExchangeBoundResult bindingQueryResult = @@ -692,6 +692,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic throw ex; } + catch(TransportException e) + { + throw toJMSException("Exception while creating message producer:" + e.getMessage(), e); + } } @@ -994,7 +998,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - @Override public void commit() throws JMSException + @Override + public void commit() throws JMSException { checkTransacted(); try @@ -1007,12 +1012,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } sendCommit(); } - catch(SessionException e) + catch(TransportException e) { - JMSException ex = new JMSException("Session exception occured while trying to commit"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; + throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); } catch (AMQException e) { @@ -1383,5 +1385,5 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sb.append(">"); return sb.toString(); } - + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 5d32863f2f..754055ad98 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -419,6 +420,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return null; } + catch(TransportException e) + { + throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e); + } finally { releaseReceiving(); @@ -489,6 +494,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return null; } + catch(TransportException e) + { + throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e); + } finally { releaseReceiving(); @@ -582,6 +591,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } + catch (TransportException e) + { + throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); + } } } else @@ -775,7 +788,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } - void postDeliver(AbstractJMSMessage msg) throws JMSException + void postDeliver(AbstractJMSMessage msg) { switch (_acknowledgeMode) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 964c238946..47da59724c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -23,9 +23,7 @@ import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; @@ -365,21 +363,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - if (messageListener != null && capacity == 0) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); - } - if (messageListener != null && !_synchronousQueue.isEmpty()) + try { - Iterator messages=_synchronousQueue.iterator(); - while (messages.hasNext()) + if (messageListener != null && capacity == 0) { - AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); - messages.remove(); - _session.rejectMessage(message, true); + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } + if (messageListener != null && !_synchronousQueue.isEmpty()) + { + Iterator messages=_synchronousQueue.iterator(); + while (messages.hasNext()) + { + AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); + messages.remove(); + _session.rejectMessage(message, true); + } + } + } + catch(TransportException e) + { + throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e); } } @@ -443,7 +448,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return o; } - void postDeliver(AbstractJMSMessage msg) throws JMSException + void postDeliver(AbstractJMSMessage msg) { super.postDeliver(msg); if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 8756ac4d05..2bfca025b2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -39,6 +39,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; import org.slf4j.Logger; @@ -266,7 +267,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac return _destination; } - public void close() + public void close() throws JMSException { _closed.set(true); _session.deregisterProducer(_producerId); @@ -498,7 +499,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac message.setJMSMessageID(messageId); } - sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait); + try + { + sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait); + } + catch (TransportException e) + { + throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e); + } if (message != origMessage) { @@ -596,6 +604,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey()); + try + { + return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey()); + } + catch (TransportException e) + { + throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index d739903ee6..1fa5c1003f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -37,7 +37,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -47,6 +46,7 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,14 +246,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } } - + @Override public boolean isBound(AMQDestination destination) throws JMSException { return _session.isQueueBound(destination); } @Override - public void close() + public void close() throws JMSException { super.close(); AMQDestination dest = _destination; @@ -262,10 +262,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.SENDER ) { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( + try + { + ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( _destination.getQueueName()); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } } } } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 1c2c46cf51..43b3b85641 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -47,7 +47,6 @@ import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Message; -import org.apache.qpid.messaging.Address; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.ExchangeQueryResult; import org.apache.qpid.transport.Future; @@ -56,6 +55,7 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -342,6 +342,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate e.setLinkedException(ex); throw e; } + catch (TransportException e) + { + JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage()); + jmse.initCause(e); + jmse.setLinkedException(e); + throw jmse; + } + } final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString()); diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java new file mode 100644 index 0000000000..ea55419144 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -0,0 +1,765 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; + +import junit.framework.TestCase; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Connection.SessionFactory; +import org.apache.qpid.transport.Connection.State; +import org.apache.qpid.transport.ExchangeBound; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.ExchangeDeclare; +import org.apache.qpid.transport.ExchangeDelete; +import org.apache.qpid.transport.ExchangeQuery; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.ExecutionResult; +import org.apache.qpid.transport.ExecutionSync; +import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.MessageCancel; +import org.apache.qpid.transport.MessageFlow; +import org.apache.qpid.transport.MessageRelease; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.QueueDelete; +import org.apache.qpid.transport.QueueQuery; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionAttach; +import org.apache.qpid.transport.SessionDelegate; +import org.apache.qpid.transport.SessionDetach; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionRequestTimeout; +import org.apache.qpid.transport.TxCommit; +import org.apache.qpid.transport.TxRollback; +import org.apache.qpid.transport.TxSelect; + +/** + * Tests AMQSession_0_10 methods. + * <p> + * The main purpose of the tests in this test suite is to check that + * {@link SessionException} is not thrown from methods of + * {@link AMQSession_0_10}. + */ +public class AMQSession_0_10Test extends TestCase +{ + + public void testExceptionOnCommit() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + try + { + session.commit(); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testExceptionOnCreateMessageProducer() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + try + { + session.createMessageProducer(createDestination(), true, true, true, 1l); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected but got:" + e, e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testExceptionOnRollback() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + try + { + session.rollback(); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + } + } + + public void testExceptionOnRecover() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(javax.jms.Session.AUTO_ACKNOWLEDGE); + try + { + session.recover(); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + } + } + + public void testExceptionOnCreateBrowser() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + AMQQueue destination = createQueue(); + try + { + session.createBrowser(destination); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testExceptionOnCreateConsumer() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + AMQAnyDestination destination = createDestination(); + try + { + session.createConsumer(destination); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testExceptionOnCreateSubscriber() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + AMQAnyDestination destination = createDestination(); + try + { + session.createSubscriber(destination); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testExceptionOnUnsubscribe() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + try + { + session.unsubscribe("whatever"); + fail("JMSExceptiuon should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testCommit() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.commit(); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, TxCommit.class, false); + assertNotNull("TxCommit was not sent", event); + } + + public void testRollback() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.rollback(); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, TxRollback.class, false); + assertNotNull("TxRollback was not sent", event); + } + + public void testRecover() + { + AMQSession_0_10 session = createAMQSession_0_10(javax.jms.Session.AUTO_ACKNOWLEDGE); + try + { + session.recover(); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false); + assertNotNull("MessageRelease was not sent", event); + } + + public void testCreateProducer() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.createProducer(createQueue()); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false); + assertNotNull("ExchangeDeclare was not sent", event); + } + + public void testCreateConsumer() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.createConsumer(createQueue()); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageSubscribe.class, false); + assertNotNull("MessageSubscribe was not sent", event); + } + + public void testSync() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.sync(); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, ExecutionSync.class, false); + assertNotNull("ExecutionSync was not sent", event); + } + + public void testRejectMessage() + { + AMQSession_0_10 session = createAMQSession_0_10(); + session.rejectMessage(1l, true); + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false); + assertNotNull("MessageRelease event was not sent", event); + } + + public void testReleaseForRollback() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.releaseForRollback(); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false); + assertNotNull("MessageRelease event was not sent", event); + } + + public void testSendQueueDelete() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.sendQueueDelete(new AMQShortString("test")); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, QueueDelete.class, false); + assertNotNull("QueueDelete event was not sent", event); + QueueDelete exchangeDelete = (QueueDelete) event; + assertEquals("test", exchangeDelete.getQueue()); + } + + public void testSendConsume() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, + null, new FieldTable(), false, true); + session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageSubscribe.class, false); + assertNotNull("MessageSubscribe event was not sent", event); + } + + public void testCreateMessageProducer() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.createMessageProducer(createDestination(), true, true, true, 1l); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false); + assertNotNull("ExchangeDeclare event was not sent", event); + } + + public void testSendExchangeDelete() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + session.sendExchangeDelete("test", true); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDelete.class, false); + assertNotNull("ExchangeDelete event was not sent", event); + ExchangeDelete exchangeDelete = (ExchangeDelete) event; + assertEquals("test", exchangeDelete.getExchange()); + } + + public void testExceptionOnMessageConsumerReceive() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + try + { + BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, + null, new FieldTable(), false, true); + session.start(); + consumer.receive(1); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testMessageConsumerReceive() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, + null, new FieldTable(), false, true); + session.start(); + consumer.receive(1); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageFlow.class, false); + assertNotNull("MessageFlow event was not sent", event); + } + + public void testExceptionOnMessageConsumerReceiveNoWait() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + try + { + BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, + null, new FieldTable(), false, true); + session.start(); + consumer.receiveNoWait(); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testExceptionOnMessageConsumerSetMessageListener() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + try + { + BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, + null, new FieldTable(), false, true); + consumer.setMessageListener(new MockMessageListener()); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testMessageConsumerSetMessageListener() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, + null, new FieldTable(), false, true); + consumer.setMessageListener(new MockMessageListener()); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageFlow.class, false); + assertNotNull("MessageFlow event was not sent", event); + } + + public void testMessageConsumerClose() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, + null, new FieldTable(), false, true); + consumer.close(); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageCancel.class, false); + assertNotNull("MessageCancel event was not sent", event); + } + + public void testExceptionOnMessageConsumerClose() + { + AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); + try + { + BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, + null, new FieldTable(), false, true); + consumer.close(); + fail("JMSException should be thrown"); + } + catch (Exception e) + { + assertTrue("JMSException is expected", e instanceof JMSException); + assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); + } + } + + public void testMessageProducerSend() + { + AMQSession_0_10 session = createAMQSession_0_10(); + try + { + MessageProducer producer = session.createProducer(createQueue()); + producer.send(session.createTextMessage("Test")); + session.commit(); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent event = findSentProtocolEventOfClass(session, MessageTransfer.class, false); + assertNotNull("MessageTransfer event was not sent", event); + event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false); + assertNotNull("ExchangeDeclare event was not sent", event); + } + + private AMQAnyDestination createDestination() + { + AMQAnyDestination destination = null; + try + { + destination = new AMQAnyDestination(new AMQShortString("amq.direct"), new AMQShortString("direct"), + new AMQShortString("test"), false, true, new AMQShortString("test"), true, null); + } + catch (Exception e) + { + fail("Failued to create destination:" + e.getMessage()); + } + return destination; + } + + private AMQQueue createQueue() + { + AMQQueue destination = null; + try + { + destination = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString("test"), + new AMQShortString("test")); + } + catch (Exception e) + { + fail("Failued to create destination:" + e.getMessage()); + } + return destination; + } + + private AMQSession_0_10 createThrowingExceptionAMQSession_0_10() + { + return createAMQSession_0_10(true, javax.jms.Session.SESSION_TRANSACTED); + } + + private AMQSession_0_10 createThrowingExceptionAMQSession_0_10(int akcnowledgeMode) + { + return createAMQSession_0_10(true, akcnowledgeMode); + } + + private ProtocolEvent findSentProtocolEventOfClass(AMQSession_0_10 session, Class<? extends ProtocolEvent> class1, + boolean isLast) + { + ProtocolEvent found = null; + List<ProtocolEvent> events = ((MockSession) session.getQpidSession()).getSender().getSendEvents(); + assertNotNull("Events list should not be null", events); + assertFalse("Events list should not be empty", events.isEmpty()); + if (isLast) + { + ProtocolEvent event = events.get(events.size() - 1); + if (event.getClass().isAssignableFrom(class1)) + { + found = event; + } + } + else + { + for (ProtocolEvent protocolEvent : events) + { + if (protocolEvent.getClass().isAssignableFrom(class1)) + { + found = protocolEvent; + break; + } + } + + } + return found; + } + + private AMQSession_0_10 createAMQSession_0_10() + { + return createAMQSession_0_10(false, javax.jms.Session.SESSION_TRANSACTED); + } + + private AMQSession_0_10 createAMQSession_0_10(int acknowledgeMode) + { + return createAMQSession_0_10(false, acknowledgeMode); + } + + private AMQSession_0_10 createAMQSession_0_10(boolean throwException, int acknowledgeMode) + { + AMQConnection amqConnection = null; + try + { + amqConnection = new MockAMQConnection( + "amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'&maxprefetch='0'"); + } + catch (Exception e) + { + fail("Failure to create a mock connection:" + e.getMessage()); + } + boolean isTransacted = acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED ? true : false; + AMQSession_0_10 session = new AMQSession_0_10(createConnection(throwException), amqConnection, 1, isTransacted, acknowledgeMode, + 1, 1, "test"); + return session; + } + + private Connection createConnection(final boolean throwException) + { + MockTransportConnection connection = new MockTransportConnection(); + connection.setState(State.OPEN); + connection.setSender(new MockSender()); + connection.setSessionFactory(new SessionFactory() + { + + @Override + public Session newSession(Connection conn, Binary name, long expiry) + { + return new MockSession(conn, new SessionDelegate(), name, expiry, throwException); + } + }); + return connection; + } + + private final class MockMessageListener implements MessageListener + { + @Override + public void onMessage(Message arg0) + { + } + } + + class MockSession extends Session + { + private final boolean _throwException; + private final Connection _connection; + private final SessionDelegate _delegate; + + protected MockSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, + boolean throwException) + { + super(connection, delegate, name, expiry); + _throwException = throwException; + setState(State.OPEN); + _connection = connection; + _delegate = delegate; + } + + public void invoke(Method m, Runnable postIdSettingAction) + { + if (_throwException) + { + if (m instanceof SessionAttach || m instanceof SessionRequestTimeout || m instanceof TxSelect) + { + // do not throw exception for SessionAttach, + // SessionRequestTimeout and TxSelect + // session needs to be instantiated + return; + } + ExecutionException e = new ExecutionException(); + e.setErrorCode(ExecutionErrorCode.INTERNAL_ERROR); + throw new SessionException(e); + } + else + { + super.invoke(m, postIdSettingAction); + if (m instanceof SessionDetach) + { + setState(State.CLOSED); + } + } + } + + public void sync() + { + // to avoid recursive calls + setAutoSync(false); + // simply send sync command + super.executionSync(Option.SYNC); + } + + protected <T> Future<T> invoke(Method m, Class<T> klass) + { + int commandId = getCommandsOut(); + Future<T> future = super.invoke(m, klass); + ExecutionResult result = new ExecutionResult(); + result.setCommandId(commandId); + if (m instanceof ExchangeBound) + { + ExchangeBoundResult struc = new ExchangeBoundResult(); + struc.setQueueNotFound(true); + result.setValue(struc); + } + else if (m instanceof ExchangeQuery) + { + ExchangeQueryResult struc = new ExchangeQueryResult(); + result.setValue(struc); + } + else if (m instanceof QueueQuery) + { + QueueQueryResult struc = new QueueQueryResult(); + result.setValue(struc); + } + _delegate.executionResult(this, result); + return future; + } + + public MockSender getSender() + { + return (MockSender) _connection.getSender(); + } + } + + class MockTransportConnection extends Connection + { + public void setState(State state) + { + super.setState(state); + } + } + + class MockSender implements Sender<ProtocolEvent> + { + private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>(); + + @Override + public void setIdleTimeout(int i) + { + } + + @Override + public void send(ProtocolEvent msg) + { + _sendEvents.add(msg); + } + + @Override + public void flush() + { + } + + @Override + public void close() + { + } + + public List<ProtocolEvent> getSendEvents() + { + return _sendEvents; + } + + } + +} |
