diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-19 15:03:28 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-19 15:03:28 +0000 |
| commit | 79d0c1361620c718574368c8e028073232228dfc (patch) | |
| tree | bc3523e7385e8864c33a8a16dcca7087d9f3fd34 /java | |
| parent | dfb0a9b16bbe71b8530d2228f575b67b6568cf41 (diff) | |
| download | qpid-python-79d0c1361620c718574368c8e028073232228dfc.tar.gz | |
updated for using jms message for filtering incoming messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577326 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 96 |
1 files changed, 54 insertions, 42 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index a62095cdb2..8d320cd78f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -96,12 +96,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } // ----- Interface org.apache.qpidity.client.util.MessageListener - public void onMessage(Message message) + + /** + * @param jmsMessage this message has already been processed so can't redo preDeliver + * @param channelId + */ + public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) { boolean messageOk = false; try { - messageOk = checkPreConditions(message); + messageOk = checkPreConditions(jmsMessage); } catch (AMQException e) { @@ -112,44 +117,51 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } catch (JMSException e1) { - // we should silently log thie exception as it only hanppens when the connection is closed - _logger.error("Exception when receiving message", e1); + // we should silently log thie exception as it only hanppens when the connection is closed + _logger.error("Exception when receiving message", e1); } } if (messageOk) { - int channelId = getSession().getChannelId(); - long deliveryId = message.getMessageTransferId(); - String consumerTag = getConsumerTag().toString(); - AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange()); - AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey()); - boolean redelivered = message.getDeliveryProperties().getRedelivered(); - UnprocessedMessage_0_10 newMessage = - new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered); - try - { - newMessage.receiveBody(message.readData()); - } - catch (IOException e) - { - getSession().getAMQConnection().exceptionReceived(e); - } - Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; - // if there is a replyto destination then we need to request the exchange info - if (!message.getMessageProperties().getReplyTo().getExchangeName().equals("")) - { - Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession() - .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName()); - ExchangeQueryResult res = future.get(); - // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo() - .getExchangeName() + "/" + message.getMessageProperties().getReplyTo() - .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey(); - newMessage.setReplyToURL(replyToUrl); - } - newMessage.setContentHeader(headers); - getSession().messageReceived(newMessage); + super.notifyMessage(jmsMessage, channelId); + } + } + + + public void onMessage(Message message) + { + boolean messageOk = false; + int channelId = getSession().getChannelId(); + long deliveryId = message.getMessageTransferId(); + String consumerTag = getConsumerTag().toString(); + AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange()); + AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey()); + boolean redelivered = message.getDeliveryProperties().getRedelivered(); + UnprocessedMessage_0_10 newMessage = + new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered); + try + { + newMessage.receiveBody(message.readData()); + } + catch (IOException e) + { + getSession().getAMQConnection().exceptionReceived(e); + } + Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; + // if there is a replyto destination then we need to request the exchange info + if (!message.getMessageProperties().getReplyTo().getExchangeName().equals("")) + { + Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession() + .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName()); + ExchangeQueryResult res = future.get(); + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo() + .getExchangeName() + "/" + message.getMessageProperties().getReplyTo() + .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey(); + newMessage.setReplyToURL(replyToUrl); } + newMessage.setContentHeader(headers); + getSession().messageReceived(newMessage); // else ignore this message } @@ -201,7 +213,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By * @return true if the message matches the selector and can be acquired, false otherwise. * @throws AMQException If the message preConditions cannot be checked due to some internal error. */ - private boolean checkPreConditions(Message message) throws AMQException + private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException { boolean messageOk = true; // TODO Use a tag for fiding out if message filtering is done here or by the broker. @@ -258,12 +270,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By * @param message The message to be acknowledged * @throws AMQException If the message cannot be acquired due to some internal error. */ - private void acknowledgeMessage(Message message) throws AMQException + private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException { if (!_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getMessageTransferId()); + ranges.add(message.getDeliveryTag()); _0_10session.getQpidSession().messageAcknowledge(ranges); _0_10session.getCurrentException(); } @@ -275,12 +287,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By * @param message The message to be released * @throws AMQException If the message cannot be released due to some internal error. */ - private void releaseMessage(Message message) throws AMQException + private void releaseMessage(AbstractJMSMessage message) throws AMQException { if (_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getMessageTransferId()); + ranges.add(message.getDeliveryTag()); _0_10session.getQpidSession().messageRelease(ranges); _0_10session.getCurrentException(); } @@ -293,13 +305,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By * @return true if the message has been acquired, false otherwise. * @throws AMQException If the message cannot be acquired due to some internal error. */ - private boolean acquireMessage(Message message) throws AMQException + private boolean acquireMessage(AbstractJMSMessage message) throws AMQException { boolean result = false; if (!_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getMessageTransferId()); + ranges.add(message.getDeliveryTag()); _0_10session.getQpidSession() .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE); |
