summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java233
4 files changed, 223 insertions, 30 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 9003ec016c..f8025f9e86 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1552,7 +1552,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
- final boolean noConsume, final boolean autoClose);
+ final boolean noConsume, final boolean autoClose) throws JMSException;
/**
* Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 995f84bab9..34b63170c4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -79,6 +79,7 @@ public class AMQSession_0_10 extends AMQSession
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
+ * @param qpidConnection The qpid connection
*/
AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
@@ -107,6 +108,7 @@ public class AMQSession_0_10 extends AMQSession
* @param acknowledgeMode The acknoledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ * @param qpidConnection The connection
*/
AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
@@ -276,7 +278,7 @@ public class AMQSession_0_10 extends AMQSession
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
final FieldTable ft, final boolean noConsume,
- final boolean autoClose)
+ final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
@@ -304,8 +306,18 @@ public class AMQSession_0_10 extends AMQSession
boolean nowait, String messageSelector, AMQShortString tag)
throws AMQException, FailoverException
{
+ boolean preAcquire;
+ try
+ {
+ preAcquire = consumer.getMessageSelector() == null || !(consumer.getDestination() instanceof AMQQueue);
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
+ }
getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), Session.TRANSFER_CONFIRM_MODE_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE :
+ Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 180a1e663c..8003f7b801 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -327,7 +327,7 @@ public class AMQSession_0_8 extends AMQSession
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
- final boolean noConsume, final boolean autoClose)
+ final boolean noConsume, final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 7876cc8e49..30040569bb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -27,10 +27,15 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.transport.ExchangeQueryResult;
import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.filter.JMSSelectorFilter;
import javax.jms.JMSException;
import java.io.IOException;
@@ -47,51 +52,105 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ /**
+ * The message selector filter associated with this consumer message selector
+ */
+ private MessageFilter _filter = null;
+
+ /**
+ * The underlying QpidSession
+ */
+ private AMQSession_0_10 _0_10session;
+
+ /**
+ * Indicates whether this consumer receives pre-acquired messages
+ */
+ private boolean _preAcquire = true;
+
+ //--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
-
+ _0_10session = (AMQSession_0_10) session;
+ if (messageSelector != null)
+ {
+ try
+ {
+ _filter = new JMSSelectorFilter(messageSelector);
+ }
+ catch (QpidException e)
+ {
+ throw new JMSException("cannot create consumer because of selector issue");
+ }
+ if (destination instanceof AMQQueue)
+ {
+ _preAcquire = false;
+ }
+ }
}
// ----- Interface org.apache.qpidity.client.util.MessageListener
public void onMessage(Message message)
{
- int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- String consumerTag = getConsumerTag().toString();
- AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
- AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
- boolean redelivered = message.getDeliveryProperties().getRedelivered();
- UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+ boolean messageOk = false;
try
{
- newMessage.receiveBody(message.readData());
+ messageOk = checkPreConditions(message);
}
- catch (IOException e)
+ catch (AMQException e)
{
- getSession().getAMQConnection().exceptionReceived(e);
+ try
+ {
+ getSession().getAMQConnection().getExceptionListener()
+ .onException(new JMSAMQException("Error when receiving message", e));
+ }
+ catch (JMSException e1)
+ {
+ // we should silently log thie exception as it only hanppens when the connection is closed
+ _logger.error("Exception when receiving message", e1);
+ }
}
- Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- // if there is a replyto destination then we need to request the exchange info
- if (! message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+ if (messageOk)
{
- Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
- .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
- .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
- .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
+ int channelId = getSession().getChannelId();
+ long deliveryId = message.getMessageTransferId();
+ String consumerTag = getConsumerTag().toString();
+ AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+ AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+ boolean redelivered = message.getDeliveryProperties().getRedelivered();
+ UnprocessedMessage_0_10 newMessage =
+ new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+ try
+ {
+ newMessage.receiveBody(message.readData());
+ }
+ catch (IOException e)
+ {
+ getSession().getAMQConnection().exceptionReceived(e);
+ }
+ Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+ // if there is a replyto destination then we need to request the exchange info
+ if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+ {
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+ .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+ ExchangeQueryResult res = future.get();
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+ .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+ .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+ newMessage.setReplyToURL(replyToUrl);
+ }
+ newMessage.setContentHeader(headers);
+ getSession().messageReceived(newMessage);
}
- newMessage.setContentHeader(headers);
- getSession().messageReceived(newMessage);
+ // else ignore this message
}
//----- overwritten methods
@@ -130,6 +189,128 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
messageFrame.getExchange(), messageFrame.getRoutingKey(),
- messageFrame.getContentHeader(), messageFrame.getBodies(), messageFrame.getReplyToURL());
+ messageFrame.getContentHeader(), messageFrame.getBodies(),
+ messageFrame.getReplyToURL());
+ }
+
+ // private methods
+ /**
+ * Check whether a message can be delivered to this consumer.
+ *
+ * @param message The message to be checked.
+ * @return true if the message matches the selector and can be acquired, false otherwise.
+ * @throws AMQException If the message preConditions cannot be checked due to some internal error.
+ */
+ private boolean checkPreConditions(Message message) throws AMQException
+ {
+ boolean messageOk = true;
+ // TODO Use a tag for fiding out if message filtering is done here or by the broker.
+ try
+ {
+ if (getMessageSelector() != null)
+ {
+ messageOk = _filter.matches((javax.jms.Message) message);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
+ }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("messageOk " + messageOk);
+ _logger.debug("_preAcquire " + _preAcquire);
+ }
+ if (!messageOk && _preAcquire)
+ {
+ // this is the case for topics
+ // We need to ack this message
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to ack message");
+ }
+ acknowledgeMessage(message);
+ }
+ else if (!messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message not OK, releasing");
+ }
+ releaseMessage(message);
+ }
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
+ if (!_preAcquire && messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to acquire message");
+ }
+ messageOk = acquireMessage(message);
+ }
+ return messageOk;
+ }
+
+ /**
+ * Acknowledge a message
+ *
+ * @param message The message to be acknowledged
+ * @throws AMQException If the message cannot be acquired due to some internal error.
+ */
+ private void acknowledgeMessage(Message message) throws AMQException
+ {
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+ _0_10session.getQpidSession().messageAcknowledge(ranges);
+ _0_10session.getCurrentException();
+ }
+ }
+
+ /**
+ * Release a message
+ *
+ * @param message The message to be released
+ * @throws AMQException If the message cannot be released due to some internal error.
+ */
+ private void releaseMessage(Message message) throws AMQException
+ {
+ if (_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+ _0_10session.getQpidSession().messageRelease(ranges);
+ _0_10session.getCurrentException();
+ }
+ }
+
+ /**
+ * Acquire a message
+ *
+ * @param message The message to be acquired
+ * @return true if the message has been acquired, false otherwise.
+ * @throws AMQException If the message cannot be acquired due to some internal error.
+ */
+ private boolean acquireMessage(Message message) throws AMQException
+ {
+ boolean result = false;
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+
+ _0_10session.getQpidSession()
+ .messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ _0_10session.getQpidSession().sync();
+ RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages();
+ if (acquired.size() > 0)
+ {
+ result = true;
+ }
+ _0_10session.getCurrentException();
+ }
+ return result;
}
} \ No newline at end of file