summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-08-20 13:48:00 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-08-20 13:48:00 +0000
commit5dd92ba051ee9c7c794a0545dc43316720b66b71 (patch)
tree2b80628b4bdf8327c46f50d9ae9ad1268d9e47e6 /java/client/src
parent377a0c06c172d8f4df10a01b604bb67bb0d5e80d (diff)
downloadqpid-python-5dd92ba051ee9c7c794a0545dc43316720b66b71.tar.gz
This is a fix for QPID-2809
This is also a workaround for QPID-2808 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@987505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java71
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java120
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java11
3 files changed, 150 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 0dc702dcbc..c6bc1bd622 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -62,6 +62,8 @@ public abstract class AMQDestination implements Destination, Referenceable
protected boolean _isAutoDelete;
private boolean _browseOnly;
+
+ private boolean _isAddressResolved;
private AMQShortString _queueName;
@@ -312,6 +314,11 @@ public abstract class AMQDestination implements Destination, Referenceable
return _destSyntax;
}
+ protected void setDestSyntax(DestSyntax syntax)
+ {
+ _destSyntax = syntax;
+ }
+
public AMQShortString getEncodedName()
{
if(_urlAsShortString == null)
@@ -736,6 +743,10 @@ public abstract class AMQDestination implements Destination, Referenceable
return _address;
}
+ protected void setAddress(Address addr) {
+ _address = addr;
+ }
+
public int getAddressType(){
return _addressType;
}
@@ -747,6 +758,10 @@ public abstract class AMQDestination implements Destination, Referenceable
public String getAddressName() {
return _name;
}
+
+ public void setAddressName(String name){
+ _name = name;
+ }
public String getSubject() {
return _subject;
@@ -763,15 +778,23 @@ public abstract class AMQDestination implements Destination, Referenceable
public void setCreate(AddressOption option) {
_create = option;
}
-
+
public AddressOption getAssert() {
return _assert;
}
+ public void setAssert(AddressOption option) {
+ _assert = option;
+ }
+
public AddressOption getDelete() {
return _delete;
}
-
+
+ public void setDelete(AddressOption option) {
+ _delete = option;
+ }
+
public Node getTargetNode()
{
return _targetNode;
@@ -817,6 +840,16 @@ public abstract class AMQDestination implements Destination, Referenceable
this._routingKey = rk;
}
+ public boolean isAddressResolved()
+ {
+ return _isAddressResolved;
+ }
+
+ public void setAddressResolved(boolean addressResolved)
+ {
+ _isAddressResolved = addressResolved;
+ }
+
private static Address createAddressFromString(String str)
{
return Address.parse(str);
@@ -861,4 +894,38 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return _browseOnly;
}
+
+ public void setBrowseOnly(boolean b)
+ {
+ _browseOnly = b;
+ }
+
+ public AMQDestination copyDestination()
+ {
+ AMQDestination dest =
+ new AMQAnyDestination(_exchangeName,
+ _exchangeClass,
+ _routingKey,
+ _isExclusive,
+ _isAutoDelete,
+ _queueName,
+ _isDurable,
+ _bindingKeys
+ );
+
+ dest.setDestSyntax(_destSyntax);
+ dest.setAddress(_address);
+ dest.setAddressName(_name);
+ dest.setSubject(_subject);
+ dest.setCreate(_create);
+ dest.setAssert(_assert);
+ dest.setDelete(_create);
+ dest.setBrowseOnly(_browseOnly);
+ dest.setAddressType(_addressType);
+ dest.setTargetNode(_targetNode);
+ dest.setSourceNode(_sourceNode);
+ dest.setLink(_link);
+ dest.setAddressResolved(_isAddressResolved);
+ return dest;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 430a4bd9e9..0563276457 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -592,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
}
-
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
@@ -1168,62 +1168,80 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean isConsumer,
boolean noWait) throws AMQException
{
- boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getAssert() == AddressOption.SENDER);
-
- boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getCreate() == AddressOption.SENDER);
-
-
-
- int type = resolveAddressType(dest);
-
- switch (type)
- {
- case AMQDestination.QUEUE_TYPE:
+ if (dest.isAddressResolved())
+ {
+ if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
- if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
- {
- setLegacyFiledsForQueueType(dest);
- break;
- }
- else if(createNode)
- {
- setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,false,noWait);
- break;
- }
+ createSubscriptionQueue(dest);
}
+ }
+ else
+ {
+ boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
+ (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getAssert() == AddressOption.SENDER);
+
+ boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
+ (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getCreate() == AddressOption.SENDER);
+
- case AMQDestination.TOPIC_TYPE:
+
+ int type = resolveAddressType(dest);
+
+ switch (type)
{
- if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- if (isConsumer) {createSubscriptionQueue(dest);}
- break;
+ case AMQDestination.QUEUE_TYPE:
+ {
+ if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
+ {
+ setLegacyFiledsForQueueType(dest);
+ break;
+ }
+ else if(createNode)
+ {
+ setLegacyFiledsForQueueType(dest);
+ send0_10QueueDeclare(dest,null,false,noWait);
+ break;
+ }
}
- else if(createNode)
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- sendExchangeDeclare(dest.getAddressName(),
- dest.getExchangeClass().asString(),
- dest.getTargetNode().getAlternateExchange(),
- dest.getTargetNode().getDeclareArgs(),
- false);
- if (isConsumer) {createSubscriptionQueue(dest);}
- break;
+
+ case AMQDestination.TOPIC_TYPE:
+ {
+ if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
+ {
+ setLegacyFiledsForTopicType(dest);
+ verifySubject(dest);
+ if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
+ {
+ createSubscriptionQueue(dest);
+ }
+ break;
+ }
+ else if(createNode)
+ {
+ setLegacyFiledsForTopicType(dest);
+ verifySubject(dest);
+ sendExchangeDeclare(dest.getAddressName(),
+ dest.getExchangeClass().asString(),
+ dest.getTargetNode().getAlternateExchange(),
+ dest.getTargetNode().getDeclareArgs(),
+ false);
+ if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
+ {
+ createSubscriptionQueue(dest);
+ }
+ break;
+ }
}
+
+ default:
+ throw new AMQException(
+ "The name '" + dest.getAddressName() +
+ "' supplied in the address doesn't resolve to an exchange or a queue");
}
- default:
- throw new AMQException(
- "The name '" + dest.getAddressName() +
- "' supplied in the address doesn't resolve to an exchange or a queue");
+ dest.setAddressResolved(true);
}
}
@@ -1251,6 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
//both a queue and exchange exist for that name
throw new AMQException("Ambiguous address, please specify queue or topic as node type");
}
+ dest.setAddressType(type);
dest.rebuildTargetAndSourceNodes(type);
return type;
}
@@ -1277,7 +1296,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private void createSubscriptionQueue(AMQDestination dest) throws AMQException
{
QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
- if (dest.getQueueName() == null || !isQueueExist(dest,node,true))
+
+ if (dest.getQueueName() == null)
{
if (dest.getLink() != null && dest.getLink().getName() != null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index eddaa1a6bb..66766369d7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
@@ -115,6 +116,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
capacity = _0_10session.getAMQConnection().getMaxPrefetch();
}
+ if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
+ {
+
+ if (destination.getLink() == null || destination.getLink().getName() == null)
+ {
+ _destination = destination.copyDestination();
+ _destination.setQueueName(null);
+ }
+ }
+
}