diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-28 09:19:15 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-28 09:19:15 +0000 |
| commit | 4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd (patch) | |
| tree | 8f5a5c8e728615f6442f9e317518817f15a3ee74 /java/client | |
| parent | 907330f70818a437f7a0723743ab98b355d80d67 (diff) | |
| download | qpid-python-4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd.tar.gz | |
QPID-3642,QPID-3643: Add Dead Letter Queue functionality for 0-8/0-9/0-9-1 paths, fixes isBound methods on FanoutExchange
Applied patch from Keith Wall <keith.wall@gmail.com>, Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207029 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
13 files changed, 453 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ad7885f195..6879fe0cfd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -284,7 +284,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); - _logger.debug("AMQP version " + amqpVersion); + if (_logger.isDebugEnabled()) + { + _logger.debug("AMQP version " + amqpVersion); + } _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); @@ -1485,4 +1488,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _lastFailoverTime; } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index f9a38138ba..1df809c67c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -75,6 +75,8 @@ public abstract class AMQDestination implements Destination, Referenceable private boolean _exchangeExistsChecked; + private RejectBehaviour _rejectBehaviour; + public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; @@ -227,6 +229,8 @@ public abstract class AMQDestination implements Destination, Referenceable _queueName = binding.getQueueName() == null ? null : binding.getQueueName(); _routingKey = binding.getRoutingKey() == null ? null : binding.getRoutingKey(); _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); + final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR); + _rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase()); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) @@ -294,7 +298,7 @@ public abstract class AMQDestination implements Destination, Referenceable _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys; _destSyntax = DestSyntax.BURL; _browseOnly = browseOnly; - + _rejectBehaviour = null; if (_logger.isDebugEnabled()) { _logger.debug("Based on " + toString() + " the selected destination syntax is " + _destSyntax); @@ -499,6 +503,13 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); } + if (_rejectBehaviour != null) + { + sb.append(BindingURL.OPTION_REJECT_BEHAVIOUR); + sb.append("='" + _rejectBehaviour + "'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + //removeKey the last char '?' if there is no options , ',' if there are. sb.deleteCharAt(sb.length() - 1); url = sb.toString(); @@ -842,4 +853,19 @@ public abstract class AMQDestination implements Destination, Referenceable { return _addressResolved.get() > time; } + + /** + * This option is only applicable for 0-8/0-9/0-9-1 protocols connection + * <p> + * It tells the client to delegate the requeue/DLQ decision to the + * server .If this option is not specified, the messages won't be moved to + * the DLQ (or dropped) when delivery count exceeds the maximum. + * + * @return destination reject behaviour + */ + public RejectBehaviour getRejectBehaviour() + { + return _rejectBehaviour; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ef44221ec1..8984b7ca8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -310,7 +310,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the highest received delivery tag. */ protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); - + + /** Pre-fetched message tags */ + protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); + /** All the not yet acknowledged message tags */ protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); @@ -2925,11 +2928,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _producers.put(new Long(producerId), producer); } - private void rejectAllMessages(boolean requeue) - { - rejectMessagesForConsumerTag(0, requeue, true); - } - /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) @@ -3235,7 +3233,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic for (C consumer : _consumers.values()) { List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); - _unacknowledgedMessageTags.addAll(tags); + _prefetchedMessageTags.addAll(tags); } setConnectionStopped(isStopped); @@ -3345,7 +3343,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else if (_usingDispatcherForCleanup) { - _unacknowledgedMessageTags.add(deliveryTag); + _prefetchedMessageTags.add(deliveryTag); } else { @@ -3548,4 +3546,5 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug("Rollback mark is set to " + _rollbackMark.get()); } } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index e33410f5fe..96df463481 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; +import java.util.ArrayList; import java.util.Map; import javax.jms.Destination; @@ -40,7 +41,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; @@ -62,7 +62,6 @@ import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareBody; @@ -223,6 +222,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public void sendRecover() throws AMQException, FailoverException { + enforceRejectBehaviourDuringRecover(); + _prefetchedMessageTags.clear(); _unacknowledgedMessageTags.clear(); if (isStrictAMQP()) @@ -259,6 +260,49 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } } + private void enforceRejectBehaviourDuringRecover() + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags); + } + ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values()); + boolean messageListenerFound = false; + boolean serverRejectBehaviourFound = false; + for(BasicMessageConsumer_0_8 consumer : consumersToCheck) + { + if (consumer.isMessageListenerSet()) + { + messageListenerFound = true; + } + if (RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour())) + { + serverRejectBehaviourFound = true; + } + } + _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)"); + + if (serverRejectBehaviourFound) + { + //reject(false) any messages we don't want returned again + switch(_acknowledgeMode) + { + case Session.DUPS_OK_ACKNOWLEDGE: + case Session.AUTO_ACKNOWLEDGE: + if (!messageListenerFound) + { + break; + } + case Session.CLIENT_ACKNOWLEDGE: + for(Long tag : _unacknowledgedMessageTags) + { + rejectMessage(tag, false); + } + break; + } + } + } + public void releaseForRollback() { // Reject all the messages that have been received in this session and @@ -267,6 +311,17 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B // Otherwise messages will be able to arrive out of order to a second // consumer on the queue. Whilst this is within the JMS spec it is not // user friendly and avoidable. + boolean normalRejectBehaviour = true; + for (BasicMessageConsumer_0_8 consumer : _consumers.values()) + { + if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour())) + { + normalRejectBehaviour = false; + //no need to consult other consumers now, found server behaviour. + break; + } + } + while (true) { Long tag = _deliveredMessageTags.poll(); @@ -275,13 +330,14 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B break; } - rejectMessage(tag, true); + rejectMessage(tag, normalRejectBehaviour); } } public void rejectMessage(long deliveryTag, boolean requeue) { - if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) + if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)|| + ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners())) { if (_logger.isDebugEnabled()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 7bb400fada..c6e5fbb019 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -147,7 +147,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa private List<StackTraceElement> _closedStack = null; - protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, @@ -211,6 +210,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); _arguments = ft; + } public AMQDestination getDestination() @@ -814,31 +814,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } } - - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - * - * @return the lastDeliveryTag to acknowledge - */ - Long getLastDelivered() - { - if (!_receivedDeliveryTags.isEmpty()) - { - Long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - return lastDeliveryTag; - } - - return null; - } - void notifyError(Throwable cause) { // synchronized (_closed) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cf1d7cedeb..efcbfd5532 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -28,7 +28,10 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.*; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.BindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final RejectBehaviour _rejectBehaviour; + protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow, @@ -55,6 +60,25 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } + if (destination.getRejectBehaviour() != null) + { + _rejectBehaviour = destination.getRejectBehaviour(); + } + else + { + ConnectionURL connectionURL = connection.getConnectionURL(); + String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR); + if (rejectBehaviour != null) + { + _rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase()); + } + else + { + // use the default value for all connections, if not set + rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString()); + _rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase()); + } + } } void sendCancel() throws AMQException, FailoverException @@ -89,4 +113,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { } + + public RejectBehaviour getRejectBehaviour() + { + return _rejectBehaviour; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java b/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java new file mode 100644 index 0000000000..e3c958044e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +/** + * This enum can be used only with for 0-8/0-9/0-9-1 protocols connections to notify + * the client to delegate the requeue/DLQ decision to the server + * if <code>SERVER</server> value is specified. Otherwise the messages won't be moved to + * the DLQ (or dropped) when delivery count exceeds the maximum. + */ +public enum RejectBehaviour +{ + NORMAL, SERVER; +} diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 26641982d7..24d9360cfa 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -41,7 +41,16 @@ public interface ConnectionURL public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; - public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; + public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; + + /** + * This option is only applicable for 0-8/0-9/0-9-1 protocols connection + * <p> + * It tells the client to delegate the requeue/DLQ decision to the + * server .If this option is not specified, the messages won't be moved to + * the DLQ (or dropped) when delivery count exceeds the maximum. + */ + public static final String OPTIONS_REJECT_BEHAVIOUR = "rejectbehaviour"; public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java new file mode 100644 index 0000000000..3a565f0f0d --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQInvalidArgumentException; + +public class AMQConnectionUnitTest extends TestCase +{ + + public void testExceptionReceived() + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'"; + AMQInvalidArgumentException expectedException = new AMQInvalidArgumentException("Test", null); + final AtomicReference<JMSException> receivedException = new AtomicReference<JMSException>(); + try + { + MockAMQConnection connection = new MockAMQConnection(url); + connection.setExceptionListener(new ExceptionListener() + { + + @Override + public void onException(JMSException jmsException) + { + receivedException.set(jmsException); + } + }); + connection.exceptionReceived(expectedException); + } + catch (Exception e) + { + fail("Failure to test exceptionRecived:" + e.getMessage()); + } + JMSException exception = receivedException.get(); + assertNotNull("Expected JMSException but got null", exception); + assertEquals("JMSException error code is incorrect", Integer.toString(expectedException.getErrorCode().getCode()), exception.getErrorCode()); + assertNotNull("Expected not null message for JMSException", exception.getMessage()); + assertTrue("JMSException error message is incorrect", exception.getMessage().contains(expectedException.getMessage())); + assertEquals("JMSException linked exception is incorrect", expectedException, exception.getLinkedException()); + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java new file mode 100644 index 0000000000..d8d94ba40e --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import javax.jms.Session; + +import org.apache.qpid.test.unit.message.TestAMQSession; +import org.apache.qpid.url.AMQBindingURL; + +import junit.framework.TestCase; + +public class BasicMessageConsumer_0_8_Test extends TestCase +{ + /** + * Test that if there is a value for Reject Behaviour specified for the Destination + * used to create the Consumer, it overrides the value for the Connection. + */ + public void testDestinationRejectBehaviourOverridesDefaultConnection() throws Exception + { + /* + * Check that when the connection does not have a value applied that this + * is successfully overridden with a specific value by the consumer. + */ + String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'"; + AMQConnection conn = new MockAMQConnection(connUrlString); + + String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'"; + AMQBindingURL burl = new AMQBindingURL(url); + AMQDestination queue = new AMQQueue(burl); + + AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); + BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + + assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour()); + } + + /** + * Check that when the connection does have a specific value applied that this + * is successfully overridden with another specific value by the consumer. + */ + public void testDestinationRejectBehaviourSpecified() throws Exception + { + final String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'"; + final AMQConnection conn = new MockAMQConnection(connUrlString); + + final String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='normal'"; + final AMQBindingURL burl = new AMQBindingURL(url); + final AMQDestination queue = new AMQQueue(burl); + + final AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); + final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + + assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); + } + + /** + * Test that if no value for Reject Behaviour is applied to the Destination, then the value + * from the connection is used and acts as expected. + */ + public void testRejectBehaviourDetectedFromConnection() throws Exception + { + /* + * Check that when the connection does have a specific value applied that this + * is successfully detected by the consumer. + */ + String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='normal'"; + AMQConnection conn = new MockAMQConnection(connUrlString); + + String url = "exchangeClass://exchangeName/Destination/Queue"; + AMQBindingURL burl = new AMQBindingURL(url); + AMQDestination queue = new AMQQueue(burl); + + assertNull("Reject behaviour should have been null", queue.getRejectBehaviour()); + + AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); + BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + + assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); + } + + + protected RejectBehaviour getRejectBehaviour(AMQDestination destination) + { + return destination.getRejectBehaviour(); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index 73e67469ae..919809edc3 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -55,4 +55,9 @@ public class MockAMQConnection extends AMQConnection _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); return null; } + + public AMQConnectionDelegate getDelegate() + { + return _delegate; + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 4624b36fea..5a5a3a0bd9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,7 +38,7 @@ public class ConnectionURLTest extends TestCase ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod().equals("roundrobin")); - assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE)); + assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE)); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); assertTrue(connectionurl.getVirtualHost().equals("/test")); @@ -338,7 +338,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getPassword().equals("pass")); assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getClientName().equals("client_id")); - + assertTrue(connectionurl.getBrokerCount() == 1); } @@ -457,7 +457,6 @@ public class ConnectionURLTest extends TestCase assertTrue(service.getTransport().equals("tcp")); - assertTrue(service.getHost().equals("localhost")); assertTrue(service.getPort() == 5672); assertEquals("jim",service.getProperty("foo")); @@ -468,7 +467,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getOption("timeout").equals("200")); assertTrue(connectionurl.getOption("immediatedelivery").equals("true")); } - + /** * Test that options other than failover and brokerlist are returned in the string representation. * <p> @@ -477,7 +476,7 @@ public class ConnectionURLTest extends TestCase public void testOptionToString() throws Exception { ConnectionURL url = new AMQConnectionURL("amqp://user:pass@temp/test?maxprefetch='12345'&brokerlist='tcp://localhost:5672'"); - + assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'")); } @@ -493,10 +492,10 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getBrokerCount() == 1); BrokerDetails service = connectionurl.getBrokerDetails(0); - assertTrue(service.getTransport().equals("tcp")); + assertTrue(service.getTransport().equals("tcp")); assertTrue(service.getHost().equals("under_score")); assertTrue(service.getPort() == 6672); - + url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'"; connectionurl = new AMQConnectionURL(url); @@ -507,11 +506,44 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getBrokerCount() == 1); service = connectionurl.getBrokerDetails(0); - assertTrue(service.getTransport().equals("tcp")); + assertTrue(service.getTransport().equals("tcp")); assertTrue(service.getHost().equals("under_score")); assertTrue(service.getPort() == 5672); } - + + + public void testRejectBehaviourPresent() throws Exception + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'"; + + ConnectionURL connectionURL = new AMQConnectionURL(url); + + assertTrue(connectionURL.getFailoverMethod() == null); + assertTrue(connectionURL.getUsername().equals("guest")); + assertTrue(connectionURL.getPassword().equals("guest")); + assertTrue(connectionURL.getVirtualHost().equals("/test")); + + //check that the reject behaviour option is returned as expected + assertEquals("Reject behaviour option was not as expected", "server", + connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR)); + } + + public void testRejectBehaviourNotPresent() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'"; + + ConnectionURL connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getFailoverMethod() == null); + assertTrue(connectionurl.getUsername().equals("guest")); + assertTrue(connectionurl.getPassword().equals("guest")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + + //check that the reject behaviour option is null as expected + assertNull("Reject behaviour option was not as expected", + connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR)); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ConnectionURLTest.class); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java index 7de09cff45..2c32e4c559 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java @@ -22,8 +22,11 @@ package org.apache.qpid.test.unit.client.destinationurl; import junit.framework.TestCase; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.RejectBehaviour; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -190,6 +193,67 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("test:testQueueD")); } + public void testRejectBehaviourPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + assertTrue(burl.getExchangeClass().equals("exchangeClass")); + assertTrue(burl.getExchangeName().equals("exchangeName")); + assertTrue(burl.getDestinationName().equals("Destination")); + assertTrue(burl.getQueueName().equals("Queue")); + + //check that the MaxDeliveryCount property has the right value + assertEquals("server",burl.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR)); + + //check that the MaxDeliveryCount value is correctly returned from an AMQDestination + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertEquals("Reject behaviour is unexpected", RejectBehaviour.SERVER, dest.getRejectBehaviour()); + } + + public void testRejectBehaviourNotPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertTrue(burl.getExchangeClass().equals("exchangeClass")); + assertTrue(burl.getExchangeName().equals("exchangeName")); + assertTrue(burl.getDestinationName().equals("Destination")); + assertTrue(burl.getQueueName().equals("Queue")); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertNull("Reject behaviour is unexpected", dest.getRejectBehaviour()); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(DestinationURLTest.class); |
