From d3459b6f6e751e77eecac781e4701a4d15290a43 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 19 Dec 2006 10:51:39 +0000 Subject: QPID-21 Added: SelectorParser.jj - ActiveMQ selector javacc grammar used to generate SelectorParser.java server/filter - Selector Filtering code from ActiveMQ project adjusted to suite our class and package structure. server/message - Decorator classes to allow access to the JMSMessage inside the AMQMessage ConcurrentSelectorDeliveryManager.java - A new DeliveryManager that utilises PreDeliveryQueues to implement selectors AMQInvalidSelectorException.java - thrown on client and broker when the Selector text is invalid. Common: log4j.properties to remove error log4j warnings on Common tests. Modified: broker/pom.xml - to generate SelectorParser.java AMQChannel.java - Addition of argument fieldtable for filter setup. BasicConsumeMethodHandler.java - writing of InvalidSelector channel close exception. AMQMessage.java - Added decorator to get access to the enclosed JMSMessage AMQQueue.java - Enhanced 'deliverymanager' property to allow the selection of the ConcurrentSelectorDeliveryManager. Subscription.java - Enhanced interface to allow a subscription to state an 'interest' in a given message. SubscriptionFactory.java - Added method to allow passing of filter arguments. SubscriptionImpl.java - Implemented new Subscription.java methods. SubscriptionManager.java - Added ability to get a list of current subscribers. SubscriptionSet.java - augmented nextSubscriber to allow the subscriber to exert the new hasInterest feature. SynchronizedDeliveryManager.java - fixed Logging class AMQSession - Added filter extraction from consume call and pass it on to the registration. ChannelCloseMethodHandler.java - Handle the reception and correct raising of the InvalidSelector Exception AbstractJMSMessage.java - Expanded imports BlockingMethodFrameListener.java - added extra info to a debug output line. SocketTransportConnection.java - made output an info not a warn. PropertiesFileInitialContextFactory.java - updated to allow the PROVIDER_URL to specify a property file to read in for the initial values. ClusteredSubscriptionManager.java - Implementation of SubscriptionSet.java NestedSubscriptionManager.java - Implementation of SubscriptionManager.java RemoteSubscriptionImpl.java - Implementation Subscription.java AMQConstant.java - Added '322' "Invalid Selector" SubscriptionTestHelper.java - Implementation of Subscription.java Edited specs/amqp-8.0.xml to add field table to consume method. Thanks to the ActiveMQ project for writing the initial SelectorParser.jj and associated filter Expressions. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488624 13f79535-47bb-0310-9956-ffa450edef68 --- java/broker/pom.xml | 23 + java/broker/src/main/grammar/SelectorParser.jj | 598 +++++++++++++++++++++ .../java/org/apache/qpid/server/AMQChannel.java | 5 +- .../qpid/server/filter/ArithmeticExpression.java | 216 ++++++++ .../qpid/server/filter/BinaryExpression.java | 100 ++++ .../qpid/server/filter/BooleanExpression.java | 45 ++ .../qpid/server/filter/ComparisonExpression.java | 465 ++++++++++++++++ .../qpid/server/filter/ConstantExpression.java | 170 ++++++ .../org/apache/qpid/server/filter/Expression.java | 42 ++ .../apache/qpid/server/filter/FilterManager.java | 37 ++ .../qpid/server/filter/FilterManagerFactory.java | 81 +++ .../qpid/server/filter/JMSSelectorFilter.java | 79 +++ .../apache/qpid/server/filter/LogicExpression.java | 95 ++++ .../apache/qpid/server/filter/MessageFilter.java | 30 ++ .../qpid/server/filter/PropertyExpression.java | 305 +++++++++++ .../qpid/server/filter/SimpleFilterManager.java | 77 +++ .../apache/qpid/server/filter/UnaryExpression.java | 263 +++++++++ .../apache/qpid/server/filter/XPathExpression.java | 130 +++++ .../qpid/server/filter/XQueryExpression.java | 56 ++ .../qpid/server/filter/XalanXPathEvaluator.java | 99 ++++ .../server/handler/BasicConsumeMethodHandler.java | 22 +- .../qpid/server/message/MessageDecorator.java | 25 + .../apache/qpid/server/message/jms/JMSMessage.java | 306 +++++++++++ .../org/apache/qpid/server/queue/AMQMessage.java | 88 ++- .../org/apache/qpid/server/queue/AMQQueue.java | 30 +- .../queue/ConcurrentSelectorDeliveryManager.java | 352 ++++++++++++ .../org/apache/qpid/server/queue/Subscription.java | 13 + .../qpid/server/queue/SubscriptionFactory.java | 4 + .../apache/qpid/server/queue/SubscriptionImpl.java | 65 ++- .../qpid/server/queue/SubscriptionManager.java | 3 + .../apache/qpid/server/queue/SubscriptionSet.java | 72 ++- .../server/queue/SynchronizedDeliveryManager.java | 2 +- 32 files changed, 3854 insertions(+), 44 deletions(-) create mode 100644 java/broker/src/main/grammar/SelectorParser.jj create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (limited to 'java/broker') diff --git a/java/broker/pom.xml b/java/broker/pom.xml index aea2d5878a..5f4c490fd4 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -54,6 +54,10 @@ commons-lang commons-lang + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + org.apache.mina mina-filter-ssl @@ -82,6 +86,25 @@ + + org.codehaus.mojo + javacc-maven-plugin + 2.0 + + + generate-sources + + ${basedir}/src/main/grammar + ${basedir}/target/generated + org.apache.qpid.server.filter.jms.selector + + + javacc + + + + + org.apache.maven.plugins maven-surefire-plugin diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj new file mode 100644 index 0000000000..5553a46e47 --- /dev/null +++ b/java/broker/src/main/grammar/SelectorParser.jj @@ -0,0 +1,598 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + // + // Original File from r450141 of the Apache ActiveMQ project + // + +// ---------------------------------------------------------------------------- +// OPTIONS +// ---------------------------------------------------------------------------- +options { + STATIC = false; + UNICODE_INPUT = true; + + // some performance optimizations + OPTIMIZE_TOKEN_MANAGER = true; + ERROR_REPORTING = false; +} + +// ---------------------------------------------------------------------------- +// PARSER +// ---------------------------------------------------------------------------- + +PARSER_BEGIN(SelectorParser) +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.filter.jms.selector; + +import java.io.*; +import java.util.*; + +import javax.jms.InvalidSelectorException; + +import org.apache.qpid.server.filter.*; + +/** + * JMS Selector Parser generated by JavaCC + * + * Do not edit this .java file directly - it is autogenerated from SelectorParser.jj + */ +public class SelectorParser { + + public SelectorParser() { + this(new StringReader("")); + } + + public BooleanExpression parse(String sql) throws InvalidSelectorException { + this.ReInit(new StringReader(sql)); + + try { + return this.JmsSelector(); + } + catch (Throwable e) { + throw (InvalidSelectorException)new InvalidSelectorException(sql).initCause(e); + } + + } + + private BooleanExpression asBooleanExpression(Expression value) throws ParseException { + if (value instanceof BooleanExpression) { + return (BooleanExpression) value; + } + if (value instanceof PropertyExpression) { + return UnaryExpression.createBooleanCast( value ); + } + throw new ParseException("Expression will not result in a boolean value: " + value); + } + + +} + +PARSER_END(SelectorParser) + +// ---------------------------------------------------------------------------- +// Tokens +// ---------------------------------------------------------------------------- + +/* White Space */ +SPECIAL_TOKEN : +{ + " " | "\t" | "\n" | "\r" | "\f" +} + +/* Comments */ +SKIP: +{ + +} + +SKIP: +{ + +} + +/* Reserved Words */ +TOKEN [IGNORE_CASE] : +{ + < NOT : "NOT"> + | < AND : "AND"> + | < OR : "OR"> + | < BETWEEN : "BETWEEN"> + | < LIKE : "LIKE"> + | < ESCAPE : "ESCAPE"> + | < IN : "IN"> + | < IS : "IS"> + | < TRUE : "TRUE" > + | < FALSE : "FALSE" > + | < NULL : "NULL" > + | < XPATH : "XPATH" > + | < XQUERY : "XQUERY" > +} + +/* Literals */ +TOKEN [IGNORE_CASE] : +{ + + < DECIMAL_LITERAL: ["1"-"9"] (["0"-"9"])* (["l","L"])? > + | < HEX_LITERAL: "0" ["x","X"] (["0"-"9","a"-"f","A"-"F"])+ > + | < OCTAL_LITERAL: "0" (["0"-"7"])* > + | < FLOATING_POINT_LITERAL: + (["0"-"9"])+ "." (["0"-"9"])* ()? // matches: 5.5 or 5. or 5.5E10 or 5.E10 + | "." (["0"-"9"])+ ()? // matches: .5 or .5E10 + | (["0"-"9"])+ // matches: 5E10 + > + | < #EXPONENT: "E" (["+","-"])? (["0"-"9"])+ > + | < STRING_LITERAL: "'" ( ("''") | ~["'"] )* "'" > +} + +TOKEN [IGNORE_CASE] : +{ + < ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* > +} + +// ---------------------------------------------------------------------------- +// Grammer +// ---------------------------------------------------------------------------- +BooleanExpression JmsSelector() : +{ + Expression left=null; +} +{ + ( + left = orExpression() + ) + { + return asBooleanExpression(left); + } + +} + +Expression orExpression() : +{ + Expression left; + Expression right; +} +{ + ( + left = andExpression() + ( + right = andExpression() + { + left = LogicExpression.createOR(asBooleanExpression(left), asBooleanExpression(right)); + } + )* + ) + { + return left; + } + +} + + +Expression andExpression() : +{ + Expression left; + Expression right; +} +{ + ( + left = equalityExpression() + ( + right = equalityExpression() + { + left = LogicExpression.createAND(asBooleanExpression(left), asBooleanExpression(right)); + } + )* + ) + { + return left; + } +} + +Expression equalityExpression() : +{ + Expression left; + Expression right; +} +{ + ( + left = comparisonExpression() + ( + + "=" right = comparisonExpression() + { + left = ComparisonExpression.createEqual(left, right); + } + | + "<>" right = comparisonExpression() + { + left = ComparisonExpression.createNotEqual(left, right); + } + | + LOOKAHEAD(2) + + { + left = ComparisonExpression.createIsNull(left); + } + | + + { + left = ComparisonExpression.createIsNotNull(left); + } + )* + ) + { + return left; + } +} + +Expression comparisonExpression() : +{ + Expression left; + Expression right; + Expression low; + Expression high; + String t, u; + boolean not; + ArrayList list; +} +{ + ( + left = addExpression() + ( + + ">" right = addExpression() + { + left = ComparisonExpression.createGreaterThan(left, right); + } + | + ">=" right = addExpression() + { + left = ComparisonExpression.createGreaterThanEqual(left, right); + } + | + "<" right = addExpression() + { + left = ComparisonExpression.createLessThan(left, right); + } + | + "<=" right = addExpression() + { + left = ComparisonExpression.createLessThanEqual(left, right); + } + | + { + u=null; + } + t = stringLitteral() + [ u = stringLitteral() ] + { + left = ComparisonExpression.createLike(left, t, u); + } + | + LOOKAHEAD(2) + { + u=null; + } + t = stringLitteral() [ u = stringLitteral() ] + { + left = ComparisonExpression.createNotLike(left, t, u); + } + | + low = addExpression() high = addExpression() + { + left = ComparisonExpression.createBetween(left, low, high); + } + | + LOOKAHEAD(2) + low = addExpression() high = addExpression() + { + left = ComparisonExpression.createNotBetween(left, low, high); + } + | + + "(" + t = stringLitteral() + { + list = new ArrayList(); + list.add( t ); + } + ( + "," + t = stringLitteral() + { + list.add( t ); + } + + )* + ")" + { + left = ComparisonExpression.createInFilter(left, list); + } + | + LOOKAHEAD(2) + + "(" + t = stringLitteral() + { + list = new ArrayList(); + list.add( t ); + } + ( + "," + t = stringLitteral() + { + list.add( t ); + } + + )* + ")" + { + left = ComparisonExpression.createNotInFilter(left, list); + } + + )* + ) + { + return left; + } +} + +Expression addExpression() : +{ + Expression left; + Expression right; +} +{ + left = multExpr() + ( + LOOKAHEAD( ("+"|"-") multExpr()) + ( + "+" right = multExpr() + { + left = ArithmeticExpression.createPlus(left, right); + } + | + "-" right = multExpr() + { + left = ArithmeticExpression.createMinus(left, right); + } + ) + + )* + { + return left; + } +} + +Expression multExpr() : +{ + Expression left; + Expression right; +} +{ + left = unaryExpr() + ( + "*" right = unaryExpr() + { + left = ArithmeticExpression.createMultiply(left, right); + } + | + "/" right = unaryExpr() + { + left = ArithmeticExpression.createDivide(left, right); + } + | + "%" right = unaryExpr() + { + left = ArithmeticExpression.createMod(left, right); + } + + )* + { + return left; + } +} + + +Expression unaryExpr() : +{ + String s=null; + Expression left=null; +} +{ + ( + LOOKAHEAD( "+" unaryExpr() ) + "+" left=unaryExpr() + | + "-" left=unaryExpr() + { + left = UnaryExpression.createNegate(left); + } + | + left=unaryExpr() + { + left = UnaryExpression.createNOT( asBooleanExpression(left) ); + } + | + s=stringLitteral() + { + left = UnaryExpression.createXPath( s ); + } + | + s=stringLitteral() + { + left = UnaryExpression.createXQuery( s ); + } + | + left = primaryExpr() + ) + { + return left; + } + +} + +Expression primaryExpr() : +{ + Expression left=null; +} +{ + ( + left = literal() + | + left = variable() + | + "(" left = orExpression() ")" + ) + { + return left; + } +} + + + +ConstantExpression literal() : +{ + Token t; + String s; + ConstantExpression left=null; +} +{ + ( + ( + s = stringLitteral() + { + left = new ConstantExpression(s); + } + ) + | + ( + t = + { + left = ConstantExpression.createFromDecimal(t.image); + } + ) + | + ( + t = + { + left = ConstantExpression.createFromHex(t.image); + } + ) + | + ( + t = + { + left = ConstantExpression.createFromOctal(t.image); + } + ) + | + ( + t = + { + left = ConstantExpression.createFloat(t.image); + } + ) + | + ( + + { + left = ConstantExpression.TRUE; + } + ) + | + ( + + { + left = ConstantExpression.FALSE; + } + ) + | + ( + + { + left = ConstantExpression.NULL; + } + ) + ) + { + return left; + } +} + +String stringLitteral() : +{ + Token t; + StringBuffer rc = new StringBuffer(); + boolean first=true; +} +{ + t = + { + // Decode the sting value. + String image = t.image; + for( int i=1; i < image.length()-1; i++ ) { + char c = image.charAt(i); + if( c == '\'' ) + i++; + rc.append(c); + } + return rc.toString(); + } +} + +PropertyExpression variable() : +{ + Token t; + PropertyExpression left=null; +} +{ + ( + t = + { + left = new PropertyExpression(t.image); + } + ) + { + return left; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index b0fbafac56..d8485ef0f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; @@ -290,7 +291,7 @@ public class AMQChannel * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ - public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException + public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -301,7 +302,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks); + queue.registerProtocolSession(session, _channelId, tag, acks, filters); _consumerTag2QueueMap.put(tag, queue); return tag; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java new file mode 100644 index 0000000000..0aa5739c1c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java @@ -0,0 +1,216 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + + +import org.apache.qpid.server.queue.AMQMessage; + +import javax.jms.JMSException; + +/** + * An expression which performs an operation on two expression values + * + * @version $Revision$ + */ +public abstract class ArithmeticExpression extends BinaryExpression { + + protected static final int INTEGER = 1; + protected static final int LONG = 2; + protected static final int DOUBLE = 3; + + /** + * @param left + * @param right + */ + public ArithmeticExpression(Expression left, Expression right) { + super(left, right); + } + + public static Expression createPlus(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof String) { + String text = (String) lvalue; + String answer = text + rvalue; + return answer; + } + else if (lvalue instanceof Number) { + return plus((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "+"; + } + }; + } + + public static Expression createMinus(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return minus((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "-"; + } + }; + } + + public static Expression createMultiply(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return multiply((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "*"; + } + }; + } + + public static Expression createDivide(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return divide((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "/"; + } + }; + } + + public static Expression createMod(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return mod((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "%"; + } + }; + } + + protected Number plus(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() + right.intValue()); + case LONG: + return new Long(left.longValue() + right.longValue()); + default: + return new Double(left.doubleValue() + right.doubleValue()); + } + } + + protected Number minus(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() - right.intValue()); + case LONG: + return new Long(left.longValue() - right.longValue()); + default: + return new Double(left.doubleValue() - right.doubleValue()); + } + } + + protected Number multiply(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() * right.intValue()); + case LONG: + return new Long(left.longValue() * right.longValue()); + default: + return new Double(left.doubleValue() * right.doubleValue()); + } + } + + protected Number divide(Number left, Number right) { + return new Double(left.doubleValue() / right.doubleValue()); + } + + protected Number mod(Number left, Number right) { + return new Double(left.doubleValue() % right.doubleValue()); + } + + private int numberType(Number left, Number right) { + if (isDouble(left) || isDouble(right)) { + return DOUBLE; + } + else if (left instanceof Long || right instanceof Long) { + return LONG; + } + else { + return INTEGER; + } + } + + private boolean isDouble(Number n) { + return n instanceof Float || n instanceof Double; + } + + protected Number asNumber(Object value) { + if (value instanceof Number) { + return (Number) value; + } + else { + throw new RuntimeException("Cannot convert value: " + value + " into a number"); + } + } + + public Object evaluate(AMQMessage message) throws JMSException { + Object lvalue = left.evaluate(message); + if (lvalue == null) { + return null; + } + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + return evaluate(lvalue, rvalue); + } + + + /** + * @param lvalue + * @param rvalue + * @return + */ + abstract protected Object evaluate(Object lvalue, Object rvalue); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java new file mode 100644 index 0000000000..4256ab9189 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java @@ -0,0 +1,100 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + + + +/** + * An expression which performs an operation on two expression values. + * + * @version $Revision$ + */ +abstract public class BinaryExpression implements Expression { + protected Expression left; + protected Expression right; + + public BinaryExpression(Expression left, Expression right) { + this.left = left; + this.right = right; + } + + public Expression getLeft() { + return left; + } + + public Expression getRight() { + return right; + } + + + /** + * @see java.lang.Object#toString() + */ + public String toString() { + return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + abstract public String getExpressionSymbol(); + + /** + * @param expression + */ + public void setRight(Expression expression) { + right = expression; + } + + /** + * @param expression + */ + public void setLeft(Expression expression) { + left = expression; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java new file mode 100644 index 0000000000..b66de3fbc5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import javax.jms.JMSException; + + +/** + * A BooleanExpression is an expression that always + * produces a Boolean result. + * + * @version $Revision$ + */ +public interface BooleanExpression extends Expression { + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws JMSException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java new file mode 100644 index 0000000000..13d278cf65 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java @@ -0,0 +1,465 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + +import javax.jms.JMSException; + +/** + * A filter performing a comparison of two objects + * + * @version $Revision$ + */ +public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression { + + public static BooleanExpression createBetween(Expression value, Expression left, Expression right) { + return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right)); + } + + public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) { + return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right)); + } + + static final private HashSet REGEXP_CONTROL_CHARS = new HashSet(); + + static { + REGEXP_CONTROL_CHARS.add(new Character('.')); + REGEXP_CONTROL_CHARS.add(new Character('\\')); + REGEXP_CONTROL_CHARS.add(new Character('[')); + REGEXP_CONTROL_CHARS.add(new Character(']')); + REGEXP_CONTROL_CHARS.add(new Character('^')); + REGEXP_CONTROL_CHARS.add(new Character('$')); + REGEXP_CONTROL_CHARS.add(new Character('?')); + REGEXP_CONTROL_CHARS.add(new Character('*')); + REGEXP_CONTROL_CHARS.add(new Character('+')); + REGEXP_CONTROL_CHARS.add(new Character('{')); + REGEXP_CONTROL_CHARS.add(new Character('}')); + REGEXP_CONTROL_CHARS.add(new Character('|')); + REGEXP_CONTROL_CHARS.add(new Character('(')); + REGEXP_CONTROL_CHARS.add(new Character(')')); + REGEXP_CONTROL_CHARS.add(new Character(':')); + REGEXP_CONTROL_CHARS.add(new Character('&')); + REGEXP_CONTROL_CHARS.add(new Character('<')); + REGEXP_CONTROL_CHARS.add(new Character('>')); + REGEXP_CONTROL_CHARS.add(new Character('=')); + REGEXP_CONTROL_CHARS.add(new Character('!')); + } + + static class LikeExpression extends UnaryExpression implements BooleanExpression { + + Pattern likePattern; + + /** + * @param right + */ + public LikeExpression(Expression right, String like, int escape) { + super(right); + + StringBuffer regexp = new StringBuffer(like.length() * 2); + regexp.append("\\A"); // The beginning of the input + for (int i = 0; i < like.length(); i++) { + char c = like.charAt(i); + if (escape == (0xFFFF & c)) { + i++; + if (i >= like.length()) { + // nothing left to escape... + break; + } + + char t = like.charAt(i); + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & t)); + } + else if (c == '%') { + regexp.append(".*?"); // Do a non-greedy match + } + else if (c == '_') { + regexp.append("."); // match one + } + else if (REGEXP_CONTROL_CHARS.contains(new Character(c))) { + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & c)); + } + else { + regexp.append(c); + } + } + regexp.append("\\z"); // The end of the input + + likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL); + } + + /** + * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol() + */ + public String getExpressionSymbol() { + return "LIKE"; + } + + /** + * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) + */ + public Object evaluate(AMQMessage message) throws JMSException { + + Object rv = this.getRight().evaluate(message); + + if (rv == null) { + return null; + } + + if (!(rv instanceof String)) { + return Boolean.FALSE; + //throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass()); + } + + return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; + } + + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + } + + public static BooleanExpression createLike(Expression left, String right, String escape) { + if (escape != null && escape.length() != 1) { + throw new RuntimeException("The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape); + } + int c = -1; + if (escape != null) { + c = 0xFFFF & escape.charAt(0); + } + + return new LikeExpression(left, right, c); + } + + public static BooleanExpression createNotLike(Expression left, String right, String escape) { + return UnaryExpression.createNOT(createLike(left, right, escape)); + } + + public static BooleanExpression createInFilter(Expression left, List elements) { + + if( !(left instanceof PropertyExpression) ) + throw new RuntimeException("Expected a property for In expression, got: "+left); + return UnaryExpression.createInExpression((PropertyExpression)left, elements, false); + + } + + public static BooleanExpression createNotInFilter(Expression left, List elements) { + + if( !(left instanceof PropertyExpression) ) + throw new RuntimeException("Expected a property for In expression, got: "+left); + return UnaryExpression.createInExpression((PropertyExpression)left, elements, true); + + } + + public static BooleanExpression createIsNull(Expression left) { + return doCreateEqual(left, ConstantExpression.NULL); + } + + public static BooleanExpression createIsNotNull(Expression left) { + return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL)); + } + + public static BooleanExpression createNotEqual(Expression left, Expression right) { + return UnaryExpression.createNOT(createEqual(left, right)); + } + + public static BooleanExpression createEqual(Expression left, Expression right) { + checkEqualOperand(left); + checkEqualOperand(right); + checkEqualOperandCompatability(left, right); + return doCreateEqual(left, right); + } + + private static BooleanExpression doCreateEqual(Expression left, Expression right) { + return new ComparisonExpression(left, right) { + + public Object evaluate(AMQMessage message) throws JMSException { + Object lv = left.evaluate(message); + Object rv = right.evaluate(message); + + // Iff one of the values is null + if (lv == null ^ rv == null) { + return Boolean.FALSE; + } + if (lv == rv || lv.equals(rv)) { + return Boolean.TRUE; + } + if( lv instanceof Comparable && rv instanceof Comparable ) { + return compare((Comparable)lv, (Comparable)rv); + } + return Boolean.FALSE; + } + + protected boolean asBoolean(int answer) { + return answer == 0; + } + + public String getExpressionSymbol() { + return "="; + } + }; + } + + public static BooleanExpression createGreaterThan(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + protected boolean asBoolean(int answer) { + return answer > 0; + } + + public String getExpressionSymbol() { + return ">"; + } + }; + } + + public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + protected boolean asBoolean(int answer) { + return answer >= 0; + } + + public String getExpressionSymbol() { + return ">="; + } + }; + } + + public static BooleanExpression createLessThan(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + + protected boolean asBoolean(int answer) { + return answer < 0; + } + + public String getExpressionSymbol() { + return "<"; + } + + }; + } + + public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + + protected boolean asBoolean(int answer) { + return answer <= 0; + } + + public String getExpressionSymbol() { + return "<="; + } + }; + } + + /** + * Only Numeric expressions can be used in >, >=, < or <= expressions.s + * + * @param expr + */ + public static void checkLessThanOperand(Expression expr ) { + if( expr instanceof ConstantExpression ) { + Object value = ((ConstantExpression)expr).getValue(); + if( value instanceof Number ) + return; + + // Else it's boolean or a String.. + throw new RuntimeException("Value '"+expr+"' cannot be compared."); + } + if( expr instanceof BooleanExpression ) { + throw new RuntimeException("Value '"+expr+"' cannot be compared."); + } + } + + /** + * Validates that the expression can be used in == or <> expression. + * Cannot not be NULL TRUE or FALSE litterals. + * + * @param expr + */ + public static void checkEqualOperand(Expression expr ) { + if( expr instanceof ConstantExpression ) { + Object value = ((ConstantExpression)expr).getValue(); + if( value == null ) + throw new RuntimeException("'"+expr+"' cannot be compared."); + } + } + + /** + * + * @param left + * @param right + */ + private static void checkEqualOperandCompatability(Expression left, Expression right) { + if( left instanceof ConstantExpression && right instanceof ConstantExpression ) { + if( left instanceof BooleanExpression && !(right instanceof BooleanExpression) ) + throw new RuntimeException("'"+left+"' cannot be compared with '"+right+"'"); + } + } + + + + /** + * @param left + * @param right + */ + public ComparisonExpression(Expression left, Expression right) { + super(left, right); + } + + public Object evaluate(AMQMessage message) throws JMSException { + Comparable lv = (Comparable) left.evaluate(message); + if (lv == null) { + return null; + } + Comparable rv = (Comparable) right.evaluate(message); + if (rv == null) { + return null; + } + return compare(lv, rv); + } + + protected Boolean compare(Comparable lv, Comparable rv) { + Class lc = lv.getClass(); + Class rc = rv.getClass(); + // If the the objects are not of the same type, + // try to convert up to allow the comparison. + if (lc != rc) { + if (lc == Byte.class) { + if (rc == Short.class) { + lv = new Short(((Number) lv).shortValue()); + } + else if (rc == Integer.class) { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } else if (lc == Short.class) { + if (rc == Integer.class) { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } else if (lc == Integer.class) { + if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Long.class) { + if (rc == Integer.class) { + rv = new Long(((Number) rv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Float.class) { + if (rc == Integer.class) { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Long.class) { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Double.class) { + if (rc == Integer.class) { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Long.class) { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Float.class) { + rv = new Float(((Number) rv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else + return Boolean.FALSE; + } + return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE; + } + + protected abstract boolean asBoolean(int answer); + + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java new file mode 100644 index 0000000000..9bde712da2 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java @@ -0,0 +1,170 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import java.math.BigDecimal; + +import javax.jms.JMSException; + +/** + * Represents a constant expression + * + * @version $Revision$ + */ +public class ConstantExpression implements Expression { + + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression { + public BooleanConstantExpression(Object value) { + super(value); + } + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + } + + public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); + public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); + public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); + + private Object value; + + public static ConstantExpression createFromDecimal(String text) { + + // Strip off the 'l' or 'L' if needed. + if( text.endsWith("l") || text.endsWith("L") ) + text = text.substring(0, text.length()-1); + + Number value; + try { + value = new Long(text); + } catch ( NumberFormatException e) { + // The number may be too big to fit in a long. + value = new BigDecimal(text); + } + + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFromHex(String text) { + Number value = new Long(Long.parseLong(text.substring(2), 16)); + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFromOctal(String text) { + Number value = new Long(Long.parseLong(text, 8)); + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFloat(String text) { + Number value = new Double(text); + return new ConstantExpression(value); + } + + public ConstantExpression(Object value) { + this.value = value; + } + + public Object evaluate(AMQMessage message) throws JMSException { + return value; + } + + public Object getValue() { + return value; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() { + if (value == null) { + return "NULL"; + } + if (value instanceof Boolean) { + return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE"; + } + if (value instanceof String) { + return encodeString((String) value); + } + return value.toString(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + + /** + * Encodes the value of string so that it looks like it would look like + * when it was provided in a selector. + * + * @param s + * @return + */ + public static String encodeString(String s) { + StringBuffer b = new StringBuffer(); + b.append('\''); + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c == '\'') { + b.append(c); + } + b.append(c); + } + b.append('\''); + return b.toString(); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java new file mode 100644 index 0000000000..a15c15fb91 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java @@ -0,0 +1,42 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import javax.jms.JMSException; + + +/** + * Represents an expression + * + * @version $Revision$ + */ +public interface Expression { + + /** + * @return the value of this expression + */ + public Object evaluate(AMQMessage message) throws JMSException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java new file mode 100644 index 0000000000..c82de9fa15 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; + +public interface FilterManager +{ + void add(MessageFilter filter); + + void remove(MessageFilter filter); + + boolean allAllow(AMQMessage msg); + + boolean hasFilters(); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java new file mode 100644 index 0000000000..6ecd56586f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import java.util.Iterator; + + +public class FilterManagerFactory +{ + //private final static Logger _logger = LoggerFactory.getLogger(FilterManagerFactory.class); + private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class); + + //fixme move to a common class so it can be refered to from client code. + private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector"; + + public static FilterManager createManager(FieldTable filters) throws AMQException + { + FilterManager manager = null; + + if (filters != null) + { + + manager = new SimpleFilterManager(); + + Iterator it = filters.keySet().iterator(); + _logger.info("Processing filters:"); + while (it.hasNext()) + { + String key = (String) it.next(); + _logger.info("filter:" + key); + if (key.equals(JMS_SELECTOR_FILTER)) + { + String selector = (String) filters.get(key); + + if (selector != null && !selector.equals("")) + { + manager.add(new JMSSelectorFilter(selector)); + } + } + + } + + //If we added no filters don't bear the overhead of having an filter manager + if (!manager.hasFilters()) + { + manager = null; + } + } + else + { + _logger.info("No Filters found."); + } + + + return manager; + + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java new file mode 100644 index 0000000000..4884067237 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.filter.jms.selector.SelectorParser; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.log4j.Logger; + + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +public class JMSSelectorFilter implements MessageFilter +{ + private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); + + private String _selector; + private BooleanExpression _matcher; + + public JMSSelectorFilter(String selector) throws AMQException + { + _selector = selector; + _logger.info("Created JMSSelectorFilter with selector:" + _selector); + + + try + { + _matcher = new SelectorParser().parse(selector); + } + catch (InvalidSelectorException e) + { + // fixme + // Is this the correct way of throwing exception + throw new AMQInvalidSelectorException(e.getMessage()); + } + + } + + public boolean matches(AMQMessage message) + { + try + { + boolean match = _matcher.matches(message); + _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector); + return match; + } + catch (JMSException e) + { + //fixme this needs to be sorted.. it shouldn't happen + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + return false; + } + + public String getSelector() + { + return _selector; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java new file mode 100644 index 0000000000..714d8c23f5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java @@ -0,0 +1,95 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import javax.jms.JMSException; + +/** + * A filter performing a comparison of two objects + * + * @version $Revision$ + */ +public abstract class LogicExpression extends BinaryExpression implements BooleanExpression { + + public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) { + return new LogicExpression(lvalue, rvalue) { + + public Object evaluate(AMQMessage message) throws JMSException { + + Boolean lv = (Boolean) left.evaluate(message); + // Can we do an OR shortcut?? + if (lv !=null && lv.booleanValue()) { + return Boolean.TRUE; + } + + Boolean rv = (Boolean) right.evaluate(message); + return rv==null ? null : rv; + } + + public String getExpressionSymbol() { + return "OR"; + } + }; + } + + public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) { + return new LogicExpression(lvalue, rvalue) { + + public Object evaluate(AMQMessage message) throws JMSException { + + Boolean lv = (Boolean) left.evaluate(message); + + // Can we do an AND shortcut?? + if (lv == null) + return null; + if (!lv.booleanValue()) { + return Boolean.FALSE; + } + + Boolean rv = (Boolean) right.evaluate(message); + return rv == null ? null : rv; + } + + public String getExpressionSymbol() { + return "AND"; + } + }; + } + + /** + * @param left + * @param right + */ + public LogicExpression(BooleanExpression left, BooleanExpression right) { + super(left, right); + } + + abstract public Object evaluate(AMQMessage message) throws JMSException; + + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java new file mode 100644 index 0000000000..b8ca75d209 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; + +import javax.jms.JMSException; + +public interface MessageFilter +{ + boolean matches(AMQMessage message) throws JMSException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java new file mode 100644 index 0000000000..f3e9965c2e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -0,0 +1,305 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import java.io.IOException; +import java.util.HashMap; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; + +//import org.apache.activemq.command.ActiveMQDestination; +//import org.apache.activemq.command.Message; +//import org.apache.activemq.command.TransactionId; +//import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.log4j.Logger; + +/** + * Represents a property expression + * + * @version $Revision$ + */ +public class PropertyExpression implements Expression +{ + + interface SubExpression + { + public Object evaluate(AMQMessage message); + } + + interface JMSExpression + { + public abstract Object evaluate(JMSMessage message); + } + + static class SubJMSExpression implements SubExpression + { + JMSExpression _expression; + + SubJMSExpression(JMSExpression expression) + { + _expression = expression; + } + + + public Object evaluate(AMQMessage message) + { + JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE); + if (msg != null) + { + return _expression.evaluate(msg); + } + else + { + return null; + } + } + } + + private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class); + + + static final private HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap(); + + static + { + JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSDestination(); + } + } + )); +// +// public Object evaluate(AMQMessage message) +// { +// //fixme +// +// +//// AMQDestination dest = message.getOriginalDestination(); +//// if (dest == null) +//// { +//// dest = message.getDestination(); +//// } +//// if (dest == null) +//// { +//// return null; +//// } +//// return dest.toString(); +// return ""; +// } +// }); + JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSReplyTo(); + } + }) + ); + + JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSType(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + try + { + Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + _logger.info("JMSDeliveryMode is :" + mode); + return mode; + } + catch (AMQException e) + { + //shouldn't happen + } + + return DeliveryMode.NON_PERSISTENT; + } + })); + + JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSPriority(); + } + } + )); + + + JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getAMQMessage().getMessageId(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSTimestamp(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSCorrelationID(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSExpiration(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getAMQMessage().isRedelivered(); + } + } + )); + + } + + private final String name; + private final SubExpression jmsPropertyExpression; + + public PropertyExpression(String name) + { + this.name = name; + jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name); + } + + public Object evaluate(AMQMessage message) throws JMSException + { +// try +// { +// if (message.isDropped()) +// { +// return null; +// } + + if (jmsPropertyExpression != null) + { + return jmsPropertyExpression.evaluate(message); + } +// try + else + { + + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + + _logger.info("Looking up property:" + name); + _logger.info("Properties are:" + _properties.getHeaders().keySet()); + + return _properties.getHeaders().get(name); + } +// catch (IOException ioe) +// { +// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage()); +// exception.initCause(ioe); +// throw exception; +// } +// } +// catch (IOException e) +// { +// JMSException exception = new JMSException(e.getMessage()); +// exception.initCause(e); +// throw exception; +// } + + } + + public String getName() + { + return name; + } + + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return name; + } + + /** + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return name.hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if (o == null || !this.getClass().equals(o.getClass())) + { + return false; + } + return name.equals(((PropertyExpression) o).name); + + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java new file mode 100644 index 0000000000..dc2c2c0e6c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class SimpleFilterManager implements FilterManager +{ + private final Logger _logger = LoggerFactory.getLogger(SimpleFilterManager.class); + + private final ConcurrentLinkedQueue _filters; + + public SimpleFilterManager() + { + _logger.debug("Creating SimpleFilterManager"); + _filters = new ConcurrentLinkedQueue(); + } + + public void add(MessageFilter filter) + { + _filters.add(filter); + } + + public void remove(MessageFilter filter) + { + _filters.remove(filter); + } + + public boolean allAllow(AMQMessage msg) + { + for (MessageFilter filter : _filters) + { + try + { + if (!filter.matches(msg)) + { + return false; + } + } + catch (JMSException e) + { + //fixme + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + return false; + } + } + return true; + } + + public boolean hasFilters() + { + return !_filters.isEmpty(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java new file mode 100644 index 0000000000..49ff147411 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java @@ -0,0 +1,263 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import javax.jms.JMSException; + +/** + * An expression which performs an operation on two expression values + * + * @version $Revision$ + */ +public abstract class UnaryExpression implements Expression { + + private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE); + protected Expression right; + + public static Expression createNegate(Expression left) { + return new UnaryExpression(left) { + public Object evaluate(AMQMessage message) throws JMSException { + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + if (rvalue instanceof Number) { + return negate((Number) rvalue); + } + return null; + } + + public String getExpressionSymbol() { + return "-"; + } + }; + } + + public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) { + + // Use a HashSet if there are many elements. + Collection t; + if( elements.size()==0 ) + t=null; + else if( elements.size() < 5 ) + t = elements; + else { + t = new HashSet(elements); + } + final Collection inList = t; + + return new BooleanUnaryExpression(right) { + public Object evaluate(AMQMessage message) throws JMSException { + + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + if( rvalue.getClass()!=String.class ) + return null; + + if( (inList!=null && inList.contains(rvalue)) ^ not ) { + return Boolean.TRUE; + } else { + return Boolean.FALSE; + } + + } + + public String toString() { + StringBuffer answer = new StringBuffer(); + answer.append(right); + answer.append(" "); + answer.append(getExpressionSymbol()); + answer.append(" ( "); + + int count=0; + for (Iterator i = inList.iterator(); i.hasNext();) { + Object o = (Object) i.next(); + if( count!=0 ) { + answer.append(", "); + } + answer.append(o); + count++; + } + + answer.append(" )"); + return answer.toString(); + } + + public String getExpressionSymbol() { + if( not ) + return "NOT IN"; + else + return "IN"; + } + }; + } + + abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression { + public BooleanUnaryExpression(Expression left) { + super(left); + } + + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + }; + + + public static BooleanExpression createNOT(BooleanExpression left) { + return new BooleanUnaryExpression(left) { + public Object evaluate(AMQMessage message) throws JMSException { + Boolean lvalue = (Boolean) right.evaluate(message); + if (lvalue == null) { + return null; + } + return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE; + } + + public String getExpressionSymbol() { + return "NOT"; + } + }; + } + + public static BooleanExpression createXPath(final String xpath) { + return new XPathExpression(xpath); + } + + public static BooleanExpression createXQuery(final String xpath) { + return new XQueryExpression(xpath); + } + + public static BooleanExpression createBooleanCast(Expression left) { + return new BooleanUnaryExpression(left) { + public Object evaluate(AMQMessage message) throws JMSException { + Object rvalue = right.evaluate(message); + if (rvalue == null) + return null; + if (!rvalue.getClass().equals(Boolean.class)) + return Boolean.FALSE; + return ((Boolean)rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE; + } + + public String toString() { + return right.toString(); + } + + public String getExpressionSymbol() { + return ""; + } + }; + } + + private static Number negate(Number left) { + Class clazz = left.getClass(); + if (clazz == Integer.class) { + return new Integer(-left.intValue()); + } + else if (clazz == Long.class) { + return new Long(-left.longValue()); + } + else if (clazz == Float.class) { + return new Float(-left.floatValue()); + } + else if (clazz == Double.class) { + return new Double(-left.doubleValue()); + } + else if (clazz == BigDecimal.class) { + // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the + // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it + // as a Big decimal. But it gets Negated right away.. to here we try to covert it back + // to a Long. + BigDecimal bd = (BigDecimal)left; + bd = bd.negate(); + + if( BD_LONG_MIN_VALUE.compareTo(bd)==0 ) { + return new Long(Long.MIN_VALUE); + } + return bd; + } + else { + throw new RuntimeException("Don't know how to negate: "+left); + } + } + + public UnaryExpression(Expression left) { + this.right = left; + } + + public Expression getRight() { + return right; + } + + public void setRight(Expression expression) { + right = expression; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() { + return "(" + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + abstract public String getExpressionSymbol(); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java new file mode 100644 index 0000000000..ab952b6fea --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java @@ -0,0 +1,130 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import javax.jms.JMSException; + +//import org.apache.activemq.command.Message; +//import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.qpid.server.queue.AMQMessage; + +/** + * Used to evaluate an XPath Expression in a JMS selector. + */ +public final class XPathExpression implements BooleanExpression { + + private static final Log log = LogFactory.getLog(XPathExpression.class); + private static final String EVALUATOR_SYSTEM_PROPERTY = "org.apache.qpid.server.filter.XPathEvaluatorClassName"; + private static final String DEFAULT_EVALUATOR_CLASS_NAME=XalanXPathEvaluator.class.getName(); + + private static final Constructor EVALUATOR_CONSTRUCTOR; + + static { + String cn = System.getProperty(EVALUATOR_SYSTEM_PROPERTY, DEFAULT_EVALUATOR_CLASS_NAME); + Constructor m = null; + try { + try { + m = getXPathEvaluatorConstructor(cn); + } catch (Throwable e) { + log.warn("Invalid "+XPathEvaluator.class.getName()+" implementation: "+cn+", reason: "+e,e); + cn = DEFAULT_EVALUATOR_CLASS_NAME; + try { + m = getXPathEvaluatorConstructor(cn); + } catch (Throwable e2) { + log.error("Default XPath evaluator could not be loaded",e); + } + } + } finally { + EVALUATOR_CONSTRUCTOR = m; + } + } + + private static Constructor getXPathEvaluatorConstructor(String cn) throws ClassNotFoundException, SecurityException, NoSuchMethodException { + Class c = XPathExpression.class.getClassLoader().loadClass(cn); + if( !XPathEvaluator.class.isAssignableFrom(c) ) { + throw new ClassCastException(""+c+" is not an instance of "+XPathEvaluator.class); + } + return c.getConstructor(new Class[]{String.class}); + } + + private final String xpath; + private final XPathEvaluator evaluator; + + static public interface XPathEvaluator { + public boolean evaluate(AMQMessage message) throws JMSException; + } + + XPathExpression(String xpath) { + this.xpath = xpath; + this.evaluator = createEvaluator(xpath); + } + + private XPathEvaluator createEvaluator(String xpath2) { + try { + return (XPathEvaluator)EVALUATOR_CONSTRUCTOR.newInstance(new Object[]{xpath}); + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if( cause instanceof RuntimeException ) { + throw (RuntimeException)cause; + } + throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e); + } catch (Throwable e) { + throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e); + } + } + + public Object evaluate(AMQMessage message) throws JMSException { +// try { +//FIXME this is flow to disk work +// if( message.isDropped() ) +// return null; + return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE; +// } catch (IOException e) { +// +// JMSException exception = new JMSException(e.getMessage()); +// exception.initCause(e); +// throw exception; +// +// } + + } + + public String toString() { + return "XPATH "+ConstantExpression.encodeString(xpath); + } + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java new file mode 100644 index 0000000000..53764cbf75 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java @@ -0,0 +1,56 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; + +import javax.jms.JMSException; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +/** + * Used to evaluate an XQuery Expression in a JMS selector. + */ +public final class XQueryExpression implements BooleanExpression { + private final String xpath; + + XQueryExpression(String xpath) { + super(); + this.xpath = xpath; + } + + public Object evaluate(AMQMessage message) throws JMSException { + return Boolean.FALSE; + } + + public String toString() { + return "XQUERY "+ConstantExpression.encodeString(xpath); + } + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java new file mode 100644 index 0000000000..4b78fd18df --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import java.io.StringReader; +import java.io.ByteArrayInputStream; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.TextMessage; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +//import org.apache.activemq.command.Message; +//import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.xpath.CachedXPathAPI; +import org.apache.qpid.server.queue.AMQMessage; +import org.w3c.dom.Document; +import org.w3c.dom.traversal.NodeIterator; +import org.xml.sax.InputSource; + +public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator { + + private final String xpath; + + public XalanXPathEvaluator(String xpath) { + this.xpath = xpath; + } + + public boolean evaluate(AMQMessage m) throws JMSException { + if( m instanceof TextMessage ) { + String text = ((TextMessage)m).getText(); + return evaluate(text); + } else if ( m instanceof BytesMessage ) { + BytesMessage bm = (BytesMessage) m; + byte data[] = new byte[(int) bm.getBodyLength()]; + bm.readBytes(data); + return evaluate(data); + } + return false; + } + + private boolean evaluate(byte[] data) { + try { + + InputSource inputSource = new InputSource(new ByteArrayInputStream(data)); + + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(true); + DocumentBuilder dbuilder = factory.newDocumentBuilder(); + Document doc = dbuilder.parse(inputSource); + + CachedXPathAPI cachedXPathAPI = new CachedXPathAPI(); + NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath); + return iterator.nextNode()!=null; + + } catch (Throwable e) { + return false; + } + } + + private boolean evaluate(String text) { + try { + InputSource inputSource = new InputSource(new StringReader(text)); + + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(true); + DocumentBuilder dbuilder = factory.newDocumentBuilder(); + Document doc = dbuilder.parse(inputSource); + + // We should associated the cachedXPathAPI object with the message being evaluated + // since that should speedup subsequent xpath expressions. + CachedXPathAPI cachedXPathAPI = new CachedXPathAPI(); + NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath); + return iterator.nextNode()!=null; + } catch (Throwable e) { + return false; + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index d4c94061a0..bf282020ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -21,10 +21,12 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.log4j.Logger; @@ -68,14 +71,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener _tokens = new HashSet(); private AMQProtocolSession _publisher; - private final BasicPublishBody _publishBody; + private final BasicPublishBody _publishBody; private ContentHeaderBody _contentHeaderBody; @@ -83,6 +89,8 @@ public class AMQMessage * messages published with the 'immediate' flag. */ private boolean _deliveredToConsumer; + private ConcurrentHashMap _decodedMessages; + private AtomicBoolean _taken; public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody) @@ -96,7 +104,9 @@ public class AMQMessage _publishBody = publishBody; _store = messageStore; _contentBodies = new LinkedList(); + _decodedMessages = new ConcurrentHashMap(); _storeWhenComplete = storeWhenComplete; + _taken = new AtomicBoolean(false); } public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody, @@ -107,6 +117,7 @@ public class AMQMessage _publishBody = publishBody; _contentHeaderBody = contentHeaderBody; _contentBodies = contentBodies; + _decodedMessages = new ConcurrentHashMap(); _messageId = messageId; _store = store; storeMessage(); @@ -271,7 +282,7 @@ public class AMQMessage { _store.removeMessage(_messageId); } - catch(AMQException e) + catch (AMQException e) { //to maintain consistency, we revert the count incrementReference(); @@ -292,7 +303,7 @@ public class AMQMessage public boolean checkToken(Object token) { - if(_tokens.contains(token)) + if (_tokens.contains(token)) { return true; } @@ -308,7 +319,7 @@ public class AMQMessage //if the message is not persistent or the queue is not durable //we will not need to recover the association and so do not //need to record it - if(isPersistent() && queue.isDurable()) + if (isPersistent() && queue.isDurable()) { _store.enqueueMessage(queue.getName(), _messageId); } @@ -318,7 +329,7 @@ public class AMQMessage { //only record associations where both queue and message will survive //a restart, so only need to remove association if this is the case - if(isPersistent() && queue.isDurable()) + if (isPersistent() && queue.isDurable()) { _store.dequeueMessage(queue.getName(), _messageId); } @@ -326,14 +337,14 @@ public class AMQMessage public boolean isPersistent() throws AMQException { - if(_contentHeaderBody == null) + if (_contentHeaderBody == null) { throw new AMQException("Cannot determine delivery mode of message. Content header not found."); } //todo remove literal values to a constant file such as AMQConstants in common return _contentHeaderBody.properties instanceof BasicContentHeaderProperties - &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; } public void setTxnBuffer(TxnBuffer buffer) @@ -352,8 +363,9 @@ public class AMQMessage * immediate delivery but has not been marked as delivered to a * consumer */ - public void checkDeliveredToConsumer() throws NoConsumersException{ - if(isImmediate() && !_deliveredToConsumer) + public void checkDeliveredToConsumer() throws NoConsumersException + { + if (isImmediate() && !_deliveredToConsumer) { throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies); } @@ -362,8 +374,64 @@ public class AMQMessage /** * Called when this message is delivered to a consumer. (used to * implement the 'immediate' flag functionality). + * And by selectors to determin if the message has already been sent */ - public void setDeliveredToConsumer(){ + public void setDeliveredToConsumer() + { _deliveredToConsumer = true; } + + /** + * Called selectors to determin if the message has already been sent + * @return _deliveredToConsumer + */ + public boolean getDeliveredToConsumer() + { + return _deliveredToConsumer; + } + + + public MessageDecorator getDecodedMessage(String type) + { + MessageDecorator msgtype = null; + + if (_decodedMessages != null) + { + msgtype = _decodedMessages.get(type); + + if (msgtype == null) + { + msgtype = decorateMessage(type); + } + } + + return msgtype; + } + + private MessageDecorator decorateMessage(String type) + { + MessageDecorator msgdec = null; + + if (type.equals(JMS_MESSAGE)) + { + msgdec = new JMSMessage(this); + } + + if (msgdec != null) + { + _decodedMessages.put(type, msgdec); + } + + return msgdec; + } + + public boolean taken() + { + return _taken.getAndSet(true); + } + + public void release() + { + _taken.set(false); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f2ef97cf9a..e64daef690 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -187,16 +188,29 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; - //fixme - Pick one. - if (Boolean.getBoolean("concurrentdeliverymanager")) + //fixme - Make this configurable via the broker config.xml + if (System.getProperties().getProperty("deliverymanager") != null) { - _logger.info("Using ConcurrentDeliveryManager"); - _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); + if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager")) + { + _logger.info("Using ConcurrentSelectorDeliveryManager"); + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + } + else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager")) + { + _logger.info("Using ConcurrentDeliveryManager"); + _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); + } + else + { + _logger.info("Using SynchronizedDeliveryManager"); + _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); + } } else { - _logger.info("Using SynchronizedDeliveryManager"); - _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); + _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager"); + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); } } @@ -348,12 +362,12 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks) + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException { debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks); + Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters); _subscribers.addSubscriber(subscription); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java new file mode 100644 index 0000000000..d8bb6e1948 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -0,0 +1,352 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; +import org.apache.qpid.configuration.Configured; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.configuration.Configurator; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * Manages delivery of messages on behalf of a queue + */ +public class ConcurrentSelectorDeliveryManager implements DeliveryManager +{ + private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class); + + @Configured(path = "advanced.compressBufferOnQueue", + defaultValue = "false") + public boolean compressBufferOnQueue; + /** + * Holds any queued messages + */ + private final Queue _messages = new ConcurrentLinkedQueueAtomicSize(); + //private int _messageCount; + /** + * Ensures that only one asynchronous task is running for this manager at + * any time. + */ + private final AtomicBoolean _processing = new AtomicBoolean(); + /** + * The subscriptions on the queue to whom messages are delivered + */ + private final SubscriptionManager _subscriptions; + + /** + * A reference to the queue we are delivering messages for. We need this to be able + * to pass the code that handles acknowledgements a handle on the queue. + */ + private final AMQQueue _queue; + + + /** + * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced + * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered + * via the async thread. + *

+ * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. + */ + private ReentrantLock _lock = new ReentrantLock(); + + + ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) + { + + //Set values from configuration + Configurator.configure(this); + + if (compressBufferOnQueue) + { + _log.warn("Compressing Buffers on queue."); + } + + _subscriptions = subscriptions; + _queue = queue; + } + + + private boolean addMessageToQueue(AMQMessage msg) + { + // Shrink the ContentBodies to their actual size to save memory. + if (compressBufferOnQueue) + { + Iterator it = msg.getContentBodies().iterator(); + while (it.hasNext()) + { + ContentBody cb = (ContentBody) it.next(); + cb.reduceBufferToFit(); + } + } + + _messages.offer(msg); + + return true; + } + + + public boolean hasQueuedMessages() + { + _lock.lock(); + try + { + return !_messages.isEmpty(); + } + finally + { + _lock.unlock(); + } + } + + public int getQueueMessageCount() + { + return getMessageCount(); + } + + /** + * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. + * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. + * + * @return int the number of messages in the delivery queue. + */ + private int getMessageCount() + { + return _messages.size(); + } + + + public synchronized List getMessages() + { + return new ArrayList(_messages); + } + + public synchronized void removeAMessageFromTop() throws AMQException + { + AMQMessage msg = poll(); + if (msg != null) + { + msg.dequeue(_queue); + } + } + + public synchronized void clearAllMessages() throws AMQException + { + AMQMessage msg = poll(); + while (msg != null) + { + msg.dequeue(_queue); + msg = poll(); + } + } + + + private AMQMessage getNextMessage(Queue messages) + { + AMQMessage message = messages.peek(); + + while (message != null && message.taken()) + { + //remove the already taken message + messages.poll(); + // try the next message + message = messages.peek(); + } + return message; + } + + public void sendNextMessage(Subscription sub, Queue messageQueue, AMQQueue queue) + { + AMQMessage message = null; + try + { + message = getNextMessage(messageQueue); + + // message will be null if we have no messages in the messageQueue. + if (message == null) + { + return; + } + _log.info("Async Delivery Message:" + message + " to :" + sub); + + sub.send(message, queue); + message.setDeliveredToConsumer(); + + //remove sent message from our queue. + messageQueue.poll(); + } + catch (FailedDequeueException e) + { + message.release(); + _log.error("Unable to deliver message as dequeue failed: " + e, e); + } + } + + /** + * Only one thread should ever execute this method concurrently, but + * it can do so while other threads invoke deliver(). + */ + private void processQueue() + { + // Continue to process delivery while we haveSubscribers and messages + boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); + + while (hasSubscribers && hasQueuedMessages()) + { + for (Subscription sub : _subscriptions.getSubscriptions()) + { + if (!sub.isSuspended()) + { + if (sub.hasFilters()) + { + sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue); + } + else + { + sendNextMessage(sub, _messages, _queue); + } + + hasSubscribers = true; + } + else + { + hasSubscribers = false; + } + } + } + } + + private AMQMessage poll() + { + return _messages.poll(); + } + + public void deliver(String name, AMQMessage msg) throws FailedDequeueException + { + _log.info("deliver :" + msg); + + //Check if we have someone to deliver the message to. + _lock.lock(); + try + { + Subscription s = _subscriptions.nextSubscriber(msg); + + if (s == null) //no-one can take the message right now. + { + _log.info("Testing Message(" + msg + ") for Queued Delivery"); + if (!msg.isImmediate()) + { + addMessageToQueue(msg); + + //release lock now message is on queue. + _lock.unlock(); + + //Pre Deliver to all subscriptions + _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); + for (Subscription sub : _subscriptions.getSubscriptions()) + { + + // stop if the message gets delivered whilst PreDelivering if we have a shared queue. + if (_queue.isShared() && msg.getDeliveredToConsumer()) + { + _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered."); + continue; + } + + // Only give the message to those that want them. + if (sub.hasFilters() && sub.hasInterest(msg)) + { + sub.enqueueForPreDelivery(msg); + } + } + } + } + else + { + //release lock now + _lock.unlock(); + + _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s); + //Deliver the message + s.send(msg, _queue); + msg.setDeliveredToConsumer(); + } + } + finally + { + //ensure lock is released + if (_lock.isLocked()) + { + _lock.unlock(); + } + } + } + + Runner asyncDelivery = new Runner(); + + private class Runner implements Runnable + { + public void run() + { + boolean running = true; + while (running) + { + processQueue(); + + //Check that messages have not been added since we did our last peek(); + // Synchronize with the thread that adds to the queue. + // If the queue is still empty then we can exit + + if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) + { + running = false; + _processing.set(false); + } + } + } + } + + public void processAsync(Executor executor) + { + _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + + " Active:" + _subscriptions.hasActiveSubscribers() + + " Processing:" + _processing.get()); + + if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) + { + //are we already running? if so, don't re-run + if (_processing.compareAndSet(false, true)) + { + executor.execute(asyncDelivery); + } + } + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index 49f0a51bf2..523b5f06e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.AMQException; + +import java.util.Queue; + public interface Subscription { void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException; @@ -27,4 +31,13 @@ public interface Subscription boolean isSuspended(); void queueDeleted(AMQQueue queue); + + boolean hasFilters(); + + boolean hasInterest(AMQMessage msg); + + Queue getPreDeliveryQueue(); + + void enqueueForPreDelivery(AMQMessage msg); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index 0fd44e4fbc..f464384562 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.FieldTable; /** * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This @@ -32,6 +33,9 @@ import org.apache.qpid.AMQException; */ public interface SubscriptionFactory { + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) + throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 5cad28b80d..79b0593f69 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -23,12 +23,18 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; +import java.util.Queue; + /** * Encapsulation of a supscription to a queue. *

@@ -48,29 +54,45 @@ public class SubscriptionImpl implements Subscription private final Object sessionKey; + private Queue _messages; + + /** * True if messages need to be acknowledged */ private final boolean _acks; + private FilterManager _filters; public static class Factory implements SubscriptionFactory { + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException + { + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters); + } + public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null); } } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, String consumerTag, boolean acks) throws AMQException + { + this(channelId, protocolSession, consumerTag, acks, null); + } + + public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, + String consumerTag, boolean acks, FieldTable filters) + throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) @@ -83,6 +105,17 @@ public class SubscriptionImpl implements Subscription this.consumerTag = consumerTag; sessionKey = protocolSession.getKey(); _acks = acks; + _filters = FilterManagerFactory.createManager(filters); + + if (_filters != null) + { + _messages = new ConcurrentLinkedQueueAtomicSize(); + } + else + { + // Reference the DeliveryManager + _messages = null; + } } public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, @@ -131,7 +164,7 @@ public class SubscriptionImpl implements Subscription { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. - + // By doing this _before_ the send we ensure that it // doesn't get sent if it can't be dequeued, preventing // duplicate delivery on recovery. @@ -178,6 +211,32 @@ public class SubscriptionImpl implements Subscription channel.queueDeleted(queue); } + public boolean hasFilters() + { + return _filters != null; + } + + public boolean hasInterest(AMQMessage msg) + { + return _filters.allAllow(msg); + } + + public Queue getPreDeliveryQueue() + { + return _messages; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + if (_messages != null) + { + _messages.offer(msg); + } + } + + + + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java index 353b461c8d..4df88baebc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.server.queue; +import java.util.List; + /** * Abstraction of actor that will determine the subscriber to whom * a message will be sent. */ public interface SubscriptionManager { + public List getSubscriptions(); public boolean hasActiveSubscribers(); public Subscription nextSubscriber(AMQMessage msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 7cc3f5f719..a4afe18e4d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + import java.util.List; import java.util.ListIterator; import java.util.concurrent.CopyOnWriteArrayList; @@ -58,6 +60,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Remove the subscription, returning it if it was found + * * @param subscription * @return null if no match was found */ @@ -90,7 +93,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Return the next unsuspended subscription or null if not found. - * + *

* Performance note: * This method can scan all items twice when looking for a subscription that is not * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this @@ -105,31 +108,58 @@ class SubscriptionSet implements WeightedSubscriptionManager return null; } - try { - final Subscription result = nextSubscriber(); - if (result == null) { + try + { + final Subscription result = nextSubscriberImpl(msg); + if (result == null) + { _currentSubscriber = 0; - return nextSubscriber(); - } else { + return nextSubscriberImpl(msg); + } + else + { return result; } - } catch (IndexOutOfBoundsException e) { + } + catch (IndexOutOfBoundsException e) + { _currentSubscriber = 0; - return nextSubscriber(); + return nextSubscriber(msg); } } - private Subscription nextSubscriber() + private Subscription nextSubscriberImpl(AMQMessage msg) { final ListIterator iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) { + while (iterator.hasNext()) + { Subscription subscription = iterator.next(); ++_currentSubscriber; subscriberScanned(); - if (!subscription.isSuspended()) { - return subscription; + + if (!subscription.isSuspended()) + { + if (!subscription.hasFilters()) + { + return subscription; + } + else + { + if (subscription.hasInterest(msg)) + { + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } + } + } } } + return null; } @@ -145,11 +175,19 @@ class SubscriptionSet implements WeightedSubscriptionManager return _subscriptions.isEmpty(); } + public List getSubscriptions() + { + return _subscriptions; + } + public boolean hasActiveSubscribers() { for (Subscription s : _subscriptions) { - if (!s.isSuspended()) return true; + if (!s.isSuspended()) + { + return true; + } } return false; } @@ -159,7 +197,10 @@ class SubscriptionSet implements WeightedSubscriptionManager int count = 0; for (Subscription s : _subscriptions) { - if (!s.isSuspended()) count++; + if (!s.isSuspended()) + { + count++; + } } return count; } @@ -177,7 +218,8 @@ class SubscriptionSet implements WeightedSubscriptionManager } } - int size() { + int size() + { return _subscriptions.size(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index ea64952bc7..49b0111b67 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ class SynchronizedDeliveryManager implements DeliveryManager { - private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class); + private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class); /** * Holds any queued messages -- cgit v1.2.1