summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-10 14:14:46 +0000
committerKeith Wall <kwall@apache.org>2011-11-10 14:14:46 +0000
commit87139b266b3817be517db38092ffcc5b2f7c7277 (patch)
treeff7576ba53e65ae9f1f3f9deac73e4b227f7e302 /java
parent93417080e6d1a7c611ce80c188c52dd4af0a253b (diff)
downloadqpid-python-87139b266b3817be517db38092ffcc5b2f7c7277.tar.gz
QPID-3539: NoLocal should occur at the connection level and not the session level (0-10). Also fixed typing issue that prevented Java Broker understanding the no-local queue argument when encoded as String (as sent by Python tests).
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1200335 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java143
-rw-r--r--java/test-profiles/JavaExcludes3
-rw-r--r--java/test-profiles/python_tests/Java010PythonExcludes4
6 files changed, 188 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index c5d6bc203c..6598d07a84 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -56,6 +56,7 @@ import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -209,13 +210,13 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return false;
}
-
-
- if (_noLocal
- && (entry.getMessage() instanceof MessageTransferMessage)
- && ((MessageTransferMessage)entry.getMessage()).getSession() == _session)
+ if (_noLocal && entry.getMessage() instanceof MessageTransferMessage)
{
- return false;
+ Session messageSession= ((MessageTransferMessage)entry.getMessage()).getSession();
+ if (messageSession != null && messageSession.getConnection() == _session.getConnection())
+ {
+ return false;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index c87919b478..754a233907 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -100,6 +100,11 @@ public class ServerSessionDelegate extends SessionDelegate
{
private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
+ /**
+ * No-local queue argument is used to support the no-local feature of Durable Subscribers.
+ */
+ private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local";
+
public ServerSessionDelegate()
{
@@ -963,13 +968,10 @@ public class ServerSessionDelegate extends SessionDelegate
if(method.hasArguments() && method.getArguments() != null)
{
- if(method.getArguments().containsKey("no-local"))
+ if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL))
{
- Object no_local = method.getArguments().get("no-local");
- if(no_local instanceof Boolean && ((Boolean)no_local))
- {
- queue.setNoLocal(true);
- }
+ Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL);
+ queue.setNoLocal(convertBooleanValue(noLocal));
}
}
@@ -1079,6 +1081,30 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
+ /**
+ * Converts a queue argument into a boolean value. For compatibility with the C++
+ * and the clients, accepts with Boolean, String, or Number types.
+ * @param argValue argument value.
+ *
+ * @return true if set
+ */
+ private boolean convertBooleanValue(Object argValue)
+ {
+ if(argValue instanceof Boolean && ((Boolean)argValue))
+ {
+ return true;
+ }
+ else if (argValue instanceof String && Boolean.parseBoolean((String)argValue))
+ {
+ return true;
+ }
+ else if (argValue instanceof Number && ((Number)argValue).intValue() != 0)
+ {
+ return true;
+ }
+ return false;
+ }
+
protected AMQQueue createQueue(final String queueName,
final QueueDeclare body,
VirtualHost virtualHost,
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 7e5edef38d..c625849694 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -47,6 +47,7 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
@@ -737,7 +738,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
Map<String,Object> arguments = new HashMap<String,Object>();
if (noLocal)
{
- arguments.put("no-local", true);
+ arguments.put(AddressHelper.NO_LOCAL, true);
}
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 9ea116ae53..d3d9cf2984 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -841,4 +841,147 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
e.printStackTrace();
}
}
+
+ /**
+ * Tests that a subscriber created on a same <i>session</i> as producer with
+ * no local true does not receive messages.
+ */
+ public void testNoLocalOnSameSession() throws Exception
+ {
+ Connection connection = getConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(getTestQueueName());
+ MessageProducer producer = session.createProducer(topic);
+ TopicSubscriber subscriber = null;
+ try
+ {
+ subscriber = session.createDurableSubscriber(topic, getTestName(), null, true);
+ connection.start();
+
+ producer.send(createNextMessage(session, 1));
+
+ Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Unexpected message received", m);
+ }
+ finally
+ {
+ session.unsubscribe(getTestName());
+ }
+ }
+
+
+ /**
+ * Tests that a subscriber created on a same <i>connection</i> but separate
+ * <i>sessionM</i> as producer with no local true does not receive messages.
+ */
+ public void testNoLocalOnSameConnection() throws Exception
+ {
+ Connection connection = getConnection();
+
+ Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = consumerSession.createTopic(getTestQueueName());
+ MessageProducer producer = producerSession.createProducer(topic);
+
+ TopicSubscriber subscriber = null;
+ try
+ {
+ subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true);
+ connection.start();
+
+ producer.send(createNextMessage(producerSession, 1));
+
+ Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Unexpected message received", m);
+ }
+ finally
+ {
+ consumerSession.unsubscribe(getTestName());
+ }
+ }
+
+ /**
+ * Tests that if no-local is in use, that the messages are delivered when
+ * the client reconnects.
+ *
+ * Currently fails on the Java Broker due to QPID-3605.
+ */
+ public void testNoLocalMessagesNotDeliveredAfterReconnection() throws Exception
+ {
+ Connection connection = getConnection();
+
+ Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = consumerSession.createTopic(getTestQueueName());
+ MessageProducer producer = producerSession.createProducer(topic);
+
+ TopicSubscriber subscriber = null;
+ try
+ {
+ subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true);
+ connection.start();
+
+ producer.send(createNextMessage(producerSession, 1));
+
+ Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Unexpected message received", m);
+
+ connection.close();
+
+ connection = getConnection();
+
+ consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true);
+ connection.start();
+ m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Message should not be received on a new connection", m);
+ }
+ finally
+ {
+ consumerSession.unsubscribe(getTestName());
+ }
+ }
+
+ /**
+ * Tests that messages are delivered normally to a subscriber on a separate connection despite
+ * the use of durable subscriber with no-local on the first connection.
+ */
+ public void testNoLocalSubscriberAndSubscriberOnSeparateConnection() throws Exception
+ {
+ Connection noLocalConnection = getConnection();
+ Connection connection = getConnection();
+
+ String noLocalSubId1 = getTestName() + "subId1";
+ String subId = getTestName() + "subId2";
+
+ Session noLocalSession = noLocalConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic noLocalTopic = noLocalSession.createTopic(getTestQueueName());
+
+ Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = consumerSession.createTopic(getTestQueueName());
+
+ TopicSubscriber noLocalSubscriber = null;
+ TopicSubscriber subscriber = null;
+ try
+ {
+ MessageProducer producer = noLocalSession.createProducer(noLocalTopic);
+ noLocalSubscriber = noLocalSession.createDurableSubscriber(noLocalTopic, noLocalSubId1, null, true);
+ subscriber = consumerSession.createDurableSubscriber(topic, subId, null, true);
+ noLocalConnection.start();
+ connection.start();
+
+ producer.send(createNextMessage(noLocalSession, 1));
+
+ Message m1 = noLocalSubscriber.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Subscriber on nolocal connection should not receive message", m1);
+
+ Message m2 = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Subscriber on non-nolocal connection should receive message", m2);
+ }
+ finally
+ {
+ noLocalSession.unsubscribe(noLocalSubId1);
+ consumerSession.unsubscribe(subId);
+ }
+ }
}
diff --git a/java/test-profiles/JavaExcludes b/java/test-profiles/JavaExcludes
index 8de36cbd9a..023534ed26 100644
--- a/java/test-profiles/JavaExcludes
+++ b/java/test-profiles/JavaExcludes
@@ -43,3 +43,6 @@ org.apache.qpid.jms.xa.XAResourceTest#*
//The Java broker doesnt support client auth
org.apache.qpid.client.ssl.SSLTest#testMultipleCertsInSingleStore
+
+//QPID-3605 Durable subscriber with no-local true receives messages on re-connection
+org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testNoLocalMessagesNotDeliveredAfterReconnection
diff --git a/java/test-profiles/python_tests/Java010PythonExcludes b/java/test-profiles/python_tests/Java010PythonExcludes
index 10e6298634..7d9782d0ff 100644
--- a/java/test-profiles/python_tests/Java010PythonExcludes
+++ b/java/test-profiles/python_tests/Java010PythonExcludes
@@ -69,10 +69,8 @@ qpid_tests.broker_0_10.message.MessageTests.test_window_flow_bytes
#QPID-3592 Fails to receive more messages after restart
qpid_tests.broker_0_10.message.MessageTests.test_window_stop
-#QPID-3539 Tests fail because is incorrectly being done per session and not connection
-qpid_tests.broker_0_10.message.MessageTests.test_no_local
+#QPID-3605 Durable subscriber with no-local true receives messages on re-connection
qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward
-qpid_tests.broker_0_10.message.MessageTests.test_no_local_exclusive_subscribe
#QPID-3593 Priority Queue test failures
qpid_tests.broker_0_10.priority.PriorityTests.test_browsing