diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-10 14:14:46 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-10 14:14:46 +0000 |
| commit | 87139b266b3817be517db38092ffcc5b2f7c7277 (patch) | |
| tree | ff7576ba53e65ae9f1f3f9deac73e4b227f7e302 /java | |
| parent | 93417080e6d1a7c611ce80c188c52dd4af0a253b (diff) | |
| download | qpid-python-87139b266b3817be517db38092ffcc5b2f7c7277.tar.gz | |
QPID-3539: NoLocal should occur at the connection level and not the session level (0-10). Also fixed typing issue that prevented Java Broker understanding the no-local queue argument when encoded as String (as sent by Python tests).
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1200335 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 188 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index c5d6bc203c..6598d07a84 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -56,6 +56,7 @@ import org.apache.qpid.transport.MessageFlowMode; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Session; import org.apache.qpid.transport.Struct; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -209,13 +210,13 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return false; } - - - if (_noLocal - && (entry.getMessage() instanceof MessageTransferMessage) - && ((MessageTransferMessage)entry.getMessage()).getSession() == _session) + if (_noLocal && entry.getMessage() instanceof MessageTransferMessage) { - return false; + Session messageSession= ((MessageTransferMessage)entry.getMessage()).getSession(); + if (messageSession != null && messageSession.getConnection() == _session.getConnection()) + { + return false; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index c87919b478..754a233907 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -100,6 +100,11 @@ public class ServerSessionDelegate extends SessionDelegate { private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); + /** + * No-local queue argument is used to support the no-local feature of Durable Subscribers. + */ + private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local"; + public ServerSessionDelegate() { @@ -963,13 +968,10 @@ public class ServerSessionDelegate extends SessionDelegate if(method.hasArguments() && method.getArguments() != null) { - if(method.getArguments().containsKey("no-local")) + if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL)) { - Object no_local = method.getArguments().get("no-local"); - if(no_local instanceof Boolean && ((Boolean)no_local)) - { - queue.setNoLocal(true); - } + Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL); + queue.setNoLocal(convertBooleanValue(noLocal)); } } @@ -1079,6 +1081,30 @@ public class ServerSessionDelegate extends SessionDelegate } } + /** + * Converts a queue argument into a boolean value. For compatibility with the C++ + * and the clients, accepts with Boolean, String, or Number types. + * @param argValue argument value. + * + * @return true if set + */ + private boolean convertBooleanValue(Object argValue) + { + if(argValue instanceof Boolean && ((Boolean)argValue)) + { + return true; + } + else if (argValue instanceof String && Boolean.parseBoolean((String)argValue)) + { + return true; + } + else if (argValue instanceof Number && ((Number)argValue).intValue() != 0) + { + return true; + } + return false; + } + protected AMQQueue createQueue(final String queueName, final QueueDeclare body, VirtualHost virtualHost, diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 7e5edef38d..c625849694 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -47,6 +47,7 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; +import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; @@ -737,7 +738,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic Map<String,Object> arguments = new HashMap<String,Object>(); if (noLocal) { - arguments.put("no-local", true); + arguments.put(AddressHelper.NO_LOCAL, true); } getQpidSession().queueDeclare(queueName.toString(), "" , arguments, diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 9ea116ae53..d3d9cf2984 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -841,4 +841,147 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase e.printStackTrace(); } } + + /** + * Tests that a subscriber created on a same <i>session</i> as producer with + * no local true does not receive messages. + */ + public void testNoLocalOnSameSession() throws Exception + { + Connection connection = getConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getTestQueueName()); + MessageProducer producer = session.createProducer(topic); + TopicSubscriber subscriber = null; + try + { + subscriber = session.createDurableSubscriber(topic, getTestName(), null, true); + connection.start(); + + producer.send(createNextMessage(session, 1)); + + Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Unexpected message received", m); + } + finally + { + session.unsubscribe(getTestName()); + } + } + + + /** + * Tests that a subscriber created on a same <i>connection</i> but separate + * <i>sessionM</i> as producer with no local true does not receive messages. + */ + public void testNoLocalOnSameConnection() throws Exception + { + Connection connection = getConnection(); + + Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = consumerSession.createTopic(getTestQueueName()); + MessageProducer producer = producerSession.createProducer(topic); + + TopicSubscriber subscriber = null; + try + { + subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); + connection.start(); + + producer.send(createNextMessage(producerSession, 1)); + + Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Unexpected message received", m); + } + finally + { + consumerSession.unsubscribe(getTestName()); + } + } + + /** + * Tests that if no-local is in use, that the messages are delivered when + * the client reconnects. + * + * Currently fails on the Java Broker due to QPID-3605. + */ + public void testNoLocalMessagesNotDeliveredAfterReconnection() throws Exception + { + Connection connection = getConnection(); + + Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = consumerSession.createTopic(getTestQueueName()); + MessageProducer producer = producerSession.createProducer(topic); + + TopicSubscriber subscriber = null; + try + { + subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); + connection.start(); + + producer.send(createNextMessage(producerSession, 1)); + + Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Unexpected message received", m); + + connection.close(); + + connection = getConnection(); + + consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); + connection.start(); + m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Message should not be received on a new connection", m); + } + finally + { + consumerSession.unsubscribe(getTestName()); + } + } + + /** + * Tests that messages are delivered normally to a subscriber on a separate connection despite + * the use of durable subscriber with no-local on the first connection. + */ + public void testNoLocalSubscriberAndSubscriberOnSeparateConnection() throws Exception + { + Connection noLocalConnection = getConnection(); + Connection connection = getConnection(); + + String noLocalSubId1 = getTestName() + "subId1"; + String subId = getTestName() + "subId2"; + + Session noLocalSession = noLocalConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic noLocalTopic = noLocalSession.createTopic(getTestQueueName()); + + Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = consumerSession.createTopic(getTestQueueName()); + + TopicSubscriber noLocalSubscriber = null; + TopicSubscriber subscriber = null; + try + { + MessageProducer producer = noLocalSession.createProducer(noLocalTopic); + noLocalSubscriber = noLocalSession.createDurableSubscriber(noLocalTopic, noLocalSubId1, null, true); + subscriber = consumerSession.createDurableSubscriber(topic, subId, null, true); + noLocalConnection.start(); + connection.start(); + + producer.send(createNextMessage(noLocalSession, 1)); + + Message m1 = noLocalSubscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Subscriber on nolocal connection should not receive message", m1); + + Message m2 = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNotNull("Subscriber on non-nolocal connection should receive message", m2); + } + finally + { + noLocalSession.unsubscribe(noLocalSubId1); + consumerSession.unsubscribe(subId); + } + } } diff --git a/java/test-profiles/JavaExcludes b/java/test-profiles/JavaExcludes index 8de36cbd9a..023534ed26 100644 --- a/java/test-profiles/JavaExcludes +++ b/java/test-profiles/JavaExcludes @@ -43,3 +43,6 @@ org.apache.qpid.jms.xa.XAResourceTest#* //The Java broker doesnt support client auth org.apache.qpid.client.ssl.SSLTest#testMultipleCertsInSingleStore + +//QPID-3605 Durable subscriber with no-local true receives messages on re-connection +org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testNoLocalMessagesNotDeliveredAfterReconnection diff --git a/java/test-profiles/python_tests/Java010PythonExcludes b/java/test-profiles/python_tests/Java010PythonExcludes index 10e6298634..7d9782d0ff 100644 --- a/java/test-profiles/python_tests/Java010PythonExcludes +++ b/java/test-profiles/python_tests/Java010PythonExcludes @@ -69,10 +69,8 @@ qpid_tests.broker_0_10.message.MessageTests.test_window_flow_bytes #QPID-3592 Fails to receive more messages after restart qpid_tests.broker_0_10.message.MessageTests.test_window_stop -#QPID-3539 Tests fail because is incorrectly being done per session and not connection -qpid_tests.broker_0_10.message.MessageTests.test_no_local +#QPID-3605 Durable subscriber with no-local true receives messages on re-connection qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward -qpid_tests.broker_0_10.message.MessageTests.test_no_local_exclusive_subscribe #QPID-3593 Priority Queue test failures qpid_tests.broker_0_10.priority.PriorityTests.test_browsing |
