diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-15 10:59:02 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-15 10:59:02 +0000 |
| commit | 404e81857662ac1775b79d14d07b80dc9740a41f (patch) | |
| tree | 48f36cf22df8401f12a487f246f28d86740a6b3f | |
| parent | b0f78fc025d36d39863049e9c05a6c9926c8eaab (diff) | |
| download | qpid-python-404e81857662ac1775b79d14d07b80dc9740a41f.tar.gz | |
Changed handling of replyTo
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@584728 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 08eba25d16..c273205b2d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -147,17 +147,15 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By getSession().getAMQConnection().exceptionReceived(e); } Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; - // if there is a replyto destination then we need to request the exchange info + // if there is a replyto destination then we need to request the exchange info ReplyTo replyTo = message.getMessageProperties().getReplyTo(); if (replyTo != null && replyTo.getExchangeName() != null && !replyTo.getExchangeName().equals("")) { - Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession() - .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName()); - ExchangeQueryResult res = future.get(); // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo() + // the exchnage class will be set later from within the sesion thread + String replyToUrl = message.getMessageProperties().getReplyTo() .getExchangeName() + "/" + message.getMessageProperties().getReplyTo() .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey(); newMessage.setReplyToURL(replyToUrl); @@ -199,6 +197,21 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super.postDeliver(msg); } + void notifyMessage(UnprocessedMessage messageFrame, int channelId) + { + // if there is a replyto destination then we need to request the exchange info + String replyToURL = messageFrame.getReplyToURL() ; + if (replyToURL != null && ! replyToURL.equals("")) + { + String exchangeName = replyToURL.substring(0, replyToURL.indexOf('/')); + Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(exchangeName); + ExchangeQueryResult res = future.get(); + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + String replyToUrl = res.getType() + "://" + replyToURL; + ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl); + } + super.notifyMessage(messageFrame, channelId); + } public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception |
