diff options
Diffstat (limited to 'java/client')
9 files changed, 72 insertions, 84 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index cc5af07b20..6c7575429a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -40,8 +40,6 @@ public abstract class AMQDestination implements Destination, Referenceable protected final AMQShortString _exchangeClass; - protected final AMQShortString _destinationName; - protected final boolean _isDurable; protected final boolean _isExclusive; @@ -50,6 +48,8 @@ public abstract class AMQDestination implements Destination, Referenceable private AMQShortString _queueName; + private AMQShortString _routingKey; + private String _url; private AMQShortString _urlAsShortString; @@ -73,17 +73,17 @@ public abstract class AMQDestination implements Destination, Referenceable { _exchangeName = binding.getExchangeName(); _exchangeClass = binding.getExchangeClass(); - _destinationName = binding.getDestinationName(); _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName()); + _routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey()); } - protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName) + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) { - this(exchangeName, exchangeClass, destinationName, false, false, queueName); + this(exchangeName, exchangeClass, routingKey, false, false, queueName); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName) @@ -91,22 +91,23 @@ public abstract class AMQDestination implements Destination, Referenceable this(exchangeName, exchangeClass, destinationName, false, false, null); } - protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive, + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, boolean isAutoDelete, AMQShortString queueName) { - this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false); + this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false); } - protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive, + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { - if (destinationName == null) + // If used with a fannout exchange, the routing key can be null + if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null) { - throw new IllegalArgumentException("Destination exchange must not be null"); + throw new IllegalArgumentException("routingKey exchange must not be null"); } if (exchangeName == null) { - throw new IllegalArgumentException("Exchange exchange must not be null"); + throw new IllegalArgumentException("Exchange name must not be null"); } if (exchangeClass == null) { @@ -114,7 +115,7 @@ public abstract class AMQDestination implements Destination, Referenceable } _exchangeName = exchangeName; _exchangeClass = exchangeClass; - _destinationName = destinationName; + _routingKey = routingKey; _isExclusive = isExclusive; _isAutoDelete = isAutoDelete; _queueName = queueName; @@ -155,11 +156,6 @@ public abstract class AMQDestination implements Destination, Referenceable return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass); } - public AMQShortString getDestinationName() - { - return _destinationName; - } - public String getQueueName() { return _queueName == null ? null : _queueName.toString(); @@ -170,8 +166,6 @@ public abstract class AMQDestination implements Destination, Referenceable return _queueName; } - - public void setQueueName(AMQShortString queueName) { @@ -182,7 +176,10 @@ public abstract class AMQDestination implements Destination, Referenceable _byteEncoding = null; } - public abstract AMQShortString getRoutingKey(); + public AMQShortString getRoutingKey() + { + return _routingKey; + } public boolean isExclusive() { @@ -213,7 +210,7 @@ public abstract class AMQDestination implements Destination, Referenceable } public String toURL() - { + { String url = _url; if(url == null) { @@ -225,14 +222,7 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append("://"); sb.append(_exchangeName); - sb.append('/'); - - if (_destinationName != null) - { - sb.append(_destinationName); - } - - sb.append('/'); + sb.append("//"); if (_queueName != null) { @@ -278,7 +268,7 @@ public abstract class AMQDestination implements Destination, Referenceable { int size = _exchangeClass.length() + 1 + _exchangeName.length() + 1 + - (_destinationName == null ? 0 : _destinationName.length()) + 1 + + 0 + // in place of the destination name (_queueName == null ? 0 : _queueName.length()) + 1 + 1; encoding = new byte[size]; @@ -286,14 +276,9 @@ public abstract class AMQDestination implements Destination, Referenceable pos = _exchangeClass.writeToByteArray(encoding, pos); pos = _exchangeName.writeToByteArray(encoding, pos); - if(_destinationName == null) - { - encoding[pos++] = (byte)0; - } - else - { - pos = _destinationName.writeToByteArray(encoding,pos); - } + + encoding[pos++] = (byte)0; + if(_queueName == null) { encoding[pos++] = (byte)0; @@ -337,10 +322,6 @@ public abstract class AMQDestination implements Destination, Referenceable final AMQDestination that = (AMQDestination) o; - if (!_destinationName.equals(that._destinationName)) - { - return false; - } if (!_exchangeClass.equals(that._exchangeClass)) { return false; @@ -363,7 +344,7 @@ public abstract class AMQDestination implements Destination, Referenceable int result; result = _exchangeName.hashCode(); result = 29 * result + _exchangeClass.hashCode(); - result = 29 * result + _destinationName.hashCode(); + //result = 29 * result + _destinationName.hashCode(); if (_queueName != null) { result = 29 * result + _queueName.hashCode(); @@ -386,7 +367,7 @@ public abstract class AMQDestination implements Destination, Referenceable { AMQShortString exchangeClass; AMQShortString exchangeName; - AMQShortString destinationName; + AMQShortString routingKey; AMQShortString queueName; boolean isDurable; boolean isExclusive; @@ -397,8 +378,8 @@ public abstract class AMQDestination implements Destination, Referenceable pos+= exchangeClass.length() + 1; exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); pos+= exchangeName.length() + 1; - destinationName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); - pos+= (destinationName == null ? 0 : destinationName.length()) + 1; + routingKey = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= (routingKey == null ? 0 : routingKey.length()) + 1; queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); pos+= (queueName == null ? 0 : queueName.length()) + 1; int options = byteEncodedDestination[pos]; @@ -408,15 +389,15 @@ public abstract class AMQDestination implements Destination, Referenceable if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) { - return new AMQQueue(exchangeName,destinationName,queueName,isExclusive,isAutoDelete,isDurable); + return new AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable); } else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) { - return new AMQTopic(exchangeName,destinationName,isAutoDelete,queueName,isDurable); + return new AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable); } else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) { - return new AMQHeadersExchange(destinationName); + return new AMQHeadersExchange(routingKey); } else { @@ -443,6 +424,10 @@ public abstract class AMQDestination implements Destination, Referenceable { return new AMQHeadersExchange(binding); } + else if (type.equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS)) + { + return new AMQQueue(binding); + } else { throw new IllegalArgumentException("Unknown Exchange Class:" + type + " in binding:" + binding); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index 1a2fe0d355..b27dfc6dcb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,11 +44,6 @@ public class AMQHeadersExchange extends AMQDestination super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null); } - public AMQShortString getRoutingKey() - { - return getDestinationName(); - } - public boolean isNameRequired() { //Not sure what the best approach is here, probably to treat this like a topic diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 13568594d9..924b20e3ba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,7 +21,6 @@ package org.apache.qpid.client; import javax.jms.Queue; -import javax.jms.Connection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -82,7 +81,7 @@ public class AMQQueue extends AMQDestination implements Queue public AMQQueue(AMQConnection connection, String name, boolean temporary) { - this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary); + this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary); } @@ -111,39 +110,39 @@ public class AMQQueue extends AMQDestination implements Queue // queue name is set to null indicating that the broker assigns a name in the case of temporary queues // temporary queues are typically used as response queues this(exchangeName, name, temporary?null:name, temporary, temporary, !temporary); - + } /** * Create a reference to a queue. Note this does not actually imply the queue exists. - * @param destinationName the queue name + * @param exchangeName the exchange name we want to send the message to + * @param routingKey the routing key * @param queueName the queue name * @param exclusive true if the queue should only permit a single consumer * @param autoDelete true if the queue should be deleted automatically when the last consumers detaches */ - public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete) + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete) { - this(exchangeName, destinationName, queueName, exclusive, autoDelete, false); + this(exchangeName, routingKey, queueName, exclusive, autoDelete, false); } - public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) { - super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive, + super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive, autoDelete, queueName, durable); } - - public AMQShortString getRoutingKey() { - if( getDestinationName() != null && ! getDestinationName().equals("")) + //return getAMQQueueName(); + if (getAMQQueueName() != null && getAMQQueueName().equals(super.getRoutingKey())) { - return getDestinationName(); + return getAMQQueueName(); } else { - return getAMQQueueName(); + return super.getRoutingKey(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 38201f9817..855af2cfd0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -738,7 +738,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQShortString topicName; if (topic instanceof AMQTopic) { - topicName = ((AMQTopic) topic).getDestinationName(); + topicName = ((AMQTopic) topic).getRoutingKey(); } else { @@ -1039,7 +1039,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract TemporaryQueue createTemporaryQueue() throws JMSException; - + public TemporaryTopic createTemporaryTopic() throws JMSException { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 81ba795fa4..a63d94b4ca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -526,9 +526,9 @@ public class AMQSession_0_10 extends AMQSession try { // this is done so that we can produce to a temporary queue beofre we create a consumer - sendCreateQueue(result.getDestinationName(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); - sendQueueBind(result.getDestinationName(), result.getDestinationName(), new FieldTable(), result.getExchangeName()); - result.setQueueName(result.getDestinationName()); + sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); + sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName()); + result.setQueueName(result.getRoutingKey()); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 319e728edf..b706ec082c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -74,7 +74,7 @@ public class AMQTopic extends AMQDestination implements Topic public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException { - return new AMQTopic(topic.getExchangeName(), topic.getDestinationName(), false, + return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false, getDurableTopicQueueName(subscriptionName, connection), true); } @@ -86,12 +86,12 @@ public class AMQTopic extends AMQDestination implements Topic public String getTopicName() throws JMSException { - return super.getDestinationName().toString(); + return super.getRoutingKey().toString(); } public AMQShortString getRoutingKey() { - return getDestinationName(); + return getRoutingKey(); } public boolean isNameRequired() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java index 0f3723c58b..768e2862b3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java @@ -33,11 +33,6 @@ public class AMQUndefinedDestination extends AMQDestination super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
}
- public AMQShortString getRoutingKey()
- {
- return getDestinationName();
- }
-
public boolean isNameRequired()
{
return getAMQQueueName() == null;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 1551ca41ae..a31bfe9df1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -131,12 +131,21 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer origMessage.setJMSMessageID(message.getJMSMessageID()); origMessage.setJMSDeliveryMode(deliveryMode); } + BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); if (contentHeaderProperties.reset()) { // set the application properties message.get010Message().getMessageProperties() .setContentType(contentHeaderProperties.getContentType().toString()); + + /* Temp hack to get the JMS client to interoperate with the python client + as it relies on content length to determine the boundaries for + message framesets. The python client should be fixed. + */ + message.get010Message().getMessageProperties().setContentLength(message.getContentLength()); + + AMQShortString type = contentHeaderProperties.getType(); if (type != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 87fadd5e9b..2d8f325d8a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -722,6 +722,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } + public int getContentLength() + { + return _data.remaining(); + } + public void setConsumer(BasicMessageConsumer basicMessageConsumer) { _consumer = basicMessageConsumer; |
