summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-28 09:19:15 +0000
committerKeith Wall <kwall@apache.org>2011-11-28 09:19:15 +0000
commit4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd (patch)
tree8f5a5c8e728615f6442f9e317518817f15a3ee74 /java/client
parent907330f70818a437f7a0723743ab98b355d80d67 (diff)
downloadqpid-python-4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd.tar.gz
QPID-3642,QPID-3643: Add Dead Letter Queue functionality for 0-8/0-9/0-9-1 paths, fixes isBound methods on FanoutExchange
Applied patch from Keith Wall <keith.wall@gmail.com>, Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207029 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java64
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java66
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java104
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java54
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java64
13 files changed, 453 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ad7885f195..6879fe0cfd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -284,7 +284,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
- _logger.debug("AMQP version " + amqpVersion);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("AMQP version " + amqpVersion);
+ }
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
@@ -1485,4 +1488,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _lastFailoverTime;
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index f9a38138ba..1df809c67c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -75,6 +75,8 @@ public abstract class AMQDestination implements Destination, Referenceable
private boolean _exchangeExistsChecked;
+ private RejectBehaviour _rejectBehaviour;
+
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
@@ -227,6 +229,8 @@ public abstract class AMQDestination implements Destination, Referenceable
_queueName = binding.getQueueName() == null ? null : binding.getQueueName();
_routingKey = binding.getRoutingKey() == null ? null : binding.getRoutingKey();
_bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
+ final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR);
+ _rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase());
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
@@ -294,7 +298,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
_destSyntax = DestSyntax.BURL;
_browseOnly = browseOnly;
-
+ _rejectBehaviour = null;
if (_logger.isDebugEnabled())
{
_logger.debug("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
@@ -499,6 +503,13 @@ public abstract class AMQDestination implements Destination, Referenceable
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
+ if (_rejectBehaviour != null)
+ {
+ sb.append(BindingURL.OPTION_REJECT_BEHAVIOUR);
+ sb.append("='" + _rejectBehaviour + "'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
//removeKey the last char '?' if there is no options , ',' if there are.
sb.deleteCharAt(sb.length() - 1);
url = sb.toString();
@@ -842,4 +853,19 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return _addressResolved.get() > time;
}
+
+ /**
+ * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
+ * <p>
+ * It tells the client to delegate the requeue/DLQ decision to the
+ * server .If this option is not specified, the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ *
+ * @return destination reject behaviour
+ */
+ public RejectBehaviour getRejectBehaviour()
+ {
+ return _rejectBehaviour;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ef44221ec1..8984b7ca8c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -310,7 +310,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the highest received delivery tag. */
protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
-
+
+ /** Pre-fetched message tags */
+ protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
+
/** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
@@ -2925,11 +2928,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_producers.put(new Long(producerId), producer);
}
- private void rejectAllMessages(boolean requeue)
- {
- rejectMessagesForConsumerTag(0, requeue, true);
- }
-
/**
* @param consumerTag The consumerTag to prune from queue or all if null
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
@@ -3235,7 +3233,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
for (C consumer : _consumers.values())
{
List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
- _unacknowledgedMessageTags.addAll(tags);
+ _prefetchedMessageTags.addAll(tags);
}
setConnectionStopped(isStopped);
@@ -3345,7 +3343,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else if (_usingDispatcherForCleanup)
{
- _unacknowledgedMessageTags.add(deliveryTag);
+ _prefetchedMessageTags.add(deliveryTag);
}
else
{
@@ -3548,4 +3546,5 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.debug("Rollback mark is set to " + _rollbackMark.get());
}
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index e33410f5fe..96df463481 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
+import java.util.ArrayList;
import java.util.Map;
import javax.jms.Destination;
@@ -40,7 +41,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
@@ -62,7 +62,6 @@ import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareBody;
@@ -223,6 +222,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public void sendRecover() throws AMQException, FailoverException
{
+ enforceRejectBehaviourDuringRecover();
+ _prefetchedMessageTags.clear();
_unacknowledgedMessageTags.clear();
if (isStrictAMQP())
@@ -259,6 +260,49 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}
}
+ private void enforceRejectBehaviourDuringRecover()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags);
+ }
+ ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values());
+ boolean messageListenerFound = false;
+ boolean serverRejectBehaviourFound = false;
+ for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
+ {
+ if (consumer.isMessageListenerSet())
+ {
+ messageListenerFound = true;
+ }
+ if (RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
+ {
+ serverRejectBehaviourFound = true;
+ }
+ }
+ _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)");
+
+ if (serverRejectBehaviourFound)
+ {
+ //reject(false) any messages we don't want returned again
+ switch(_acknowledgeMode)
+ {
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ case Session.AUTO_ACKNOWLEDGE:
+ if (!messageListenerFound)
+ {
+ break;
+ }
+ case Session.CLIENT_ACKNOWLEDGE:
+ for(Long tag : _unacknowledgedMessageTags)
+ {
+ rejectMessage(tag, false);
+ }
+ break;
+ }
+ }
+ }
+
public void releaseForRollback()
{
// Reject all the messages that have been received in this session and
@@ -267,6 +311,17 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
// Otherwise messages will be able to arrive out of order to a second
// consumer on the queue. Whilst this is within the JMS spec it is not
// user friendly and avoidable.
+ boolean normalRejectBehaviour = true;
+ for (BasicMessageConsumer_0_8 consumer : _consumers.values())
+ {
+ if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
+ {
+ normalRejectBehaviour = false;
+ //no need to consult other consumers now, found server behaviour.
+ break;
+ }
+ }
+
while (true)
{
Long tag = _deliveredMessageTags.poll();
@@ -275,13 +330,14 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
break;
}
- rejectMessage(tag, true);
+ rejectMessage(tag, normalRejectBehaviour);
}
}
public void rejectMessage(long deliveryTag, boolean requeue)
{
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
+ if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)||
+ ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
{
if (_logger.isDebugEnabled())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 7bb400fada..c6e5fbb019 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -147,7 +147,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
private List<StackTraceElement> _closedStack = null;
-
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -211,6 +210,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
_arguments = ft;
+
}
public AMQDestination getDestination()
@@ -814,31 +814,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
-
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- *
- * @return the lastDeliveryTag to acknowledge
- */
- Long getLastDelivered()
- {
- if (!_receivedDeliveryTags.isEmpty())
- {
- Long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- return lastDeliveryTag;
- }
-
- return null;
- }
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cf1d7cedeb..efcbfd5532 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -28,7 +28,10 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +39,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final RejectBehaviour _rejectBehaviour;
+
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
@@ -55,6 +60,25 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+ if (destination.getRejectBehaviour() != null)
+ {
+ _rejectBehaviour = destination.getRejectBehaviour();
+ }
+ else
+ {
+ ConnectionURL connectionURL = connection.getConnectionURL();
+ String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR);
+ if (rejectBehaviour != null)
+ {
+ _rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase());
+ }
+ else
+ {
+ // use the default value for all connections, if not set
+ rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString());
+ _rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase());
+ }
+ }
}
void sendCancel() throws AMQException, FailoverException
@@ -89,4 +113,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
}
+
+ public RejectBehaviour getRejectBehaviour()
+ {
+ return _rejectBehaviour;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java b/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java
new file mode 100644
index 0000000000..e3c958044e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/RejectBehaviour.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+/**
+ * This enum can be used only with for 0-8/0-9/0-9-1 protocols connections to notify
+ * the client to delegate the requeue/DLQ decision to the server
+ * if <code>SERVER</server> value is specified. Otherwise the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ */
+public enum RejectBehaviour
+{
+ NORMAL, SERVER;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 26641982d7..24d9360cfa 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -41,7 +41,16 @@ public interface ConnectionURL
public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
- public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+ public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+
+ /**
+ * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
+ * <p>
+ * It tells the client to delegate the requeue/DLQ decision to the
+ * server .If this option is not specified, the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ */
+ public static final String OPTIONS_REJECT_BEHAVIOUR = "rejectbehaviour";
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
new file mode 100644
index 0000000000..3a565f0f0d
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQInvalidArgumentException;
+
+public class AMQConnectionUnitTest extends TestCase
+{
+
+ public void testExceptionReceived()
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+ AMQInvalidArgumentException expectedException = new AMQInvalidArgumentException("Test", null);
+ final AtomicReference<JMSException> receivedException = new AtomicReference<JMSException>();
+ try
+ {
+ MockAMQConnection connection = new MockAMQConnection(url);
+ connection.setExceptionListener(new ExceptionListener()
+ {
+
+ @Override
+ public void onException(JMSException jmsException)
+ {
+ receivedException.set(jmsException);
+ }
+ });
+ connection.exceptionReceived(expectedException);
+ }
+ catch (Exception e)
+ {
+ fail("Failure to test exceptionRecived:" + e.getMessage());
+ }
+ JMSException exception = receivedException.get();
+ assertNotNull("Expected JMSException but got null", exception);
+ assertEquals("JMSException error code is incorrect", Integer.toString(expectedException.getErrorCode().getCode()), exception.getErrorCode());
+ assertNotNull("Expected not null message for JMSException", exception.getMessage());
+ assertTrue("JMSException error message is incorrect", exception.getMessage().contains(expectedException.getMessage()));
+ assertEquals("JMSException linked exception is incorrect", expectedException, exception.getLinkedException());
+ }
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
new file mode 100644
index 0000000000..d8d94ba40e
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Session;
+
+import org.apache.qpid.test.unit.message.TestAMQSession;
+import org.apache.qpid.url.AMQBindingURL;
+
+import junit.framework.TestCase;
+
+public class BasicMessageConsumer_0_8_Test extends TestCase
+{
+ /**
+ * Test that if there is a value for Reject Behaviour specified for the Destination
+ * used to create the Consumer, it overrides the value for the Connection.
+ */
+ public void testDestinationRejectBehaviourOverridesDefaultConnection() throws Exception
+ {
+ /*
+ * Check that when the connection does not have a value applied that this
+ * is successfully overridden with a specific value by the consumer.
+ */
+ String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+ AMQConnection conn = new MockAMQConnection(connUrlString);
+
+ String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'";
+ AMQBindingURL burl = new AMQBindingURL(url);
+ AMQDestination queue = new AMQQueue(burl);
+
+ AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
+ BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+
+ assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour());
+ }
+
+ /**
+ * Check that when the connection does have a specific value applied that this
+ * is successfully overridden with another specific value by the consumer.
+ */
+ public void testDestinationRejectBehaviourSpecified() throws Exception
+ {
+ final String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'";
+ final AMQConnection conn = new MockAMQConnection(connUrlString);
+
+ final String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='normal'";
+ final AMQBindingURL burl = new AMQBindingURL(url);
+ final AMQDestination queue = new AMQQueue(burl);
+
+ final AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
+ final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+
+ assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
+ }
+
+ /**
+ * Test that if no value for Reject Behaviour is applied to the Destination, then the value
+ * from the connection is used and acts as expected.
+ */
+ public void testRejectBehaviourDetectedFromConnection() throws Exception
+ {
+ /*
+ * Check that when the connection does have a specific value applied that this
+ * is successfully detected by the consumer.
+ */
+ String connUrlString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='normal'";
+ AMQConnection conn = new MockAMQConnection(connUrlString);
+
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+ AMQBindingURL burl = new AMQBindingURL(url);
+ AMQDestination queue = new AMQQueue(burl);
+
+ assertNull("Reject behaviour should have been null", queue.getRejectBehaviour());
+
+ AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
+ BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+
+ assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
+ }
+
+
+ protected RejectBehaviour getRejectBehaviour(AMQDestination destination)
+ {
+ return destination.getRejectBehaviour();
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
index 73e67469ae..919809edc3 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
@@ -55,4 +55,9 @@ public class MockAMQConnection extends AMQConnection
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN);
return null;
}
+
+ public AMQConnectionDelegate getDelegate()
+ {
+ return _delegate;
+ }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 4624b36fea..5a5a3a0bd9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ public class ConnectionURLTest extends TestCase
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
- assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));
+ assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
assertTrue(connectionurl.getVirtualHost().equals("/test"));
@@ -338,7 +338,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getPassword().equals("pass"));
assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getClientName().equals("client_id"));
-
+
assertTrue(connectionurl.getBrokerCount() == 1);
}
@@ -457,7 +457,6 @@ public class ConnectionURLTest extends TestCase
assertTrue(service.getTransport().equals("tcp"));
-
assertTrue(service.getHost().equals("localhost"));
assertTrue(service.getPort() == 5672);
assertEquals("jim",service.getProperty("foo"));
@@ -468,7 +467,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getOption("timeout").equals("200"));
assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
}
-
+
/**
* Test that options other than failover and brokerlist are returned in the string representation.
* <p>
@@ -477,7 +476,7 @@ public class ConnectionURLTest extends TestCase
public void testOptionToString() throws Exception
{
ConnectionURL url = new AMQConnectionURL("amqp://user:pass@temp/test?maxprefetch='12345'&brokerlist='tcp://localhost:5672'");
-
+
assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'"));
}
@@ -493,10 +492,10 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getBrokerCount() == 1);
BrokerDetails service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getTransport().equals("tcp"));
assertTrue(service.getHost().equals("under_score"));
assertTrue(service.getPort() == 6672);
-
+
url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'";
connectionurl = new AMQConnectionURL(url);
@@ -507,11 +506,44 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getBrokerCount() == 1);
service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getTransport().equals("tcp"));
assertTrue(service.getHost().equals("under_score"));
assertTrue(service.getPort() == 5672);
}
-
+
+
+ public void testRejectBehaviourPresent() throws Exception
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'";
+
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertTrue(connectionURL.getFailoverMethod() == null);
+ assertTrue(connectionURL.getUsername().equals("guest"));
+ assertTrue(connectionURL.getPassword().equals("guest"));
+ assertTrue(connectionURL.getVirtualHost().equals("/test"));
+
+ //check that the reject behaviour option is returned as expected
+ assertEquals("Reject behaviour option was not as expected", "server",
+ connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
+ }
+
+ public void testRejectBehaviourNotPresent() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod() == null);
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ //check that the reject behaviour option is null as expected
+ assertNull("Reject behaviour option was not as expected",
+ connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionURLTest.class);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
index 7de09cff45..2c32e4c559 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
@@ -22,8 +22,11 @@ package org.apache.qpid.test.unit.client.destinationurl;
import junit.framework.TestCase;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -190,6 +193,67 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getQueueName().equals("test:testQueueD"));
}
+ public void testRejectBehaviourPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+ assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+ assertTrue(burl.getExchangeName().equals("exchangeName"));
+ assertTrue(burl.getDestinationName().equals("Destination"));
+ assertTrue(burl.getQueueName().equals("Queue"));
+
+ //check that the MaxDeliveryCount property has the right value
+ assertEquals("server",burl.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR));
+
+ //check that the MaxDeliveryCount value is correctly returned from an AMQDestination
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertEquals("Reject behaviour is unexpected", RejectBehaviour.SERVER, dest.getRejectBehaviour());
+ }
+
+ public void testRejectBehaviourNotPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+ assertTrue(burl.getExchangeName().equals("exchangeName"));
+ assertTrue(burl.getDestinationName().equals("Destination"));
+ assertTrue(burl.getQueueName().equals("Queue"));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertNull("Reject behaviour is unexpected", dest.getRejectBehaviour());
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DestinationURLTest.class);