summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java29
-rw-r--r--qpid/java/test-profiles/python_tests/Java010PythonExcludes3
4 files changed, 76 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 7e320956b6..295a7191e7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -146,6 +146,33 @@ public class HeadersExchange extends AbstractExchange
return new ArrayList<BaseQueue>(queues);
}
+
+ public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
+ {
+ CopyOnWriteArraySet<Binding> bindings;
+ if(bindingKey == null)
+ {
+ bindings = new CopyOnWriteArraySet<Binding>(getBindings());
+ }
+ else
+ {
+ bindings = _bindingsByKey.get(bindingKey);
+ }
+
+ if(bindings != null)
+ {
+ for(Binding binding : bindings)
+ {
+ if(queue == null || binding.getQueue().equals(queue))
+ {
+ return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments);
+ }
+ }
+ }
+
+ return false;
+ }
+
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
{
//fixme isBound here should take the arguements in to consideration.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 6d7242a78d..e4c452a5d6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -20,13 +20,23 @@
*/
package org.apache.qpid.server.exchange;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.management.JMException;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -36,23 +46,11 @@ import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.JMException;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.concurrent.ConcurrentHashMap;
-
public class TopicExchange extends AbstractExchange
{
@@ -275,6 +273,28 @@ public class TopicExchange extends AbstractExchange
}
}
+ public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
+ {
+ Binding binding = new Binding(null, bindingKey, queue, this, arguments);
+ if (arguments == null)
+ {
+ return _bindings.containsKey(binding);
+ }
+ else
+ {
+ FieldTable o = _bindings.get(binding);
+ if (o != null)
+ {
+ return arguments.equals(FieldTable.convertToMap(o));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ }
+
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return isBound(routingKey, null, queue);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 629597f40b..ca8de53d32 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -859,12 +859,6 @@ public class ServerSessionDelegate extends SessionDelegate
if(method.hasBindingKey())
{
- if(method.hasArguments())
- {
- FieldTable args = FieldTable.convertToFieldTable(method.getArguments());
-
- result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue));
- }
if(queueMatched)
{
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
@@ -873,23 +867,28 @@ public class ServerSessionDelegate extends SessionDelegate
{
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
}
+
+ if(method.hasArguments())
+ {
+ result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null));
+ }
+
}
else if (method.hasArguments())
{
- // TODO
-
+ result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null));
}
- result.setQueueNotMatched(!exchange.isBound(queue));
-
}
else if(exchange != null && method.hasBindingKey())
{
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
if(method.hasArguments())
{
- // TODO
+ result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queue));
}
- result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
}
@@ -898,11 +897,15 @@ public class ServerSessionDelegate extends SessionDelegate
{
if(method.hasArguments())
{
- // TODO
+ result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null));
}
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
}
+ else if(exchange != null && method.hasArguments())
+ {
+ result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), null));
+ }
session.executionResult((int) method.getId(), result);
diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes
index 70a15fea9b..42f8c18235 100644
--- a/qpid/java/test-profiles/python_tests/Java010PythonExcludes
+++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes
@@ -77,9 +77,6 @@ qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_reject_no_
qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_add_alternate_to_exchange
qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange
-#QPID-3597 Headers exchange issues
-qpid_tests.broker_0_10.query.QueryTests.test_exchange_bound_header
-
#QPID-3599 Tests fail due to differences in expected message Redelivered status
qpid.tests.messaging.endpoints.SessionTests.testCommitAck
qpid.tests.messaging.endpoints.SessionTests.testRelease