diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-01-16 21:57:47 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-01-16 21:57:47 +0000 |
| commit | ad0910c4f2c7ef5bee8305d5cad76ecf8cf926fb (patch) | |
| tree | 21aeaa1ff2b3d6031f6b010e17974cd26b9e1752 /java | |
| parent | 40a1f2d87faebcbf2f59f7f9597d6451572d5b7c (diff) | |
| download | qpid-python-ad0910c4f2c7ef5bee8305d5cad76ecf8cf926fb.tar.gz | |
Please refer JIRA's 739,740 and 741 for more information about the issues.
AMQDestination
Renamed the destinationName to routingKey as it is incorrectly used.
Also modified it to recognize fannout exchange
AMQQueue
Modified to return the proper routing key to support situations where the queue name and the routing key is different.
BasicMessageProducer_0_10.java
Added a temp hack to interoperate with python. The bug is in python and it needs to be fixed.
Basically python relies on the content length to pass the content frames properly.
So I added a line to calculate the content length and sets it in the message properties.
The rest was modified to reflect the change done in AMQDestination.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@612581 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
10 files changed, 80 insertions, 85 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index cc5af07b20..6c7575429a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -40,8 +40,6 @@ public abstract class AMQDestination implements Destination, Referenceable protected final AMQShortString _exchangeClass; - protected final AMQShortString _destinationName; - protected final boolean _isDurable; protected final boolean _isExclusive; @@ -50,6 +48,8 @@ public abstract class AMQDestination implements Destination, Referenceable private AMQShortString _queueName; + private AMQShortString _routingKey; + private String _url; private AMQShortString _urlAsShortString; @@ -73,17 +73,17 @@ public abstract class AMQDestination implements Destination, Referenceable { _exchangeName = binding.getExchangeName(); _exchangeClass = binding.getExchangeClass(); - _destinationName = binding.getDestinationName(); _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName()); + _routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey()); } - protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName) + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) { - this(exchangeName, exchangeClass, destinationName, false, false, queueName); + this(exchangeName, exchangeClass, routingKey, false, false, queueName); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName) @@ -91,22 +91,23 @@ public abstract class AMQDestination implements Destination, Referenceable this(exchangeName, exchangeClass, destinationName, false, false, null); } - protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive, + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, boolean isAutoDelete, AMQShortString queueName) { - this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false); + this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false); } - protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive, + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { - if (destinationName == null) + // If used with a fannout exchange, the routing key can be null + if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null) { - throw new IllegalArgumentException("Destination exchange must not be null"); + throw new IllegalArgumentException("routingKey exchange must not be null"); } if (exchangeName == null) { - throw new IllegalArgumentException("Exchange exchange must not be null"); + throw new IllegalArgumentException("Exchange name must not be null"); } if (exchangeClass == null) { @@ -114,7 +115,7 @@ public abstract class AMQDestination implements Destination, Referenceable } _exchangeName = exchangeName; _exchangeClass = exchangeClass; - _destinationName = destinationName; + _routingKey = routingKey; _isExclusive = isExclusive; _isAutoDelete = isAutoDelete; _queueName = queueName; @@ -155,11 +156,6 @@ public abstract class AMQDestination implements Destination, Referenceable return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass); } - public AMQShortString getDestinationName() - { - return _destinationName; - } - public String getQueueName() { return _queueName == null ? null : _queueName.toString(); @@ -170,8 +166,6 @@ public abstract class AMQDestination implements Destination, Referenceable return _queueName; } - - public void setQueueName(AMQShortString queueName) { @@ -182,7 +176,10 @@ public abstract class AMQDestination implements Destination, Referenceable _byteEncoding = null; } - public abstract AMQShortString getRoutingKey(); + public AMQShortString getRoutingKey() + { + return _routingKey; + } public boolean isExclusive() { @@ -213,7 +210,7 @@ public abstract class AMQDestination implements Destination, Referenceable } public String toURL() - { + { String url = _url; if(url == null) { @@ -225,14 +222,7 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append("://"); sb.append(_exchangeName); - sb.append('/'); - - if (_destinationName != null) - { - sb.append(_destinationName); - } - - sb.append('/'); + sb.append("//"); if (_queueName != null) { @@ -278,7 +268,7 @@ public abstract class AMQDestination implements Destination, Referenceable { int size = _exchangeClass.length() + 1 + _exchangeName.length() + 1 + - (_destinationName == null ? 0 : _destinationName.length()) + 1 + + 0 + // in place of the destination name (_queueName == null ? 0 : _queueName.length()) + 1 + 1; encoding = new byte[size]; @@ -286,14 +276,9 @@ public abstract class AMQDestination implements Destination, Referenceable pos = _exchangeClass.writeToByteArray(encoding, pos); pos = _exchangeName.writeToByteArray(encoding, pos); - if(_destinationName == null) - { - encoding[pos++] = (byte)0; - } - else - { - pos = _destinationName.writeToByteArray(encoding,pos); - } + + encoding[pos++] = (byte)0; + if(_queueName == null) { encoding[pos++] = (byte)0; @@ -337,10 +322,6 @@ public abstract class AMQDestination implements Destination, Referenceable final AMQDestination that = (AMQDestination) o; - if (!_destinationName.equals(that._destinationName)) - { - return false; - } if (!_exchangeClass.equals(that._exchangeClass)) { return false; @@ -363,7 +344,7 @@ public abstract class AMQDestination implements Destination, Referenceable int result; result = _exchangeName.hashCode(); result = 29 * result + _exchangeClass.hashCode(); - result = 29 * result + _destinationName.hashCode(); + //result = 29 * result + _destinationName.hashCode(); if (_queueName != null) { result = 29 * result + _queueName.hashCode(); @@ -386,7 +367,7 @@ public abstract class AMQDestination implements Destination, Referenceable { AMQShortString exchangeClass; AMQShortString exchangeName; - AMQShortString destinationName; + AMQShortString routingKey; AMQShortString queueName; boolean isDurable; boolean isExclusive; @@ -397,8 +378,8 @@ public abstract class AMQDestination implements Destination, Referenceable pos+= exchangeClass.length() + 1; exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); pos+= exchangeName.length() + 1; - destinationName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); - pos+= (destinationName == null ? 0 : destinationName.length()) + 1; + routingKey = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= (routingKey == null ? 0 : routingKey.length()) + 1; queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); pos+= (queueName == null ? 0 : queueName.length()) + 1; int options = byteEncodedDestination[pos]; @@ -408,15 +389,15 @@ public abstract class AMQDestination implements Destination, Referenceable if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) { - return new AMQQueue(exchangeName,destinationName,queueName,isExclusive,isAutoDelete,isDurable); + return new AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable); } else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) { - return new AMQTopic(exchangeName,destinationName,isAutoDelete,queueName,isDurable); + return new AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable); } else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) { - return new AMQHeadersExchange(destinationName); + return new AMQHeadersExchange(routingKey); } else { @@ -443,6 +424,10 @@ public abstract class AMQDestination implements Destination, Referenceable { return new AMQHeadersExchange(binding); } + else if (type.equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS)) + { + return new AMQQueue(binding); + } else { throw new IllegalArgumentException("Unknown Exchange Class:" + type + " in binding:" + binding); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index 1a2fe0d355..b27dfc6dcb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,11 +44,6 @@ public class AMQHeadersExchange extends AMQDestination super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null); } - public AMQShortString getRoutingKey() - { - return getDestinationName(); - } - public boolean isNameRequired() { //Not sure what the best approach is here, probably to treat this like a topic diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 13568594d9..924b20e3ba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,7 +21,6 @@ package org.apache.qpid.client; import javax.jms.Queue; -import javax.jms.Connection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -82,7 +81,7 @@ public class AMQQueue extends AMQDestination implements Queue public AMQQueue(AMQConnection connection, String name, boolean temporary) { - this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary); + this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary); } @@ -111,39 +110,39 @@ public class AMQQueue extends AMQDestination implements Queue // queue name is set to null indicating that the broker assigns a name in the case of temporary queues // temporary queues are typically used as response queues this(exchangeName, name, temporary?null:name, temporary, temporary, !temporary); - + } /** * Create a reference to a queue. Note this does not actually imply the queue exists. - * @param destinationName the queue name + * @param exchangeName the exchange name we want to send the message to + * @param routingKey the routing key * @param queueName the queue name * @param exclusive true if the queue should only permit a single consumer * @param autoDelete true if the queue should be deleted automatically when the last consumers detaches */ - public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete) + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete) { - this(exchangeName, destinationName, queueName, exclusive, autoDelete, false); + this(exchangeName, routingKey, queueName, exclusive, autoDelete, false); } - public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) { - super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive, + super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive, autoDelete, queueName, durable); } - - public AMQShortString getRoutingKey() { - if( getDestinationName() != null && ! getDestinationName().equals("")) + //return getAMQQueueName(); + if (getAMQQueueName() != null && getAMQQueueName().equals(super.getRoutingKey())) { - return getDestinationName(); + return getAMQQueueName(); } else { - return getAMQQueueName(); + return super.getRoutingKey(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 38201f9817..855af2cfd0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -738,7 +738,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQShortString topicName; if (topic instanceof AMQTopic) { - topicName = ((AMQTopic) topic).getDestinationName(); + topicName = ((AMQTopic) topic).getRoutingKey(); } else { @@ -1039,7 +1039,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract TemporaryQueue createTemporaryQueue() throws JMSException; - + public TemporaryTopic createTemporaryTopic() throws JMSException { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 81ba795fa4..a63d94b4ca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -526,9 +526,9 @@ public class AMQSession_0_10 extends AMQSession try { // this is done so that we can produce to a temporary queue beofre we create a consumer - sendCreateQueue(result.getDestinationName(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); - sendQueueBind(result.getDestinationName(), result.getDestinationName(), new FieldTable(), result.getExchangeName()); - result.setQueueName(result.getDestinationName()); + sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); + sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName()); + result.setQueueName(result.getRoutingKey()); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 319e728edf..b706ec082c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -74,7 +74,7 @@ public class AMQTopic extends AMQDestination implements Topic public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException { - return new AMQTopic(topic.getExchangeName(), topic.getDestinationName(), false, + return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false, getDurableTopicQueueName(subscriptionName, connection), true); } @@ -86,12 +86,12 @@ public class AMQTopic extends AMQDestination implements Topic public String getTopicName() throws JMSException { - return super.getDestinationName().toString(); + return super.getRoutingKey().toString(); } public AMQShortString getRoutingKey() { - return getDestinationName(); + return getRoutingKey(); } public boolean isNameRequired() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java index 0f3723c58b..768e2862b3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java @@ -33,11 +33,6 @@ public class AMQUndefinedDestination extends AMQDestination super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
}
- public AMQShortString getRoutingKey()
- {
- return getDestinationName();
- }
-
public boolean isNameRequired()
{
return getAMQQueueName() == null;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 1551ca41ae..a31bfe9df1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -131,12 +131,21 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer origMessage.setJMSMessageID(message.getJMSMessageID()); origMessage.setJMSDeliveryMode(deliveryMode); } + BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); if (contentHeaderProperties.reset()) { // set the application properties message.get010Message().getMessageProperties() .setContentType(contentHeaderProperties.getContentType().toString()); + + /* Temp hack to get the JMS client to interoperate with the python client + as it relies on content length to determine the boundaries for + message framesets. The python client should be fixed. + */ + message.get010Message().getMessageProperties().setContentLength(message.getContentLength()); + + AMQShortString type = contentHeaderProperties.getType(); if (type != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 87fadd5e9b..2d8f325d8a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -722,6 +722,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } + public int getContentLength() + { + return _data.remaining(); + } + public void setConsumer(BasicMessageConsumer basicMessageConsumer) { _consumer = basicMessageConsumer; diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 1774fa1194..529a05b2e2 100644 --- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -259,7 +259,14 @@ public class AMQBindingURL implements BindingURL { if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) { - return getQueueName(); + if (containsOption(BindingURL.OPTION_ROUTING_KEY)) + { + return new AMQShortString(getOption(OPTION_ROUTING_KEY)); + } + else + { + return getQueueName(); + } } if (containsOption(BindingURL.OPTION_ROUTING_KEY)) |
