diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-05 15:12:50 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-05 15:12:50 +0000 |
| commit | 1548d94328342f17b95b538cb688f1a0ea907429 (patch) | |
| tree | a437e55e97ee90d969aebc2006a1a8ef8d6809bd /java/client | |
| parent | aa40aa0e853c459d09a27eef15e12091b9d1a50b (diff) | |
| download | qpid-python-1548d94328342f17b95b538cb688f1a0ea907429.tar.gz | |
Added message selector
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562885 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
13 files changed, 2129 insertions, 89 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java index 740a065d69..f94d26b854 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java @@ -33,8 +33,16 @@ import org.apache.qpidity.api.Message; */ public interface Session { + public static final short ACQUIRE_MODE_NO_ACQUIRE = 0; + public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1; + public static final short CONFIRM_MODE_REQUIRED = 1; + public static final short CONFIRM_MODE_NOT_REQUIRED = 0; + public static final short MESSAGE_FLOW_MODE_CREDIT = 0; + public static final short MESSAGE_FLOW_MODE_WINDOW = 1; + public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; + public static final short MESSAGE_FLOW_UNIT_BYTE = 1; - //------------------------------------------------------ + //------------------------------------------------------ // Session housekeeping methods //------------------------------------------------------ /** @@ -154,19 +162,27 @@ public interface Session * <ul> * <li> NO_LOCAL * <li> EXCLUSIVE - * <li> NO_ACQUIRE - * <li> CONFIRM * </ul> * <p> In the absence of a particular option, defaul values are: * <ul> * <li> NO_LOCAL = false * <li> EXCLUSIVE = false - * <li> PRE-ACCQUIRE - * <li> CONFIRM = false * </ul> * * @param queue The queue this receiver is receiving messages from. * @param destination The destination for the subscriber ,a.k.a the delivery tag. + * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + * <p/> + * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + * </ul> + * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired + * <p/> + * <li> pre-acquire (1): the message is acquired when the transfer starts + * </ul> * @param listener The listener for this destination. When big message are transfered then * it is recommended to use a {@link MessagePartListener}. * @param options Set of Options. @@ -174,8 +190,9 @@ public interface Session * on the providers implementation. * @throws QpidException If the session fails to create the receiver due to some error. */ - public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter, - Option... options) throws QpidException; + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, + MessagePartListener listener, Map<String, ?> filter, Option... options) + throws QpidException; /** * This method cancels a consumer. This does not affect already delivered messages, but it does @@ -262,7 +279,7 @@ public interface Session * further credit is received. * * @param destination The destination to stop. - * @throws QpidException If stopping fails due to some error. + * @throws QpidException If stopping fails due to some error. */ public void messageStop(String destination) throws QpidException; @@ -274,7 +291,7 @@ public interface Session * @param range Range of acknowledged messages. * @throws QpidException If the acknowledgement of the messages fails due to some error. */ - public void messageAcknowledge(Range... range) throws QpidException; + public void messageAcknowledge(Range<Long>... range) throws QpidException; /** * Reject ranges of acquired messages. @@ -284,7 +301,7 @@ public interface Session * @param range Range of rejected messages. * @throws QpidException If those messages cannot be rejected dus to some error */ - public void messageReject(Range... range) throws QpidException; + public void messageReject(Range<Long>... range) throws QpidException; /** * Try to acquire ranges of messages hence releasing them form the queue. @@ -298,7 +315,7 @@ public interface Session * @return Ranges of explicitly acquired messages. * @throws QpidException If this message cannot be acquired dus to some error */ - public Range[] messageAcquire(Range... range) throws QpidException; + public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException; /** * Give up responsibility for processing ranges of messages. @@ -307,7 +324,7 @@ public interface Session * @param range Ranges of messages to be released. * @throws QpidException If this message cannot be released dus to some error. */ - public void messageRelease(Range... range) throws QpidException; + public void messageRelease(Range<Long>... range) throws QpidException; // ----------------------------------------------- // Local transaction methods @@ -359,8 +376,8 @@ public interface Session * @throws QpidException If the session fails to declare the queue due to some error. * @see Option */ - public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, - Option... options) throws QpidException; + public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) + throws QpidException; /** * Bind a queue with an exchange. @@ -370,8 +387,8 @@ public interface Session * @param routingKey The routing key. * @throws QpidException If the session fails to bind the queue due to some error. */ - public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws - QpidException; + public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException; /** * Unbind a queue from an exchange. @@ -381,8 +398,8 @@ public interface Session * @param routingKey The routing key. * @throws QpidException If the session fails to unbind the queue due to some error. */ - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws - QpidException; + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException; /** * Purge a queue. i.e. delete all enqueued messages diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java index 0952d730e2..5641869e3e 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -20,6 +20,8 @@ package org.apache.qpid.nclient.jms; //import org.apache.qpid.nclient.api.MessageReceiver; import org.apache.qpid.nclient.jms.message.QpidMessage; +import org.apache.qpid.nclient.jms.filter.JMSSelectorFilter; +import org.apache.qpid.nclient.jms.filter.MessageFilter; import org.apache.qpid.nclient.impl.MessagePartListenerAdapter; import org.apache.qpid.nclient.MessagePartListener; import org.apache.qpidity.Range; @@ -33,7 +35,8 @@ import javax.jms.*; */ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { - public static final short MESSAGE_FLOW_MODE = 0; // we use message flow mode + // we can receive up to 100 messages for an asynchronous listener + public static final int MAX_MESSAGE_TRANSFERRED = 100; /** * This MessageConsumer's messageselector. @@ -41,6 +44,11 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer private String _messageSelector = null; /** + * The message selector filter associated with this consumer message selector + */ + private MessageFilter _filter = null; + + /** * NoLocal * If true, and the destination is a topic then inhibits the delivery of messages published * by its own connection. The behavior for NoLocal is not specified if the destination is a queue. @@ -53,6 +61,11 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer protected String _subscriptionName; /** + * Indicates whether this consumer receives pre-acquired messages + */ + private boolean _preAcquire = true; + + /** * A MessagePartListener set up for this consumer. */ private MessageListener _messageListener; @@ -71,7 +84,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * Indicates that this consumer is receiving a synch message */ private boolean _isReceiving = false; - + + /** + * Indicates that a nowait is receiving a message. + */ + private boolean _isNoWaitIsReceiving = false; + + /** + * Number of mesages received asynchronously + * Nether exceed MAX_MESSAGE_TRANSFERRED + */ + private int _messageAsyncrhonouslyReceived = 0; //----- Constructors /** @@ -89,7 +112,11 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer boolean noLocal, String subscriptionName) throws Exception { super(session, destination); - _messageSelector = messageSelector; + if (messageSelector != null) + { + _messageSelector = messageSelector; + _filter = new JMSSelectorFilter(messageSelector); + } _noLocal = noLocal; _subscriptionName = subscriptionName; _isStopped = getSession().isStopped(); @@ -102,24 +129,37 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * asynchronous or directly to this consumer when it is synchronously accessed. */ MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this)); - // we use the default options: EXCLUSIVE = false, PRE-ACCQUIRE and CONFIRM = off - if (_noLocal) - { - getSession().getQpidSession() - .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null, - Option.NO_LOCAL); - } - else + getSession().getQpidSession() + .messageSubscribe(destination.getName(), getMessageActorID(), + org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED, + // When the message selctor is set we do not acquire the messages + _messageSelector != null ? org.apache.qpid.nclient.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpid.nclient.Session.ACQUIRE_MODE_PRE_ACQUIRE, + messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION); + if (_messageSelector != null) { - getSession().getQpidSession() - .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null); + _preAcquire = false; } } else { // this is a topic we need to create a temporary queue for this consumer // unless this is a durable subscriber + if (subscriptionName != null) + { + // this ia a durable subscriber + // create a persistent queue for this subscriber + // getSession().getQpidSession().queueDeclare(destination.getName()); + } + else + { + // this is a non durable subscriber + // create a temporary queue + + } } + // set the flow mode + getSession().getQpidSession() + .messageFlowMode(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_MODE_CREDIT); } //----- Message consumer API @@ -168,7 +208,35 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // this method is synchronized as onMessage also access _messagelistener // onMessage, getMessageListener and this method are the only synchronized methods checkNotClosed(); - _messageListener = messageListener; + try + { + _messageListener = messageListener; + if (messageListener != null) + { + resetAsynchMessageReceived(); + } + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Contact the broker and ask for the delivery of MAX_MESSAGE_TRANSFERRED messages + * + * @throws QpidException If there is a communication error + */ + private void resetAsynchMessageReceived() throws QpidException + { + if (!_isStopped && _messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED) + { + getSession().getQpidSession().messageStop(getMessageActorID()); + } + _messageAsyncrhonouslyReceived = 0; + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, + MAX_MESSAGE_TRANSFERRED); } /** @@ -265,7 +333,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer if (!_isStopped) { // if this consumer is stopped then this will be call when starting - getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1); + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); received = getSession().getQpidSession().messageFlush(getMessageActorID()); } if (!received && timeout < 0) @@ -275,6 +344,11 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } else { + // right we need to let onMessage know that a nowait is potentially waiting for a message + if (timeout < 0) + { + _isNoWaitIsReceiving = true; + } while (_incomingMessage == null && !_isClosed) { try @@ -295,9 +369,10 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().acknowledgeMessage(_incomingMessage); } _incomingMessage = null; - // We now release any message received for this consumer - _isReceiving = false; } + // We now release any message received for this consumer + _isReceiving = false; + _isNoWaitIsReceiving = false; } return result; } @@ -329,7 +404,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { // there is a synch call waiting for a message to be delivered // so tell the broker to deliver a message - getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1); + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); getSession().getQpidSession().messageFlush(getMessageActorID()); } } @@ -344,64 +420,109 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { try { + // if there is a message selector then we need to evaluate it. + boolean messageOk = true; + if (_messageSelector != null) + { + messageOk = _filter.matches(message.getJMSMessage()); + } + // right now we need to acquire this message if needed + if (!_preAcquire && messageOk) + { + messageOk = acquireMessage(message); + } + // if this consumer is synchronous then set the current message and // notify the waiting thread if (_messageListener == null) { synchronized (_incomingMessageLock) { - if (_isReceiving) + if (messageOk) { - _incomingMessage = message; - _incomingMessageLock.notify(); + // we have received a proper message that we can deliver + if (_isReceiving) + { + _incomingMessage = message; + _incomingMessageLock.notify(); + } + else + { + // this message has been received after a received as returned + // we need to release it + releaseMessage(message); + } } else { - // this message has been received after a received as returned - // we need to release it - releaseMessage(message); + // oups the message did not match the selector or we did not manage to acquire it + // If the receiver is still waiting for a message + // then we need to request a new one from the server + if (_isReceiving) + { + getSession().getQpidSession() + .messageFlow(getMessageActorID(), + org.apache.qpid.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + boolean received = getSession().getQpidSession().messageFlush(getMessageActorID()); + if (!received && _isNoWaitIsReceiving) + { + // Right a message nowait is waiting for a message + // but no one can be delivered it then need to return + _incomingMessageLock.notify(); + } + } } } } else { - // This is an asynchronous message - // tell the session that a message is in process - getSession().preProcessMessage(message); - // If the session is transacted we need to ack the message first - // This is because a message is associated with its tx only when acked - if (getSession().getTransacted()) - { - getSession().acknowledgeMessage(message); - } - // The JMS specs says: - /* The result of a listener throwing a RuntimeException depends on the session?s - * acknowledgment mode. - ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message - * will be immediately redelivered. The number of times a JMS provider will - * redeliver the same message before giving up is provider-dependent. - ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. - * --- Transacted Session - the next message for the listener is delivered. - * - * The number of time we try redelivering the message is 0 - **/ - try - { - _messageListener.onMessage(message.getJMSMessage()); - } - catch (RuntimeException re) + _messageAsyncrhonouslyReceived++; + if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED) { - // do nothing as this message will not be redelivered + // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages + resetAsynchMessageReceived(); } - // If the session has been recovered we then need to redelivered this message - if (getSession().isInRecovery()) + // only deliver the message if it is valid + if (messageOk) { - releaseMessage(message); - } - else if (!getSession().getTransacted()) - { - // Tell the jms Session to ack this message if required - getSession().acknowledgeMessage(message); + // This is an asynchronous message + // tell the session that a message is in process + getSession().preProcessMessage(message); + // If the session is transacted we need to ack the message first + // This is because a message is associated with its tx only when acked + if (getSession().getTransacted()) + { + getSession().acknowledgeMessage(message); + } + // The JMS specs says: + /* The result of a listener throwing a RuntimeException depends on the session?s + * acknowledgment mode. + ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message + * will be immediately redelivered. The number of times a JMS provider will + * redeliver the same message before giving up is provider-dependent. + ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. + * --- Transacted Session - the next message for the listener is delivered. + * + * The number of time we try redelivering the message is 0 + **/ + try + { + _messageListener.onMessage(message.getJMSMessage()); + } + catch (RuntimeException re) + { + // do nothing as this message will not be redelivered + } + // If the session has been recovered we then need to redelivered this message + if (getSession().isInRecovery()) + { + releaseMessage(message); + } + else if (!getSession().getTransacted()) + { + // Tell the jms Session to ack this message if required + getSession().acknowledgeMessage(message); + } } } } @@ -415,27 +536,37 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * Release a message * * @param message The message to be released - * @throws JMSException If the message cannot be released due to some internal error. + * @throws QpidException If the message cannot be released due to some internal error. */ - private void releaseMessage(QpidMessage message) throws JMSException + private void releaseMessage(QpidMessage message) throws QpidException { - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); - try + if (_preAcquire) { + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); getSession().getQpidSession().messageRelease(range); } - catch (QpidException e) + } + + /** + * Acquire a message + * + * @param message The message to be acquired + * @return true if the message has been acquired, false otherwise. + * @throws QpidException If the message cannot be acquired due to some internal error. + */ + private boolean acquireMessage(QpidMessage message) throws QpidException + { + boolean result = false; + if (!_preAcquire) { - // notify the Exception listener - if (getSession().getConnection().getExceptionListener() != null) - { - getSession().getConnection().getExceptionListener() - .onException(ExceptionHelper.convertQpidExceptionToJMSException(e)); - } - if (_logger.isDebugEnabled()) + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + + Range<Long>[] rangeResult = getSession().getQpidSession().messageAcquire(range); + if (rangeResult.length > 0) { - _logger.debug("Excpetion when releasing message " + message, e); + result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0; } } + return result; } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java new file mode 100644 index 0000000000..38201023cd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java @@ -0,0 +1,270 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.nclient.jms.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// +import org.apache.qpidity.QpidException; + +import javax.jms.Message; + +/** + * An expression which performs an operation on two expression values + */ +public abstract class ArithmeticExpression extends BinaryExpression +{ + + protected static final int INTEGER = 1; + protected static final int LONG = 2; + protected static final int DOUBLE = 3; + + + public ArithmeticExpression(Expression left, Expression right) + { + super(left, right); + } + + public static Expression createPlus(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof String) + { + String text = (String) lvalue; + return text + rvalue; + } + else if (lvalue instanceof Number) + { + return plus((Number) lvalue, asNumber(rvalue)); + } + + throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "+"; + } + }; + } + + public static Expression createMinus(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof Number) + { + return minus((Number) lvalue, asNumber(rvalue)); + } + + throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "-"; + } + }; + } + + public static Expression createMultiply(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof Number) + { + return multiply((Number) lvalue, asNumber(rvalue)); + } + + throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "*"; + } + }; + } + + public static Expression createDivide(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof Number) + { + return divide((Number) lvalue, asNumber(rvalue)); + } + + throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "/"; + } + }; + } + + public static Expression createMod(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof Number) + { + return mod((Number) lvalue, asNumber(rvalue)); + } + + throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "%"; + } + }; + } + + protected Number plus(Number left, Number right) + { + switch (numberType(left, right)) + { + + case INTEGER: + return new Integer(left.intValue() + right.intValue()); + + case LONG: + return new Long(left.longValue() + right.longValue()); + + default: + return new Double(left.doubleValue() + right.doubleValue()); + } + } + + protected Number minus(Number left, Number right) + { + switch (numberType(left, right)) + { + + case INTEGER: + return new Integer(left.intValue() - right.intValue()); + + case LONG: + return new Long(left.longValue() - right.longValue()); + + default: + return new Double(left.doubleValue() - right.doubleValue()); + } + } + + protected Number multiply(Number left, Number right) + { + switch (numberType(left, right)) + { + + case INTEGER: + return new Integer(left.intValue() * right.intValue()); + + case LONG: + return new Long(left.longValue() * right.longValue()); + + default: + return new Double(left.doubleValue() * right.doubleValue()); + } + } + + protected Number divide(Number left, Number right) + { + return new Double(left.doubleValue() / right.doubleValue()); + } + + protected Number mod(Number left, Number right) + { + return new Double(left.doubleValue() % right.doubleValue()); + } + + private int numberType(Number left, Number right) + { + if (isDouble(left) || isDouble(right)) + { + return DOUBLE; + } + else if ((left instanceof Long) || (right instanceof Long)) + { + return LONG; + } + else + { + return INTEGER; + } + } + + private boolean isDouble(Number n) + { + return (n instanceof Float) || (n instanceof Double); + } + + protected Number asNumber(Object value) + { + if (value instanceof Number) + { + return (Number) value; + } + else + { + throw new RuntimeException("Cannot convert value: " + value + " into a number"); + } + } + + public Object evaluate(Message message) throws QpidException + { + Object lvalue = left.evaluate(message); + if (lvalue == null) + { + return null; + } + + Object rvalue = right.evaluate(message); + if (rvalue == null) + { + return null; + } + + return evaluate(lvalue, rvalue); + } + + /** + * @param lvalue + * @param rvalue + * @return + */ + protected abstract Object evaluate(Object lvalue, Object rvalue); + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java new file mode 100644 index 0000000000..97a089c47b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java @@ -0,0 +1,106 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.nclient.jms.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +/** + * An expression which performs an operation on two expression values. + */ +public abstract class BinaryExpression implements Expression +{ + protected Expression left; + protected Expression right; + + public BinaryExpression(Expression left, Expression right) + { + this.left = left; + this.right = right; + } + + public Expression getLeft() + { + return left; + } + + public Expression getRight() + { + return right; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + public abstract String getExpressionSymbol(); + + /** + * @param expression + */ + public void setRight(Expression expression) + { + right = expression; + } + + /** + * @param expression + */ + public void setLeft(Expression expression) + { + left = expression; + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java new file mode 100644 index 0000000000..97f3dd5504 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java @@ -0,0 +1,36 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.nclient.jms.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpidity.QpidException; + +import javax.jms.Message; + +/** + * A BooleanExpression is an expression that always + * produces a Boolean result. + */ +public interface BooleanExpression extends Expression +{ + + public boolean matches(Message message) throws QpidException; + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java new file mode 100644 index 0000000000..3c0184d77e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java @@ -0,0 +1,595 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpidity.QpidException; + +import javax.jms.Message; +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + +/** + * A filter performing a comparison of two objects + */ +public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression +{ + + public static BooleanExpression createBetween(Expression value, Expression left, Expression right) + { + return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right)); + } + + public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) + { + return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right)); + } + + private static final HashSet REGEXP_CONTROL_CHARS = new HashSet(); + + static + { + REGEXP_CONTROL_CHARS.add(new Character('.')); + REGEXP_CONTROL_CHARS.add(new Character('\\')); + REGEXP_CONTROL_CHARS.add(new Character('[')); + REGEXP_CONTROL_CHARS.add(new Character(']')); + REGEXP_CONTROL_CHARS.add(new Character('^')); + REGEXP_CONTROL_CHARS.add(new Character('$')); + REGEXP_CONTROL_CHARS.add(new Character('?')); + REGEXP_CONTROL_CHARS.add(new Character('*')); + REGEXP_CONTROL_CHARS.add(new Character('+')); + REGEXP_CONTROL_CHARS.add(new Character('{')); + REGEXP_CONTROL_CHARS.add(new Character('}')); + REGEXP_CONTROL_CHARS.add(new Character('|')); + REGEXP_CONTROL_CHARS.add(new Character('(')); + REGEXP_CONTROL_CHARS.add(new Character(')')); + REGEXP_CONTROL_CHARS.add(new Character(':')); + REGEXP_CONTROL_CHARS.add(new Character('&')); + REGEXP_CONTROL_CHARS.add(new Character('<')); + REGEXP_CONTROL_CHARS.add(new Character('>')); + REGEXP_CONTROL_CHARS.add(new Character('=')); + REGEXP_CONTROL_CHARS.add(new Character('!')); + } + + static class LikeExpression extends UnaryExpression implements BooleanExpression + { + + Pattern likePattern; + + /** + * @param right + */ + public LikeExpression(Expression right, String like, int escape) + { + super(right); + + StringBuffer regexp = new StringBuffer(like.length() * 2); + regexp.append("\\A"); // The beginning of the input + for (int i = 0; i < like.length(); i++) + { + char c = like.charAt(i); + if (escape == (0xFFFF & c)) + { + i++; + if (i >= like.length()) + { + // nothing left to escape... + break; + } + + char t = like.charAt(i); + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & t)); + } + else if (c == '%') + { + regexp.append(".*?"); // Do a non-greedy match + } + else if (c == '_') + { + regexp.append("."); // match one + } + else if (REGEXP_CONTROL_CHARS.contains(new Character(c))) + { + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & c)); + } + else + { + regexp.append(c); + } + } + + regexp.append("\\z"); // The end of the input + + likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL); + } + + /** + * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol() + */ + public String getExpressionSymbol() + { + return "LIKE"; + } + + /** + * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) + */ + public Object evaluate(Message message) throws QpidException + { + + Object rv = this.getRight().evaluate(message); + + if (rv == null) + { + return null; + } + + if (!(rv instanceof String)) + { + return + Boolean.FALSE; + // throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass()); + } + + return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; + } + + public boolean matches(Message message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + public static BooleanExpression createLike(Expression left, String right, String escape) + { + if ((escape != null) && (escape.length() != 1)) + { + throw new RuntimeException( + "The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape); + } + + int c = -1; + if (escape != null) + { + c = 0xFFFF & escape.charAt(0); + } + + return new LikeExpression(left, right, c); + } + + public static BooleanExpression createNotLike(Expression left, String right, String escape) + { + return UnaryExpression.createNOT(createLike(left, right, escape)); + } + + public static BooleanExpression createInFilter(Expression left, List elements) + { + + if (!(left instanceof PropertyExpression)) + { + throw new RuntimeException("Expected a property for In expression, got: " + left); + } + + return UnaryExpression.createInExpression((PropertyExpression) left, elements, false); + + } + + public static BooleanExpression createNotInFilter(Expression left, List elements) + { + + if (!(left instanceof PropertyExpression)) + { + throw new RuntimeException("Expected a property for In expression, got: " + left); + } + + return UnaryExpression.createInExpression((PropertyExpression) left, elements, true); + + } + + public static BooleanExpression createIsNull(Expression left) + { + return doCreateEqual(left, ConstantExpression.NULL); + } + + public static BooleanExpression createIsNotNull(Expression left) + { + return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL)); + } + + public static BooleanExpression createNotEqual(Expression left, Expression right) + { + return UnaryExpression.createNOT(createEqual(left, right)); + } + + public static BooleanExpression createEqual(Expression left, Expression right) + { + checkEqualOperand(left); + checkEqualOperand(right); + checkEqualOperandCompatability(left, right); + + return doCreateEqual(left, right); + } + + private static BooleanExpression doCreateEqual(Expression left, Expression right) + { + return new ComparisonExpression(left, right) + { + + public Object evaluate(Message message) throws QpidException + { + Object lv = left.evaluate(message); + Object rv = right.evaluate(message); + + // Iff one of the values is null + if ((lv == null) ^ (rv == null)) + { + return Boolean.FALSE; + } + + if ((lv == rv) || lv.equals(rv)) + { + return Boolean.TRUE; + } + + if ((lv instanceof Comparable) && (rv instanceof Comparable)) + { + return compare((Comparable) lv, (Comparable) rv); + } + + return Boolean.FALSE; + } + + protected boolean asBoolean(int answer) + { + return answer == 0; + } + + public String getExpressionSymbol() + { + return "="; + } + }; + } + + public static BooleanExpression createGreaterThan(final Expression left, final Expression right) + { + checkLessThanOperand(left); + checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + protected boolean asBoolean(int answer) + { + return answer > 0; + } + + public String getExpressionSymbol() + { + return ">"; + } + }; + } + + public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) + { + checkLessThanOperand(left); + checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + protected boolean asBoolean(int answer) + { + return answer >= 0; + } + + public String getExpressionSymbol() + { + return ">="; + } + }; + } + + public static BooleanExpression createLessThan(final Expression left, final Expression right) + { + checkLessThanOperand(left); + checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + + protected boolean asBoolean(int answer) + { + return answer < 0; + } + + public String getExpressionSymbol() + { + return "<"; + } + + }; + } + + public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) + { + checkLessThanOperand(left); + checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + + protected boolean asBoolean(int answer) + { + return answer <= 0; + } + + public String getExpressionSymbol() + { + return "<="; + } + }; + } + + /** + * Only Numeric expressions can be used in >, >=, < or <= expressions.s + * + * @param expr + */ + public static void checkLessThanOperand(Expression expr) + { + if (expr instanceof ConstantExpression) + { + Object value = ((ConstantExpression) expr).getValue(); + if (value instanceof Number) + { + return; + } + + // Else it's boolean or a String.. + throw new RuntimeException("Value '" + expr + "' cannot be compared."); + } + + if (expr instanceof BooleanExpression) + { + throw new RuntimeException("Value '" + expr + "' cannot be compared."); + } + } + + /** + * Validates that the expression can be used in == or <> expression. + * Cannot not be NULL TRUE or FALSE litterals. + * + * @param expr + */ + public static void checkEqualOperand(Expression expr) + { + if (expr instanceof ConstantExpression) + { + Object value = ((ConstantExpression) expr).getValue(); + if (value == null) + { + throw new RuntimeException("'" + expr + "' cannot be compared."); + } + } + } + + /** + * + * @param left + * @param right + */ + private static void checkEqualOperandCompatability(Expression left, Expression right) + { + if ((left instanceof ConstantExpression) && (right instanceof ConstantExpression)) + { + if ((left instanceof BooleanExpression) && !(right instanceof BooleanExpression)) + { + throw new RuntimeException("'" + left + "' cannot be compared with '" + right + "'"); + } + } + } + + /** + * @param left + * @param right + */ + public ComparisonExpression(Expression left, Expression right) + { + super(left, right); + } + + public Object evaluate(Message message) throws QpidException + { + Comparable lv = (Comparable) left.evaluate(message); + if (lv == null) + { + return null; + } + + Comparable rv = (Comparable) right.evaluate(message); + if (rv == null) + { + return null; + } + + return compare(lv, rv); + } + + protected Boolean compare(Comparable lv, Comparable rv) + { + Class lc = lv.getClass(); + Class rc = rv.getClass(); + // If the the objects are not of the same type, + // try to convert up to allow the comparison. + if (lc != rc) + { + if (lc == Byte.class) + { + if (rc == Short.class) + { + lv = new Short(((Number) lv).shortValue()); + } + else if (rc == Integer.class) + { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) + { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) + { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Short.class) + { + if (rc == Integer.class) + { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) + { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) + { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Integer.class) + { + if (rc == Long.class) + { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) + { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Long.class) + { + if (rc == Integer.class) + { + rv = new Long(((Number) rv).longValue()); + } + else if (rc == Float.class) + { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Float.class) + { + if (rc == Integer.class) + { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Long.class) + { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Double.class) + { + if (rc == Integer.class) + { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Long.class) + { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Float.class) + { + rv = new Float(((Number) rv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else + { + return Boolean.FALSE; + } + } + + return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE; + } + + protected abstract boolean asBoolean(int answer); + + public boolean matches(Message message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java new file mode 100644 index 0000000000..6db57dffed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java @@ -0,0 +1,212 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpidity.QpidException; + +import java.math.BigDecimal; + + +import javax.jms.Message; + +/** + * Represents a constant expression + */ +public class ConstantExpression implements Expression +{ + + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression + { + public BooleanConstantExpression(Object value) + { + super(value); + } + + public boolean matches(Message message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); + public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); + public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); + + private Object value; + + public static ConstantExpression createFromDecimal(String text) + { + + // Strip off the 'l' or 'L' if needed. + if (text.endsWith("l") || text.endsWith("L")) + { + text = text.substring(0, text.length() - 1); + } + + Number value; + try + { + value = new Long(text); + } + catch (NumberFormatException e) + { + // The number may be too big to fit in a long. + value = new BigDecimal(text); + } + + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = new Integer(value.intValue()); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFromHex(String text) + { + Number value = new Long(Long.parseLong(text.substring(2), 16)); + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = new Integer(value.intValue()); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFromOctal(String text) + { + Number value = new Long(Long.parseLong(text, 8)); + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = new Integer(value.intValue()); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFloat(String text) + { + Number value = new Double(text); + + return new ConstantExpression(value); + } + + public ConstantExpression(Object value) + { + this.value = value; + } + + public Object evaluate(Message message) throws QpidException + { + return value; + } + + public Object getValue() + { + return value; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + if (value == null) + { + return "NULL"; + } + + if (value instanceof Boolean) + { + return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE"; + } + + if (value instanceof String) + { + return encodeString((String) value); + } + + return value.toString(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + + return toString().equals(o.toString()); + + } + + /** + * Encodes the value of string so that it looks like it would look like + * when it was provided in a selector. + * + * @param s + * @return + */ + public static String encodeString(String s) + { + StringBuffer b = new StringBuffer(); + b.append('\''); + for (int i = 0; i < s.length(); i++) + { + char c = s.charAt(i); + if (c == '\'') + { + b.append(c); + } + + b.append(c); + } + + b.append('\''); + + return b.toString(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java new file mode 100644 index 0000000000..d016c1a03b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.nclient.jms.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpidity.QpidException; + +import javax.jms.Message; + +/** + * Represents an expression + */ +public interface Expression +{ + /** + * @param message The message to evaluate + * @return the value of this expression + */ + public Object evaluate(Message message) throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java new file mode 100644 index 0000000000..553bbf25c3 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.nclient.jms.filter; + +import org.apache.qpid.nclient.jms.filter.selector.SelectorParser; +import org.apache.qpidity.QpidException; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import javax.jms.Message; + + +public class JMSSelectorFilter implements MessageFilter +{ + /** + * this JMSSelectorFilter's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class); + + private String _selector; + private BooleanExpression _matcher; + + public JMSSelectorFilter(String selector) throws QpidException + { + _selector = selector; + if (_logger.isDebugEnabled()) + { + _logger.debug("Created JMSSelectorFilter with selector:" + _selector); + } + _matcher = new SelectorParser().parse(selector); + } + + public boolean matches(Message message) + { + try + { + boolean match = _matcher.matches(message); + if (_logger.isDebugEnabled()) + { + _logger.debug(message + " match(" + match + ") selector(" + System + .identityHashCode(_selector) + "):" + _selector); + } + return match; + } + catch (QpidException e) + { + _logger.warn("Caght exception when evaluating message selector for message " + message, e); + } + return false; + } + + public String getSelector() + { + return _selector; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java new file mode 100644 index 0000000000..4856fa0555 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.nclient.jms.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpidity.QpidException; + +import javax.jms.Message; + +/** + * A filter performing a comparison of two objects + */ +public abstract class LogicExpression extends BinaryExpression implements BooleanExpression +{ + + public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) + { + return new LogicExpression(lvalue, rvalue) + { + + public Object evaluate(Message message) throws QpidException + { + + Boolean lv = (Boolean) left.evaluate(message); + // Can we do an OR shortcut?? + if ((lv != null) && lv.booleanValue()) + { + return Boolean.TRUE; + } + + Boolean rv = (Boolean) right.evaluate(message); + + return (rv == null) ? null : rv; + } + + public String getExpressionSymbol() + { + return "OR"; + } + }; + } + + public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) + { + return new LogicExpression(lvalue, rvalue) + { + + public Object evaluate(Message message) throws QpidException + { + + Boolean lv = (Boolean) left.evaluate(message); + + // Can we do an AND shortcut?? + if (lv == null) + { + return null; + } + + if (!lv.booleanValue()) + { + return Boolean.FALSE; + } + + Boolean rv = (Boolean) right.evaluate(message); + + return (rv == null) ? null : rv; + } + + public String getExpressionSymbol() + { + return "AND"; + } + }; + } + + /** + * @param left + * @param right + */ + public LogicExpression(BooleanExpression left, BooleanExpression right) + { + super(left, right); + } + + public abstract Object evaluate(Message message) throws QpidException; + + public boolean matches(Message message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java new file mode 100644 index 0000000000..f22ca27013 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.nclient.jms.filter; + +import org.apache.qpidity.QpidException; + +import javax.jms.Message; + +public interface MessageFilter +{ + boolean matches(Message message) throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java new file mode 100644 index 0000000000..24b5868bbc --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java @@ -0,0 +1,93 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms.filter; + +import org.apache.qpidity.QpidException; +import org.slf4j.LoggerFactory; + +import javax.jms.Message; +import java.lang.reflect.Method; + +/** + * Represents a property expression + */ +public class PropertyExpression implements Expression +{ + private static final org.slf4j.Logger _logger = LoggerFactory.getLogger(PropertyExpression.class); + + private Method _getter; + + public PropertyExpression(String name) + { + Class clazz = Message.class; + try + { + _getter = clazz.getMethod("get" + name, null); + } + catch (NoSuchMethodException e) + { + _logger.warn("Cannot compare property: " + name, e); + } + } + + public Object evaluate(Message message) throws QpidException + { + Object result = null; + if( _getter != null ) + { + try + { + result = _getter.invoke(message, null); + } + catch (Exception e) + { + throw new QpidException("cannot evaluate property ", "message selector", e); + } + } + return result; + } + + /** + * @see Object#toString() + */ + public String toString() + { + return _getter.toString(); + } + + /** + * @see Object#hashCode() + */ + public int hashCode() + { + return _getter.hashCode(); + } + + /** + * @see Object#equals(Object) + */ + public boolean equals(Object o) + { + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + return _getter.equals(((PropertyExpression) o)._getter); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/UnaryExpression.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/UnaryExpression.java new file mode 100644 index 0000000000..df70a27a52 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/UnaryExpression.java @@ -0,0 +1,328 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpidity.QpidException; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import javax.jms.Message; + +/** + * An expression which performs an operation on two expression values + */ +public abstract class UnaryExpression implements Expression +{ + + private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE); + protected Expression right; + + public static Expression createNegate(Expression left) + { + return new UnaryExpression(left) + { + public Object evaluate(Message message) throws QpidException + { + Object rvalue = right.evaluate(message); + if (rvalue == null) + { + return null; + } + + if (rvalue instanceof Number) + { + return negate((Number) rvalue); + } + + return null; + } + + public String getExpressionSymbol() + { + return "-"; + } + }; + } + + public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) + { + + // Use a HashSet if there are many elements. + Collection t; + if (elements.size() == 0) + { + t = null; + } + else if (elements.size() < 5) + { + t = elements; + } + else + { + t = new HashSet(elements); + } + + final Collection inList = t; + + return new BooleanUnaryExpression(right) + { + public Object evaluate(Message message) throws QpidException + { + + Object rvalue = right.evaluate(message); + if (rvalue == null) + { + return null; + } + + if (rvalue.getClass() != String.class) + { + return null; + } + + if (((inList != null) && inList.contains(rvalue)) ^ not) + { + return Boolean.TRUE; + } + else + { + return Boolean.FALSE; + } + + } + + public String toString() + { + StringBuffer answer = new StringBuffer(); + answer.append(right); + answer.append(" "); + answer.append(getExpressionSymbol()); + answer.append(" ( "); + + int count = 0; + for (Iterator i = inList.iterator(); i.hasNext();) + { + Object o = (Object) i.next(); + if (count != 0) + { + answer.append(", "); + } + + answer.append(o); + count++; + } + + answer.append(" )"); + + return answer.toString(); + } + + public String getExpressionSymbol() + { + if (not) + { + return "NOT IN"; + } + else + { + return "IN"; + } + } + }; + } + + abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression + { + public BooleanUnaryExpression(Expression left) + { + super(left); + } + + public boolean matches(Message message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + ; + + public static BooleanExpression createNOT(BooleanExpression left) + { + return new BooleanUnaryExpression(left) + { + public Object evaluate(Message message) throws QpidException + { + Boolean lvalue = (Boolean) right.evaluate(message); + if (lvalue == null) + { + return null; + } + + return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE; + } + + public String getExpressionSymbol() + { + return "NOT"; + } + }; + } + public static BooleanExpression createBooleanCast(Expression left) + { + return new BooleanUnaryExpression(left) + { + public Object evaluate(Message message) throws QpidException + { + Object rvalue = right.evaluate(message); + if (rvalue == null) + { + return null; + } + + if (!rvalue.getClass().equals(Boolean.class)) + { + return Boolean.FALSE; + } + + return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE; + } + + public String toString() + { + return right.toString(); + } + + public String getExpressionSymbol() + { + return ""; + } + }; + } + + private static Number negate(Number left) + { + Class clazz = left.getClass(); + if (clazz == Integer.class) + { + return new Integer(-left.intValue()); + } + else if (clazz == Long.class) + { + return new Long(-left.longValue()); + } + else if (clazz == Float.class) + { + return new Float(-left.floatValue()); + } + else if (clazz == Double.class) + { + return new Double(-left.doubleValue()); + } + else if (clazz == BigDecimal.class) + { + // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the + // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it + // as a Big decimal. But it gets Negated right away.. to here we try to covert it back + // to a Long. + BigDecimal bd = (BigDecimal) left; + bd = bd.negate(); + + if (BD_LONG_MIN_VALUE.compareTo(bd) == 0) + { + return new Long(Long.MIN_VALUE); + } + + return bd; + } + else + { + throw new RuntimeException("Don't know how to negate: " + left); + } + } + + public UnaryExpression(Expression left) + { + this.right = left; + } + + public Expression getRight() + { + return right; + } + + public void setRight(Expression expression) + { + right = expression; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return "(" + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + public abstract String getExpressionSymbol(); + +} |
