diff options
Diffstat (limited to 'qpid/java/client/src')
11 files changed, 399 insertions, 111 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b64d355f80..2a91ff3ce2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -87,6 +87,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); + private static final long DEFAULT_CLOSE_TIMEOUT = 2000l; + private final long _connectionNumber; /** @@ -160,7 +162,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); - private static final long DEFAULT_TIMEOUT = 1000 * 30; private AMQConnectionDelegate _delegate; @@ -873,7 +874,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { - close(DEFAULT_TIMEOUT); + close(DEFAULT_CLOSE_TIMEOUT); } public void close(long timeout) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0b299a22cd..0183c30276 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -313,6 +313,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _immediatePrefetch; } + abstract void handleNodeDelete(final AMQDestination dest) throws AMQException; + + abstract void handleLinkDelete(final AMQDestination dest) throws AMQException; + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 46f999e452..68b7cf1f88 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1462,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + @Override void handleNodeDelete(AMQDestination dest) throws AMQException { if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index e5ca82f56a..0145d15111 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -29,6 +29,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -64,6 +65,7 @@ import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.Strings; public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { @@ -175,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination dest, + final AMQShortString exchangeName, final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { - getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody - (getTicket(),queueName,exchangeName,routingKey,false,arguments). - generateFrame(getChannelId()), QueueBindOkBody.class); + if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL) + { + + getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody + (getTicket(), queueName, exchangeName, routingKey, false, arguments). + generateFrame(getChannelId()), QueueBindOkBody.class); + + } + else + { + // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. + List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>(); + bindings.addAll(destination.getNode().getBindings()); + + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? + destination.getAddressName(): "amq.topic"; + + for (AMQDestination.Binding binding: bindings) + { + // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. + // The null check below is a way to side step that issue while fixing QPID-4146 + // Note this issue only affects producers. + if (binding.getQueue() == null && queueName == null) + { + continue; + } + String queue = binding.getQueue() == null? + queueName.asString(): binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchange : + binding.getExchange(); + + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + doBind(destination, binding, queue, exchange); + } + } } public void sendClose(long timeout) throws AMQException, FailoverException @@ -547,10 +586,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe Map<String,Object> bindingArguments = new HashMap<String, Object>(); bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); - bindQueue(AMQShortString.valueOf(queueName), - AMQShortString.valueOf(dest.getSubject()), - FieldTable.convertToFieldTable(bindingArguments), - AMQShortString.valueOf(dest.getAddressName()),dest,false); + final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments); + doBind(dest, binding, queueName, dest.getAddressName()); } @@ -589,6 +626,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } + public void sendExchangeDelete(final String name) throws AMQException, FailoverException + { + ExchangeDeleteBody body = + getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); + + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { AMQShortString queueName = amqd.getAMQQueueName(); @@ -821,18 +867,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException { - AMQFrame queueDeclare = - getMethodRegistry().createQueueDeclareBody(getTicket(), - amqd.getAMQQueueName(), - true, - amqd.isDurable(), - amqd.isExclusive(), - amqd.isAutoDelete(), - false, - null).generateFrame(getChannelId()); - QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); - getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); - return okHandler.getMessageCount(); + if(isBound(null, amqd.getAMQQueueName(), null)) + { + AMQFrame queueDeclare = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + true, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null).generateFrame(getChannelId()); + QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + return okHandler.getMessageCount(); + } + else + { + return 0l; + } } protected boolean tagLE(long tag1, long tag2) @@ -916,6 +969,11 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { arguments.put(AddressHelper.NO_LOCAL, noLocal); } + String altExchange = node.getAlternateExchange(); + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } (new FailoverNoopSupport<Void, AMQException>( new FailoverProtectedOperation<Void, AMQException>() @@ -942,13 +1000,21 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe void handleExchangeNodeCreation(AMQDestination dest) throws AMQException { Node node = dest.getNode(); + String altExchange = dest.getNode().getAlternateExchange(); + Map<String, Object> arguments = node.getDeclareArgs(); + + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } + // can't set alt. exchange declareExchange(AMQShortString.valueOf(dest.getAddressName()), AMQShortString.valueOf(node.getExchangeType()), false, node.isDurable(), node.isAutoDelete(), - FieldTable.convertToFieldTable(node.getDeclareArgs()), false); + FieldTable.convertToFieldTable(arguments), false); // If bindings are specified without a queue name and is called by the producer, // the broker will send an exception as expected. @@ -962,9 +1028,79 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe final String queue, final String exchange) throws AMQException { - bindQueue(AMQShortString.valueOf(queue),AMQShortString.valueOf(binding.getBindingKey()), - FieldTable.convertToFieldTable(binding.getArgs()), - AMQShortString.valueOf(exchange),dest); + final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + QueueBindBody queueBindBody = + methodRegistry.createQueueBindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + false, + FieldTable.convertToFieldTable(binding.getArgs())); + + getProtocolHandler().syncWrite(queueBindBody. + generateFrame(getChannelId()), QueueBindOkBody.class); + return null; + } + }, getAMQConnection()).execute(); + + } + + + protected void doUnbind(final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException + { + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + if (isBound(null, AMQShortString.valueOf(queue), null)) + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + AMQMethodBody body; + if (methodRegistry instanceof MethodRegistry_0_9) + { + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; + body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + null); + } + else if (methodRegistry instanceof MethodRegistry_0_91) + { + MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; + body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(binding.getBindingKey()), + null); + + } + else + { + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } + getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); + return null; + } + else + { + return null; + } + } + }, getAMQConnection()).execute(); } public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException @@ -1057,6 +1193,102 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return match; } + @Override + void handleNodeDelete(final AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDelete(dest.getAddressName()); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + else + { + if (isQueueExist(dest,false)) + { + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendQueueDelete(AMQShortString.valueOf(dest.getAddressName())); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + } + + @Override + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doUnbind(binding, queue, exchange); + } + } + + + void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException + { + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest.getQueueName(), false, false, false, false, null)) + { + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDelete(AMQShortString.valueOf(dest.getQueueName())); + return null; + } + }, getAMQConnection())).execute(); + + } + } + protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 01e89b78c1..187be8522c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,19 +20,35 @@ */ package org.apache.qpid.client; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.apache.qpid.transport.TransportException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ public boolean isExclusive() { - return _exclusive; + + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } } public boolean isReceiving() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 658fb25ce4..8f91a7db08 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -17,12 +17,18 @@ */ package org.apache.qpid.client; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; - /** * This is a 0.10 message consumer. */ @@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } - - public boolean isExclusive() - { - AMQDestination dest = this.getDestination(); - if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) - { - return true; - } - else - { - return dest.getLink().getSubscription().isExclusive(); - } - } - else - { - return super.isExclusive(); - } - } + void postSubscription() throws AMQException { @@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).handleNodeDelete(dest); + getSession().handleNodeDelete(dest); } // Subscription queue is handled as part of linkDelete method. - ((AMQSession_0_10) getSession()).handleLinkDelete(dest); + getSession().handleLinkDelete(dest); if (!isDurableSubscriber()) { ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest); @@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 23d65e15d8..cdffc73932 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -118,13 +118,33 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe final AMQFrame cancelFrame = body.generateFrame(getChannelId()); getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); - + postSubscription(); + getSession().sync(); if (_logger.isDebugEnabled()) { _logger.debug("CancelOk'd for consumer:" + debugIdentity()); } } + void postSubscription() throws AMQException + { + AMQDestination dest = this.getDestination(); + if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || + dest.getDelete() == AMQDestination.AddressOption.RECEIVER ) + { + getSession().handleNodeDelete(dest); + } + // Subscription queue is handled as part of linkDelete method. + getSession().handleLinkDelete(dest); + if (!isDurableSubscriber()) + { + getSession().deleteSubscriptionQueue(dest); + } + } + } + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 33bafe8f20..1d47ce9a07 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.util.UUID; + import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -32,13 +33,15 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; + +import org.slf4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; -import org.slf4j.Logger; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -286,6 +289,31 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { setClosed(); _session.deregisterProducer(_producerId); + AMQDestination dest = getAMQDestination(); + AMQSession ssn = getSession(); + if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + try + { + if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || + dest.getDelete() == AMQDestination.AddressOption.SENDER ) + { + ssn.handleNodeDelete(dest); + } + ssn.handleLinkDelete(dest); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + catch (AMQException e) + { + JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; + } + } } public void send(Message message) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index eb8104b02c..06a3b08272 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -48,7 +47,6 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.GZIPUtils; import org.apache.qpid.util.Strings; @@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer try { getSession().resolveAddress(destination,false,false); - ((AMQSession_0_10)getSession()).handleLinkCreation(destination); - ((AMQSession_0_10)getSession()).sync(); + getSession().handleLinkCreation(destination); + getSession().sync(); } catch(Exception e) { @@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer public void close() throws JMSException { super.close(); - AMQDestination dest = getAMQDestination(); - AMQSession_0_10 ssn = (AMQSession_0_10) getSession(); - if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - try - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) - { - ssn.handleNodeDelete(dest); - } - ssn.handleLinkDelete(dest); - } - catch(TransportException e) - { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); - } - catch (AMQException e) - { - JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); - ex.setLinkedException(e); - ex.initCause(e); - throw ex; - } - } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 89bf146398..e1b399e10a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.CompositeAMQDataBlock; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.util.GZIPUtils; @@ -63,6 +66,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { getSession().resolveAddress(destination, false, false); + + getSession().handleLinkCreation(destination); + getSession().sync(); } else { @@ -92,18 +98,43 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { + + + + AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); + BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); + + AMQShortString routingKey = destination.getRoutingKey(); + + FieldTable headers = delegate.getContentHeaderProperties().getHeaders(); + + if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && + (destination.getSubject() != null + || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null))) + { + + if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null) + { + // use default subject in address string + headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject()); + } + + if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + { + routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT)); + } + } + BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), - destination.getExchangeName(), - destination.getRoutingKey(), - mandatory, - immediate); + destination.getExchangeName(), + routingKey, + mandatory, + immediate); AMQFrame publishFrame = body.generateFrame(getChannelId()); message.prepareForSending(); ByteBuffer payload = message.getData(); - AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); - BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); contentHeaderProperties.setUserId(getUserID()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 21f1623dd1..747668ff9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T> { _waiting.set(true); - while (!_ready) + while (!_ready && _error == null) { try { |
