summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-05-28 03:18:52 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-05-28 03:18:52 +0000
commit02d8c94d844fc05e329a501cc33296380963d19b (patch)
tree95ce5e873806ade62d509fa6fcfbfd5c9cc68e32 /java/client/src/main
parent920f7ed7c4a52b2544a13e40dd7952846cf993b3 (diff)
downloadqpid-python-02d8c94d844fc05e329a501cc33296380963d19b.tar.gz
1. Capacity can now be specified as " capacity : {source: 5, target 10}" in addition to "capacity:10" where both source and target is set to 10.
2. If the exchange type if direct and no subject is set, then the routing_key is set to "" instead of throwing an exception. 3. Added a fix to infer the exchange type if specified in the x-declares section. 4. The link can now specify an optional 'name' parameter which will be used as the queue name if present. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@949083 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java30
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java21
5 files changed, 56 insertions, 28 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 593613f962..e1f29087a4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -139,7 +139,7 @@ public abstract class AMQDestination implements Destination, Referenceable
protected Node _targetNode;
protected Node _sourceNode;
protected Link _targetLink;
- protected Link _sourceLink;
+ protected Link _link;
// ----- / Fields required to support new address syntax -------
static
@@ -739,14 +739,14 @@ public abstract class AMQDestination implements Destination, Referenceable
_sourceNode = node;
}
- public Link getSourceLink()
+ public Link getLink()
{
- return _sourceLink;
+ return _link;
}
- public void setSourceLink(Link link)
+ public void setLink(Link link)
{
- _sourceLink = link;
+ _link = link;
}
public void setExchangeName(AMQShortString name)
@@ -792,7 +792,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_addressType = _addrHelper.getTargetNodeType();
_targetNode = _addrHelper.getTargetNode(_addressType);
_sourceNode = _addrHelper.getSourceNode(_addressType);
- _sourceLink = _addrHelper.getLink();
+ _link = _addrHelper.getLink();
}
// This method is needed if we didn't know the node type at the beginning.
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 4aca7454bd..55cf5fe64b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -612,9 +612,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
long capacity = 0;
if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getSourceLink().getCapacity() > 0)
+ destination.getLink().getConsumerCapacity() > 0)
{
- capacity = destination.getSourceLink().getCapacity();
+ capacity = destination.getLink().getConsumerCapacity();
}
else if (prefetch())
{
@@ -1229,10 +1229,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.setRoutingKey(ExchangeDefaults.WILDCARD_ANY);
dest.setSubject(ExchangeDefaults.WILDCARD_ANY.toString());
}
- else if (dest.getExchangeClass() == ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+ else
{
- throw new AMQException("If sending to an exchange of type direct," +
- " a valid subject must be specified");
+ dest.setRoutingKey(new AMQShortString(""));
+ dest.setSubject("");
}
}
}
@@ -1242,9 +1242,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
if (dest.getQueueName() == null || !isQueueExist(dest,node,true))
{
- // can name : my-queue be used in x-declare?
- // if so set it to dest queue name
- // if (dest.getQueueName() == null) { dest.setName(node.getName()) }
+ if (dest.getLink() != null && dest.getLink().getName() != null)
+ {
+ dest.setQueueName(new AMQShortString(dest.getLink().getName()));
+ }
send0_10QueueDeclare(dest,null,false,false);
}
node.addBinding(new Binding(dest.getAddressName(),
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 321f5855d7..9d597d8290 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -106,9 +106,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
// Destination setting overrides connection defaults
if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getSourceLink().getCapacity() > 0)
+ destination.getLink().getConsumerCapacity() > 0)
{
- capacity = destination.getSourceLink().getCapacity();
+ capacity = destination.getLink().getConsumerCapacity();
}
else if (getSession().prefetch())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 60890e3ef7..dc7aca7d3e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -58,6 +58,8 @@ public class AddressHelper
public static final String BINDINGS = "bindings";
public static final String BROWSE_ONLY = "browse";
public static final String CAPACITY = "capacity";
+ public static final String CAPACITY_SOURCE = "source";
+ public static final String CAPACITY_TARGET = "target";
public static final String NAME = "name";
public static final String EXCHANGE = "exchange";
public static final String QUEUE = "queue";
@@ -220,11 +222,7 @@ public class AddressHelper
{
return AMQDestination.QUEUE_TYPE;
}
- else if ((nodeProps.getString(TYPE).equals("topic") ||
- nodeProps.getString(TYPE).equals("direct") ||
- nodeProps.getString(TYPE).equals("fanout") ||
- nodeProps.getString(TYPE).equals("match") ||
- nodeProps.getString(TYPE).equals("xml")) )
+ else if (nodeProps.getString(TYPE).equals("topic"))
{
return AMQDestination.TOPIC_TYPE;
}
@@ -258,7 +256,8 @@ public class AddressHelper
Map declareArgs = getDeclareArgs(parent);
MapAccessor argsMap = new MapAccessor(declareArgs);
ExchangeNode node = new ExchangeNode();
- node.setExchangeType(nodeProps.getString(TYPE));
+ node.setExchangeType(argsMap.getString(TYPE) == null?
+ "topic":argsMap.getString(TYPE));
node.setDeclareArgs(getQpidExchangeOptions(argsMap));
fillInCommonNodeArgs(node,parent,argsMap);
return node;
@@ -285,6 +284,11 @@ public class AddressHelper
node.setBindings(getBindings(parent));
}
+ /**
+ * if the type == queue x-declare args from the node props is used.
+ * if the type == exchange x-declare args from the link props is used
+ * else just create a default temp queue.
+ */
public Node getSourceNode(int addressType)
{
if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
@@ -309,7 +313,19 @@ public class AddressHelper
{
link.setDurable(linkProps.getBoolean(DURABLE)== null? false : linkProps.getBoolean(DURABLE));
link.setName(linkProps.getString(NAME));
- link.setCapacity(linkProps.getInt(CAPACITY));
+
+ if (((Map)address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
+ {
+ MapAccessor capacityProps = new MapAccessor(
+ (Map)((Map)address.getOptions().get(LINK)).get(CAPACITY));
+ link.setConsumerCapacity(capacityProps.getInt(CAPACITY_SOURCE));
+ link.setProducerCapacity(capacityProps.getInt(CAPACITY_TARGET));
+ }
+ else
+ {
+ link.setConsumerCapacity(linkProps.getInt(CAPACITY));
+ link.setProducerCapacity(linkProps.getInt(CAPACITY));
+ }
link.setFilter(linkProps.getString(FILTER));
// so far filter type not used
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 367191e74e..0ebcaf548b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -31,7 +31,8 @@ public class Link
protected FilterType _filterType = FilterType.SUBJECT;
protected boolean _isNoLocal;
protected boolean _isDurable;
- protected int _capacity = 0;
+ protected int _consumerCapacity = 0;
+ protected int _producerCapacity = 0;
protected Node node;
public Node getNode()
@@ -84,16 +85,26 @@ public class Link
_isNoLocal = noLocal;
}
- public int getCapacity()
+ public int getConsumerCapacity()
{
- return _capacity;
+ return _consumerCapacity;
}
- public void setCapacity(int capacity)
+ public void setConsumerCapacity(int capacity)
{
- this._capacity = capacity;
+ _consumerCapacity = capacity;
+ }
+
+ public int getProducerCapacity()
+ {
+ return _producerCapacity;
}
+ public void setProducerCapacity(int capacity)
+ {
+ _producerCapacity = capacity;
+ }
+
public String getName()
{
return name;