diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-08-20 13:48:00 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-08-20 13:48:00 +0000 |
| commit | 5dd92ba051ee9c7c794a0545dc43316720b66b71 (patch) | |
| tree | 2b80628b4bdf8327c46f50d9ae9ad1268d9e47e6 /java/client/src/main | |
| parent | 377a0c06c172d8f4df10a01b604bb67bb0d5e80d (diff) | |
| download | qpid-python-5dd92ba051ee9c7c794a0545dc43316720b66b71.tar.gz | |
This is a fix for QPID-2809
This is also a workaround for QPID-2808
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@987505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
3 files changed, 150 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 0dc702dcbc..c6bc1bd622 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -62,6 +62,8 @@ public abstract class AMQDestination implements Destination, Referenceable protected boolean _isAutoDelete; private boolean _browseOnly; + + private boolean _isAddressResolved; private AMQShortString _queueName; @@ -312,6 +314,11 @@ public abstract class AMQDestination implements Destination, Referenceable return _destSyntax; } + protected void setDestSyntax(DestSyntax syntax) + { + _destSyntax = syntax; + } + public AMQShortString getEncodedName() { if(_urlAsShortString == null) @@ -736,6 +743,10 @@ public abstract class AMQDestination implements Destination, Referenceable return _address; } + protected void setAddress(Address addr) { + _address = addr; + } + public int getAddressType(){ return _addressType; } @@ -747,6 +758,10 @@ public abstract class AMQDestination implements Destination, Referenceable public String getAddressName() { return _name; } + + public void setAddressName(String name){ + _name = name; + } public String getSubject() { return _subject; @@ -763,15 +778,23 @@ public abstract class AMQDestination implements Destination, Referenceable public void setCreate(AddressOption option) { _create = option; } - + public AddressOption getAssert() { return _assert; } + public void setAssert(AddressOption option) { + _assert = option; + } + public AddressOption getDelete() { return _delete; } - + + public void setDelete(AddressOption option) { + _delete = option; + } + public Node getTargetNode() { return _targetNode; @@ -817,6 +840,16 @@ public abstract class AMQDestination implements Destination, Referenceable this._routingKey = rk; } + public boolean isAddressResolved() + { + return _isAddressResolved; + } + + public void setAddressResolved(boolean addressResolved) + { + _isAddressResolved = addressResolved; + } + private static Address createAddressFromString(String str) { return Address.parse(str); @@ -861,4 +894,38 @@ public abstract class AMQDestination implements Destination, Referenceable { return _browseOnly; } + + public void setBrowseOnly(boolean b) + { + _browseOnly = b; + } + + public AMQDestination copyDestination() + { + AMQDestination dest = + new AMQAnyDestination(_exchangeName, + _exchangeClass, + _routingKey, + _isExclusive, + _isAutoDelete, + _queueName, + _isDurable, + _bindingKeys + ); + + dest.setDestSyntax(_destSyntax); + dest.setAddress(_address); + dest.setAddressName(_name); + dest.setSubject(_subject); + dest.setCreate(_create); + dest.setAssert(_assert); + dest.setDelete(_create); + dest.setBrowseOnly(_browseOnly); + dest.setAddressType(_addressType); + dest.setTargetNode(_targetNode); + dest.setSourceNode(_sourceNode); + dest.setLink(_link); + dest.setAddressResolved(_isAddressResolved); + return dest; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 430a4bd9e9..0563276457 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -592,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); } - + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, @@ -1168,62 +1168,80 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic boolean isConsumer, boolean noWait) throws AMQException { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || - (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || - (!isConsumer && dest.getAssert() == AddressOption.SENDER); - - boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || - (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || - (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - - int type = resolveAddressType(dest); - - switch (type) - { - case AMQDestination.QUEUE_TYPE: + if (dest.isAddressResolved()) + { + if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) { - if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) - { - setLegacyFiledsForQueueType(dest); - break; - } - else if(createNode) - { - setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,false,noWait); - break; - } + createSubscriptionQueue(dest); } + } + else + { + boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || + (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || + (!isConsumer && dest.getAssert() == AddressOption.SENDER); + + boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || + (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || + (!isConsumer && dest.getCreate() == AddressOption.SENDER); + - case AMQDestination.TOPIC_TYPE: + + int type = resolveAddressType(dest); + + switch (type) { - if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - if (isConsumer) {createSubscriptionQueue(dest);} - break; + case AMQDestination.QUEUE_TYPE: + { + if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) + { + setLegacyFiledsForQueueType(dest); + break; + } + else if(createNode) + { + setLegacyFiledsForQueueType(dest); + send0_10QueueDeclare(dest,null,false,noWait); + break; + } } - else if(createNode) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - sendExchangeDeclare(dest.getAddressName(), - dest.getExchangeClass().asString(), - dest.getTargetNode().getAlternateExchange(), - dest.getTargetNode().getDeclareArgs(), - false); - if (isConsumer) {createSubscriptionQueue(dest);} - break; + + case AMQDestination.TOPIC_TYPE: + { + if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) + { + setLegacyFiledsForTopicType(dest); + verifySubject(dest); + if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) + { + createSubscriptionQueue(dest); + } + break; + } + else if(createNode) + { + setLegacyFiledsForTopicType(dest); + verifySubject(dest); + sendExchangeDeclare(dest.getAddressName(), + dest.getExchangeClass().asString(), + dest.getTargetNode().getAlternateExchange(), + dest.getTargetNode().getDeclareArgs(), + false); + if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) + { + createSubscriptionQueue(dest); + } + break; + } } + + default: + throw new AMQException( + "The name '" + dest.getAddressName() + + "' supplied in the address doesn't resolve to an exchange or a queue"); } - default: - throw new AMQException( - "The name '" + dest.getAddressName() + - "' supplied in the address doesn't resolve to an exchange or a queue"); + dest.setAddressResolved(true); } } @@ -1251,6 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic //both a queue and exchange exist for that name throw new AMQException("Ambiguous address, please specify queue or topic as node type"); } + dest.setAddressType(type); dest.rebuildTargetAndSourceNodes(type); return type; } @@ -1277,7 +1296,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private void createSubscriptionQueue(AMQDestination dest) throws AMQException { QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null - if (dest.getQueueName() == null || !isQueueExist(dest,node,true)) + + if (dest.getQueueName() == null) { if (dest.getLink() != null && dest.getLink().getName() != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index eddaa1a6bb..66766369d7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; @@ -115,6 +116,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM capacity = _0_10session.getAMQConnection().getMaxPrefetch(); } + if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) + { + + if (destination.getLink() == null || destination.getLink().getName() == null) + { + _destination = destination.copyDestination(); + _destination.setQueueName(null); + } + } + } |
