summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-10-15 10:59:02 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-10-15 10:59:02 +0000
commite87fb503c248bc9069f21052221106c744f2865d (patch)
tree94fd5686d11a007ec14f3d6ba3f337b9d57b8f0c
parent143e0b952e77e5a2d487ffd7cad0fc82bdeeb45c (diff)
downloadqpid-python-e87fb503c248bc9069f21052221106c744f2865d.tar.gz
Changed handling of replyTo
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@584728 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java23
1 files changed, 18 insertions, 5 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 08eba25d16..c273205b2d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -147,17 +147,15 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
getSession().getAMQConnection().exceptionReceived(e);
}
Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- // if there is a replyto destination then we need to request the exchange info
+ // if there is a replyto destination then we need to request the exchange info
ReplyTo replyTo = message.getMessageProperties().getReplyTo();
if (replyTo != null &&
replyTo.getExchangeName() != null &&
!replyTo.getExchangeName().equals(""))
{
- Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
- .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
- ExchangeQueryResult res = future.get();
// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+ // the exchnage class will be set later from within the sesion thread
+ String replyToUrl = message.getMessageProperties().getReplyTo()
.getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
.getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
newMessage.setReplyToURL(replyToUrl);
@@ -199,6 +197,21 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super.postDeliver(msg);
}
+ void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ {
+ // if there is a replyto destination then we need to request the exchange info
+ String replyToURL = messageFrame.getReplyToURL() ;
+ if (replyToURL != null && ! replyToURL.equals(""))
+ {
+ String exchangeName = replyToURL.substring(0, replyToURL.indexOf('/'));
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(exchangeName);
+ ExchangeQueryResult res = future.get();
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" + replyToURL;
+ ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
+ }
+ super.notifyMessage(messageFrame, channelId);
+ }
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception