summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-09-19 15:03:28 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-09-19 15:03:28 +0000
commit79d0c1361620c718574368c8e028073232228dfc (patch)
treebc3523e7385e8864c33a8a16dcca7087d9f3fd34 /java
parentdfb0a9b16bbe71b8530d2228f575b67b6568cf41 (diff)
downloadqpid-python-79d0c1361620c718574368c8e028073232228dfc.tar.gz
updated for using jms message for filtering incoming messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577326 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java96
1 files changed, 54 insertions, 42 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index a62095cdb2..8d320cd78f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -96,12 +96,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
// ----- Interface org.apache.qpidity.client.util.MessageListener
- public void onMessage(Message message)
+
+ /**
+ * @param jmsMessage this message has already been processed so can't redo preDeliver
+ * @param channelId
+ */
+ public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
{
boolean messageOk = false;
try
{
- messageOk = checkPreConditions(message);
+ messageOk = checkPreConditions(jmsMessage);
}
catch (AMQException e)
{
@@ -112,44 +117,51 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
catch (JMSException e1)
{
- // we should silently log thie exception as it only hanppens when the connection is closed
- _logger.error("Exception when receiving message", e1);
+ // we should silently log thie exception as it only hanppens when the connection is closed
+ _logger.error("Exception when receiving message", e1);
}
}
if (messageOk)
{
- int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- String consumerTag = getConsumerTag().toString();
- AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
- AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
- boolean redelivered = message.getDeliveryProperties().getRedelivered();
- UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
- try
- {
- newMessage.receiveBody(message.readData());
- }
- catch (IOException e)
- {
- getSession().getAMQConnection().exceptionReceived(e);
- }
- Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- // if there is a replyto destination then we need to request the exchange info
- if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
- {
- Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
- .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
- .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
- .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
- }
- newMessage.setContentHeader(headers);
- getSession().messageReceived(newMessage);
+ super.notifyMessage(jmsMessage, channelId);
+ }
+ }
+
+
+ public void onMessage(Message message)
+ {
+ boolean messageOk = false;
+ int channelId = getSession().getChannelId();
+ long deliveryId = message.getMessageTransferId();
+ String consumerTag = getConsumerTag().toString();
+ AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+ AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+ boolean redelivered = message.getDeliveryProperties().getRedelivered();
+ UnprocessedMessage_0_10 newMessage =
+ new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+ try
+ {
+ newMessage.receiveBody(message.readData());
+ }
+ catch (IOException e)
+ {
+ getSession().getAMQConnection().exceptionReceived(e);
+ }
+ Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+ // if there is a replyto destination then we need to request the exchange info
+ if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+ {
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+ .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+ ExchangeQueryResult res = future.get();
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+ .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+ .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+ newMessage.setReplyToURL(replyToUrl);
}
+ newMessage.setContentHeader(headers);
+ getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -201,7 +213,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* @return true if the message matches the selector and can be acquired, false otherwise.
* @throws AMQException If the message preConditions cannot be checked due to some internal error.
*/
- private boolean checkPreConditions(Message message) throws AMQException
+ private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
{
boolean messageOk = true;
// TODO Use a tag for fiding out if message filtering is done here or by the broker.
@@ -258,12 +270,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(Message message) throws AMQException
+ private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
{
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
+ ranges.add(message.getDeliveryTag());
_0_10session.getQpidSession().messageAcknowledge(ranges);
_0_10session.getCurrentException();
}
@@ -275,12 +287,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* @param message The message to be released
* @throws AMQException If the message cannot be released due to some internal error.
*/
- private void releaseMessage(Message message) throws AMQException
+ private void releaseMessage(AbstractJMSMessage message) throws AMQException
{
if (_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
+ ranges.add(message.getDeliveryTag());
_0_10session.getQpidSession().messageRelease(ranges);
_0_10session.getCurrentException();
}
@@ -293,13 +305,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(Message message) throws AMQException
+ private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
{
boolean result = false;
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
+ ranges.add(message.getDeliveryTag());
_0_10session.getQpidSession()
.messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);