summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java83
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueue.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java5
9 files changed, 72 insertions, 84 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index cc5af07b20..6c7575429a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -40,8 +40,6 @@ public abstract class AMQDestination implements Destination, Referenceable
protected final AMQShortString _exchangeClass;
- protected final AMQShortString _destinationName;
-
protected final boolean _isDurable;
protected final boolean _isExclusive;
@@ -50,6 +48,8 @@ public abstract class AMQDestination implements Destination, Referenceable
private AMQShortString _queueName;
+ private AMQShortString _routingKey;
+
private String _url;
private AMQShortString _urlAsShortString;
@@ -73,17 +73,17 @@ public abstract class AMQDestination implements Destination, Referenceable
{
_exchangeName = binding.getExchangeName();
_exchangeClass = binding.getExchangeClass();
- _destinationName = binding.getDestinationName();
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
+ _routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey());
}
- protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, destinationName, false, false, queueName);
+ this(exchangeName, exchangeClass, routingKey, false, false, queueName);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
@@ -91,22 +91,23 @@ public abstract class AMQDestination implements Destination, Referenceable
this(exchangeName, exchangeClass, destinationName, false, false, null);
}
- protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
boolean isAutoDelete, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false);
+ this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false);
}
- protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
- if (destinationName == null)
+ // If used with a fannout exchange, the routing key can be null
+ if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null)
{
- throw new IllegalArgumentException("Destination exchange must not be null");
+ throw new IllegalArgumentException("routingKey exchange must not be null");
}
if (exchangeName == null)
{
- throw new IllegalArgumentException("Exchange exchange must not be null");
+ throw new IllegalArgumentException("Exchange name must not be null");
}
if (exchangeClass == null)
{
@@ -114,7 +115,7 @@ public abstract class AMQDestination implements Destination, Referenceable
}
_exchangeName = exchangeName;
_exchangeClass = exchangeClass;
- _destinationName = destinationName;
+ _routingKey = routingKey;
_isExclusive = isExclusive;
_isAutoDelete = isAutoDelete;
_queueName = queueName;
@@ -155,11 +156,6 @@ public abstract class AMQDestination implements Destination, Referenceable
return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass);
}
- public AMQShortString getDestinationName()
- {
- return _destinationName;
- }
-
public String getQueueName()
{
return _queueName == null ? null : _queueName.toString();
@@ -170,8 +166,6 @@ public abstract class AMQDestination implements Destination, Referenceable
return _queueName;
}
-
-
public void setQueueName(AMQShortString queueName)
{
@@ -182,7 +176,10 @@ public abstract class AMQDestination implements Destination, Referenceable
_byteEncoding = null;
}
- public abstract AMQShortString getRoutingKey();
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
public boolean isExclusive()
{
@@ -213,7 +210,7 @@ public abstract class AMQDestination implements Destination, Referenceable
}
public String toURL()
- {
+ {
String url = _url;
if(url == null)
{
@@ -225,14 +222,7 @@ public abstract class AMQDestination implements Destination, Referenceable
sb.append("://");
sb.append(_exchangeName);
- sb.append('/');
-
- if (_destinationName != null)
- {
- sb.append(_destinationName);
- }
-
- sb.append('/');
+ sb.append("//");
if (_queueName != null)
{
@@ -278,7 +268,7 @@ public abstract class AMQDestination implements Destination, Referenceable
{
int size = _exchangeClass.length() + 1 +
_exchangeName.length() + 1 +
- (_destinationName == null ? 0 : _destinationName.length()) + 1 +
+ 0 + // in place of the destination name
(_queueName == null ? 0 : _queueName.length()) + 1 +
1;
encoding = new byte[size];
@@ -286,14 +276,9 @@ public abstract class AMQDestination implements Destination, Referenceable
pos = _exchangeClass.writeToByteArray(encoding, pos);
pos = _exchangeName.writeToByteArray(encoding, pos);
- if(_destinationName == null)
- {
- encoding[pos++] = (byte)0;
- }
- else
- {
- pos = _destinationName.writeToByteArray(encoding,pos);
- }
+
+ encoding[pos++] = (byte)0;
+
if(_queueName == null)
{
encoding[pos++] = (byte)0;
@@ -337,10 +322,6 @@ public abstract class AMQDestination implements Destination, Referenceable
final AMQDestination that = (AMQDestination) o;
- if (!_destinationName.equals(that._destinationName))
- {
- return false;
- }
if (!_exchangeClass.equals(that._exchangeClass))
{
return false;
@@ -363,7 +344,7 @@ public abstract class AMQDestination implements Destination, Referenceable
int result;
result = _exchangeName.hashCode();
result = 29 * result + _exchangeClass.hashCode();
- result = 29 * result + _destinationName.hashCode();
+ //result = 29 * result + _destinationName.hashCode();
if (_queueName != null)
{
result = 29 * result + _queueName.hashCode();
@@ -386,7 +367,7 @@ public abstract class AMQDestination implements Destination, Referenceable
{
AMQShortString exchangeClass;
AMQShortString exchangeName;
- AMQShortString destinationName;
+ AMQShortString routingKey;
AMQShortString queueName;
boolean isDurable;
boolean isExclusive;
@@ -397,8 +378,8 @@ public abstract class AMQDestination implements Destination, Referenceable
pos+= exchangeClass.length() + 1;
exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
pos+= exchangeName.length() + 1;
- destinationName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= (destinationName == null ? 0 : destinationName.length()) + 1;
+ routingKey = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= (routingKey == null ? 0 : routingKey.length()) + 1;
queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
pos+= (queueName == null ? 0 : queueName.length()) + 1;
int options = byteEncodedDestination[pos];
@@ -408,15 +389,15 @@ public abstract class AMQDestination implements Destination, Referenceable
if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
- return new AMQQueue(exchangeName,destinationName,queueName,isExclusive,isAutoDelete,isDurable);
+ return new AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable);
}
else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
- return new AMQTopic(exchangeName,destinationName,isAutoDelete,queueName,isDurable);
+ return new AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable);
}
else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
{
- return new AMQHeadersExchange(destinationName);
+ return new AMQHeadersExchange(routingKey);
}
else
{
@@ -443,6 +424,10 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return new AMQHeadersExchange(binding);
}
+ else if (type.equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
+ {
+ return new AMQQueue(binding);
+ }
else
{
throw new IllegalArgumentException("Unknown Exchange Class:" + type + " in binding:" + binding);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
index 1a2fe0d355..b27dfc6dcb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,11 +44,6 @@ public class AMQHeadersExchange extends AMQDestination
super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null);
}
- public AMQShortString getRoutingKey()
- {
- return getDestinationName();
- }
-
public boolean isNameRequired()
{
//Not sure what the best approach is here, probably to treat this like a topic
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index 13568594d9..924b20e3ba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import javax.jms.Queue;
-import javax.jms.Connection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -82,7 +81,7 @@ public class AMQQueue extends AMQDestination implements Queue
public AMQQueue(AMQConnection connection, String name, boolean temporary)
{
- this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary);
+ this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary);
}
@@ -111,39 +110,39 @@ public class AMQQueue extends AMQDestination implements Queue
// queue name is set to null indicating that the broker assigns a name in the case of temporary queues
// temporary queues are typically used as response queues
this(exchangeName, name, temporary?null:name, temporary, temporary, !temporary);
-
+
}
/**
* Create a reference to a queue. Note this does not actually imply the queue exists.
- * @param destinationName the queue name
+ * @param exchangeName the exchange name we want to send the message to
+ * @param routingKey the routing key
* @param queueName the queue name
* @param exclusive true if the queue should only permit a single consumer
* @param autoDelete true if the queue should be deleted automatically when the last consumers detaches
*/
- public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete)
{
- this(exchangeName, destinationName, queueName, exclusive, autoDelete, false);
+ this(exchangeName, routingKey, queueName, exclusive, autoDelete, false);
}
- public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
- super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive,
autoDelete, queueName, durable);
}
-
-
public AMQShortString getRoutingKey()
{
- if( getDestinationName() != null && ! getDestinationName().equals(""))
+ //return getAMQQueueName();
+ if (getAMQQueueName() != null && getAMQQueueName().equals(super.getRoutingKey()))
{
- return getDestinationName();
+ return getAMQQueueName();
}
else
{
- return getAMQQueueName();
+ return super.getRoutingKey();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 38201f9817..855af2cfd0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -738,7 +738,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQShortString topicName;
if (topic instanceof AMQTopic)
{
- topicName = ((AMQTopic) topic).getDestinationName();
+ topicName = ((AMQTopic) topic).getRoutingKey();
}
else
{
@@ -1039,7 +1039,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
-
+
public TemporaryTopic createTemporaryTopic() throws JMSException
{
checkNotClosed();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 81ba795fa4..a63d94b4ca 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -526,9 +526,9 @@ public class AMQSession_0_10 extends AMQSession
try
{
// this is done so that we can produce to a temporary queue beofre we create a consumer
- sendCreateQueue(result.getDestinationName(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
- sendQueueBind(result.getDestinationName(), result.getDestinationName(), new FieldTable(), result.getExchangeName());
- result.setQueueName(result.getDestinationName());
+ sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
+ sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName());
+ result.setQueueName(result.getRoutingKey());
}
catch (Exception e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 319e728edf..b706ec082c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -74,7 +74,7 @@ public class AMQTopic extends AMQDestination implements Topic
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
- return new AMQTopic(topic.getExchangeName(), topic.getDestinationName(), false,
+ return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
getDurableTopicQueueName(subscriptionName, connection),
true);
}
@@ -86,12 +86,12 @@ public class AMQTopic extends AMQDestination implements Topic
public String getTopicName() throws JMSException
{
- return super.getDestinationName().toString();
+ return super.getRoutingKey().toString();
}
public AMQShortString getRoutingKey()
{
- return getDestinationName();
+ return getRoutingKey();
}
public boolean isNameRequired()
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
index 0f3723c58b..768e2862b3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
@@ -33,11 +33,6 @@ public class AMQUndefinedDestination extends AMQDestination
super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
}
- public AMQShortString getRoutingKey()
- {
- return getDestinationName();
- }
-
public boolean isNameRequired()
{
return getAMQQueueName() == null;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 1551ca41ae..a31bfe9df1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -131,12 +131,21 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
origMessage.setJMSMessageID(message.getJMSMessageID());
origMessage.setJMSDeliveryMode(deliveryMode);
}
+
BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
if (contentHeaderProperties.reset())
{
// set the application properties
message.get010Message().getMessageProperties()
.setContentType(contentHeaderProperties.getContentType().toString());
+
+ /* Temp hack to get the JMS client to interoperate with the python client
+ as it relies on content length to determine the boundaries for
+ message framesets. The python client should be fixed.
+ */
+ message.get010Message().getMessageProperties().setContentLength(message.getContentLength());
+
+
AMQShortString type = contentHeaderProperties.getType();
if (type != null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 87fadd5e9b..2d8f325d8a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -722,6 +722,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
+ public int getContentLength()
+ {
+ return _data.remaining();
+ }
+
public void setConsumer(BasicMessageConsumer basicMessageConsumer)
{
_consumer = basicMessageConsumer;