summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java280
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java51
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java41
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java22
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java30
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java43
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java2
11 files changed, 399 insertions, 111 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index b64d355f80..2a91ff3ce2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -87,6 +87,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
+ private static final long DEFAULT_CLOSE_TIMEOUT = 2000l;
+
private final long _connectionNumber;
/**
@@ -160,7 +162,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
- private static final long DEFAULT_TIMEOUT = 1000 * 30;
private AMQConnectionDelegate _delegate;
@@ -873,7 +874,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close() throws JMSException
{
- close(DEFAULT_TIMEOUT);
+ close(DEFAULT_CLOSE_TIMEOUT);
}
public void close(long timeout) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0b299a22cd..0183c30276 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -313,6 +313,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _immediatePrefetch;
}
+ abstract void handleNodeDelete(final AMQDestination dest) throws AMQException;
+
+ abstract void handleLinkDelete(final AMQDestination dest) throws AMQException;
+
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 46f999e452..68b7cf1f88 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1462,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ @Override
void handleNodeDelete(AMQDestination dest) throws AMQException
{
if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index e5ca82f56a..0145d15111 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -29,6 +29,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -64,6 +65,7 @@ import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.Strings;
public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -175,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, final AMQDestination dest,
+ final AMQShortString exchangeName, final AMQDestination destination,
final boolean nowait) throws AMQException, FailoverException
{
- getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
- (getTicket(),queueName,exchangeName,routingKey,false,arguments).
- generateFrame(getChannelId()), QueueBindOkBody.class);
+ if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL)
+ {
+
+ getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
+ (getTicket(), queueName, exchangeName, routingKey, false, arguments).
+ generateFrame(getChannelId()), QueueBindOkBody.class);
+
+ }
+ else
+ {
+ // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
+ List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>();
+ bindings.addAll(destination.getNode().getBindings());
+
+ String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
+ destination.getAddressName(): "amq.topic";
+
+ for (AMQDestination.Binding binding: bindings)
+ {
+ // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
+ // The null check below is a way to side step that issue while fixing QPID-4146
+ // Note this issue only affects producers.
+ if (binding.getQueue() == null && queueName == null)
+ {
+ continue;
+ }
+ String queue = binding.getQueue() == null?
+ queueName.asString(): binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchange :
+ binding.getExchange();
+
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ doBind(destination, binding, queue, exchange);
+ }
+ }
}
public void sendClose(long timeout) throws AMQException, FailoverException
@@ -547,10 +586,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
Map<String,Object> bindingArguments = new HashMap<String, Object>();
bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
- bindQueue(AMQShortString.valueOf(queueName),
- AMQShortString.valueOf(dest.getSubject()),
- FieldTable.convertToFieldTable(bindingArguments),
- AMQShortString.valueOf(dest.getAddressName()),dest,false);
+ final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments);
+ doBind(dest, binding, queueName, dest.getAddressName());
}
@@ -589,6 +626,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
+ public void sendExchangeDelete(final String name) throws AMQException, FailoverException
+ {
+ ExchangeDeleteBody body =
+ getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
{
AMQShortString queueName = amqd.getAMQQueueName();
@@ -821,18 +867,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
{
- AMQFrame queueDeclare =
- getMethodRegistry().createQueueDeclareBody(getTicket(),
- amqd.getAMQQueueName(),
- true,
- amqd.isDurable(),
- amqd.isExclusive(),
- amqd.isAutoDelete(),
- false,
- null).generateFrame(getChannelId());
- QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
- getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
- return okHandler.getMessageCount();
+ if(isBound(null, amqd.getAMQQueueName(), null))
+ {
+ AMQFrame queueDeclare =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ true,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null).generateFrame(getChannelId());
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ return okHandler.getMessageCount();
+ }
+ else
+ {
+ return 0l;
+ }
}
protected boolean tagLE(long tag1, long tag2)
@@ -916,6 +969,11 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
arguments.put(AddressHelper.NO_LOCAL, noLocal);
}
+ String altExchange = node.getAlternateExchange();
+ if(altExchange != null && !"".equals(altExchange))
+ {
+ arguments.put("alternateExchange", altExchange);
+ }
(new FailoverNoopSupport<Void, AMQException>(
new FailoverProtectedOperation<Void, AMQException>()
@@ -942,13 +1000,21 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
{
Node node = dest.getNode();
+ String altExchange = dest.getNode().getAlternateExchange();
+ Map<String, Object> arguments = node.getDeclareArgs();
+
+ if(altExchange != null && !"".equals(altExchange))
+ {
+ arguments.put("alternateExchange", altExchange);
+ }
+
// can't set alt. exchange
declareExchange(AMQShortString.valueOf(dest.getAddressName()),
AMQShortString.valueOf(node.getExchangeType()),
false,
node.isDurable(),
node.isAutoDelete(),
- FieldTable.convertToFieldTable(node.getDeclareArgs()), false);
+ FieldTable.convertToFieldTable(arguments), false);
// If bindings are specified without a queue name and is called by the producer,
// the broker will send an exception as expected.
@@ -962,9 +1028,79 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
final String queue,
final String exchange) throws AMQException
{
- bindQueue(AMQShortString.valueOf(queue),AMQShortString.valueOf(binding.getBindingKey()),
- FieldTable.convertToFieldTable(binding.getArgs()),
- AMQShortString.valueOf(exchange),dest);
+ final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ QueueBindBody queueBindBody =
+ methodRegistry.createQueueBindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(bindingKey),
+ false,
+ FieldTable.convertToFieldTable(binding.getArgs()));
+
+ getProtocolHandler().syncWrite(queueBindBody.
+ generateFrame(getChannelId()), QueueBindOkBody.class);
+ return null;
+ }
+ }, getAMQConnection()).execute();
+
+ }
+
+
+ protected void doUnbind(final AMQDestination.Binding binding,
+ final String queue,
+ final String exchange) throws AMQException
+ {
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ if (isBound(null, AMQShortString.valueOf(queue), null))
+ {
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ AMQMethodBody body;
+ if (methodRegistry instanceof MethodRegistry_0_9)
+ {
+ String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry;
+ body = methodRegistry_0_9.createQueueUnbindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(bindingKey),
+ null);
+ }
+ else if (methodRegistry instanceof MethodRegistry_0_91)
+ {
+ MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry;
+ body = methodRegistry_0_91.createQueueUnbindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(binding.getBindingKey()),
+ null);
+
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
+ }
+ getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class);
+ return null;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }, getAMQConnection()).execute();
}
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
@@ -1057,6 +1193,102 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return match;
}
+ @Override
+ void handleNodeDelete(final AMQDestination dest) throws AMQException
+ {
+ if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+ {
+ if (isExchangeExist(dest,false))
+ {
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendExchangeDelete(dest.getAddressName());
+ return null;
+ }
+ }, getAMQConnection()).execute();
+ dest.setAddressResolved(0);
+ }
+ }
+ else
+ {
+ if (isQueueExist(dest,false))
+ {
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendQueueDelete(AMQShortString.valueOf(dest.getAddressName()));
+ return null;
+ }
+ }, getAMQConnection()).execute();
+ dest.setAddressResolved(0);
+ }
+ }
+ }
+
+ @Override
+ void handleLinkDelete(AMQDestination dest) throws AMQException
+ {
+ // We need to destroy link bindings
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (AMQDestination.Binding binding: dest.getLink().getBindings())
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Unbinding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ doUnbind(binding, queue, exchange);
+ }
+ }
+
+
+ void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException
+ {
+ // We need to delete the subscription queue.
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getSubscriptionQueue().isExclusive() &&
+ isQueueExist(dest.getQueueName(), false, false, false, false, null))
+ {
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDelete(AMQShortString.valueOf(dest.getQueueName()));
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+ }
+ }
+
protected void flushAcknowledgments()
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 01e89b78c1..187be8522c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,19 +20,35 @@
*/
package org.apache.qpid.client;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.transport.TransportException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
public boolean isExclusive()
{
- return _exclusive;
+
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ return true;
+ }
+ else
+ {
+ return dest.getLink().getSubscription().isExclusive();
+ }
+ }
+ else
+ {
+ return _exclusive;
+ }
}
public boolean isReceiving()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 658fb25ce4..8f91a7db08 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -17,12 +17,18 @@
*/
package org.apache.qpid.client;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* This is a 0.10 message consumer.
*/
@@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
clearReceiveQueue();
}
}
-
- public boolean isExclusive()
- {
- AMQDestination dest = this.getDestination();
- if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
- {
- return true;
- }
- else
- {
- return dest.getLink().getSubscription().isExclusive();
- }
- }
- else
- {
- return super.isExclusive();
- }
- }
+
void postSubscription() throws AMQException
{
@@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
+ getSession().handleNodeDelete(dest);
}
// Subscription queue is handled as part of linkDelete method.
- ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
+ getSession().handleLinkDelete(dest);
if (!isDurableSubscriber())
{
((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
@@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return capacity;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 23d65e15d8..cdffc73932 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -118,13 +118,33 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
final AMQFrame cancelFrame = body.generateFrame(getChannelId());
getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
-
+ postSubscription();
+ getSession().sync();
if (_logger.isDebugEnabled())
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
}
+ void postSubscription() throws AMQException
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+ dest.getDelete() == AMQDestination.AddressOption.RECEIVER )
+ {
+ getSession().handleNodeDelete(dest);
+ }
+ // Subscription queue is handled as part of linkDelete method.
+ getSession().handleLinkDelete(dest);
+ if (!isDurableSubscriber())
+ {
+ getSession().deleteSubscriptionQueue(dest);
+ }
+ }
+ }
+
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 33bafe8f20..1d47ce9a07 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.util.UUID;
+
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -32,13 +33,15 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
+
+import org.slf4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
-import org.slf4j.Logger;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -286,6 +289,31 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
{
setClosed();
_session.deregisterProducer(_producerId);
+ AMQDestination dest = getAMQDestination();
+ AMQSession ssn = getSession();
+ if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ try
+ {
+ if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+ dest.getDelete() == AMQDestination.AddressOption.SENDER )
+ {
+ ssn.handleNodeDelete(dest);
+ }
+ ssn.handleLinkDelete(dest);
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+ ex.setLinkedException(e);
+ ex.initCause(e);
+ throw ex;
+ }
+ }
}
public void send(Message message) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index eb8104b02c..06a3b08272 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -48,7 +47,6 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.GZIPUtils;
import org.apache.qpid.util.Strings;
@@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
try
{
getSession().resolveAddress(destination,false,false);
- ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
- ((AMQSession_0_10)getSession()).sync();
+ getSession().handleLinkCreation(destination);
+ getSession().sync();
}
catch(Exception e)
{
@@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
public void close() throws JMSException
{
super.close();
- AMQDestination dest = getAMQDestination();
- AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
- if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- try
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
- {
- ssn.handleNodeDelete(dest);
- }
- ssn.handleLinkDelete(dest);
- }
- catch(TransportException e)
- {
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
- }
- catch (AMQException e)
- {
- JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
- ex.setLinkedException(e);
- ex.initCause(e);
- throw ex;
- }
- }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 89bf146398..e1b399e10a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.util.GZIPUtils;
@@ -63,6 +66,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
getSession().resolveAddress(destination, false, false);
+
+ getSession().handleLinkCreation(destination);
+ getSession().sync();
}
else
{
@@ -92,18 +98,43 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
boolean immediate) throws JMSException
{
+
+
+
+ AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+ BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
+
+ AMQShortString routingKey = destination.getRoutingKey();
+
+ FieldTable headers = delegate.getContentHeaderProperties().getHeaders();
+
+ if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
+ (destination.getSubject() != null
+ || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null)))
+ {
+
+ if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null)
+ {
+ // use default subject in address string
+ headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject());
+ }
+
+ if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT));
+ }
+ }
+
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
- destination.getExchangeName(),
- destination.getRoutingKey(),
- mandatory,
- immediate);
+ destination.getExchangeName(),
+ routingKey,
+ mandatory,
+ immediate);
AMQFrame publishFrame = body.generateFrame(getChannelId());
message.prepareForSending();
ByteBuffer payload = message.getData();
- AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
- BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
contentHeaderProperties.setUserId(getUserID());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 21f1623dd1..747668ff9c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T>
{
_waiting.set(true);
- while (!_ready)
+ while (!_ready && _error == null)
{
try
{