diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-15 10:59:02 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-15 10:59:02 +0000 |
| commit | e87fb503c248bc9069f21052221106c744f2865d (patch) | |
| tree | 94fd5686d11a007ec14f3d6ba3f337b9d57b8f0c | |
| parent | 143e0b952e77e5a2d487ffd7cad0fc82bdeeb45c (diff) | |
| download | qpid-python-e87fb503c248bc9069f21052221106c744f2865d.tar.gz | |
Changed handling of replyTo
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@584728 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 08eba25d16..c273205b2d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -147,17 +147,15 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By getSession().getAMQConnection().exceptionReceived(e); } Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; - // if there is a replyto destination then we need to request the exchange info + // if there is a replyto destination then we need to request the exchange info ReplyTo replyTo = message.getMessageProperties().getReplyTo(); if (replyTo != null && replyTo.getExchangeName() != null && !replyTo.getExchangeName().equals("")) { - Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession() - .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName()); - ExchangeQueryResult res = future.get(); // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo() + // the exchnage class will be set later from within the sesion thread + String replyToUrl = message.getMessageProperties().getReplyTo() .getExchangeName() + "/" + message.getMessageProperties().getReplyTo() .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey(); newMessage.setReplyToURL(replyToUrl); @@ -199,6 +197,21 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super.postDeliver(msg); } + void notifyMessage(UnprocessedMessage messageFrame, int channelId) + { + // if there is a replyto destination then we need to request the exchange info + String replyToURL = messageFrame.getReplyToURL() ; + if (replyToURL != null && ! replyToURL.equals("")) + { + String exchangeName = replyToURL.substring(0, replyToURL.indexOf('/')); + Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(exchangeName); + ExchangeQueryResult res = future.get(); + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + String replyToUrl = res.getType() + "://" + replyToURL; + ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl); + } + super.notifyMessage(messageFrame, channelId); + } public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception |
