From 0f9446a6014f1ebeb97e52fe2eb9933f0f15f3ab Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Tue, 18 Sep 2007 18:33:39 +0000 Subject: added message selector evaluation (for 0_10 only) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@577011 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 2 +- .../org/apache/qpid/client/AMQSession_0_10.java | 16 +- .../org/apache/qpid/client/AMQSession_0_8.java | 2 +- .../qpid/client/BasicMessageConsumer_0_10.java | 233 ++++++++++++++++++--- 4 files changed, 223 insertions(+), 30 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9003ec016c..f8025f9e86 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1552,7 +1552,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose); + final boolean noConsume, final boolean autoClose) throws JMSException; /** * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 995f84bab9..34b63170c4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -79,6 +79,7 @@ public class AMQSession_0_10 extends AMQSession * @param messageFactoryRegistry The message factory factory for the session. * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. + * @param qpidConnection The qpid connection */ AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, @@ -107,6 +108,7 @@ public class AMQSession_0_10 extends AMQSession * @param acknowledgeMode The acknoledgement mode for the session. * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + * @param qpidConnection The connection */ AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) @@ -276,7 +278,7 @@ public class AMQSession_0_10 extends AMQSession final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft, final boolean noConsume, - final boolean autoClose) + final boolean autoClose) throws JMSException { final AMQProtocolHandler protocolHandler = getProtocolHandler(); @@ -304,8 +306,18 @@ public class AMQSession_0_10 extends AMQSession boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException { + boolean preAcquire; + try + { + preAcquire = consumer.getMessageSelector() == null || !(consumer.getDestination() instanceof AMQQueue); + } + catch (JMSException e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); + } getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), Session.TRANSFER_CONFIRM_MODE_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, + preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : + Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 180a1e663c..8003f7b801 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -327,7 +327,7 @@ public class AMQSession_0_8 extends AMQSession public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft, - final boolean noConsume, final boolean autoClose) + final boolean noConsume, final boolean autoClose) throws JMSException { final AMQProtocolHandler protocolHandler = getProtocolHandler(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 7876cc8e49..30040569bb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -27,10 +27,15 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpidity.api.Message; import org.apache.qpidity.transport.Struct; import org.apache.qpidity.transport.ExchangeQueryResult; import org.apache.qpidity.transport.Future; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.filter.MessageFilter; +import org.apache.qpidity.filter.JMSSelectorFilter; import javax.jms.JMSException; import java.io.IOException; @@ -47,51 +52,105 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer future = ((AMQSession_0_10) getSession()).getQpidSession() - .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName()); - ExchangeQueryResult res = future.get(); - // :///[]/[]?