diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-09-01 12:09:54 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-09-01 12:09:54 +0000 |
| commit | 44ad7ccacaab3d56c0a7e29ba13dcc3db57ebdf8 (patch) | |
| tree | 5e7455fe999a28b30c87baa59fa79706b9e382a7 /qpid/java/client | |
| parent | 1430d822998d25832fd4c4266859cc2369f56c58 (diff) | |
| download | qpid-python-44ad7ccacaab3d56c0a7e29ba13dcc3db57ebdf8.tar.gz | |
QPID-4261: extend BindingURLs to allow specifying exchange durable/autodelete/internal options, use the values when sending exchange declares during producer and consumer creation. Fix ExchangeDeclareHandler to set auto-delete properly (though we dont actually support it, and it was removed from the protocol in 0-9-1).
Isolate AMQProtocolHandler use to the 0-8/0-9/0-9-1 specific Session/Producer/Consumer implementations that actually need it instead of letting it bleed through the abstraction and 0-10 implementations that dont use it. Add some other clarifying comments.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1379748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
15 files changed, 310 insertions, 224 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 530186b1f9..096738f9ad 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -52,6 +52,12 @@ public abstract class AMQDestination implements Destination, Referenceable private AMQShortString _exchangeClass; + private boolean _exchangeAutoDelete; + + private boolean _exchangeDurable; + + private boolean _exchangeInternal; + private boolean _isDurable; private boolean _isExclusive; @@ -184,6 +190,7 @@ public abstract class AMQDestination implements Destination, Referenceable private Node _sourceNode; private Link _targetLink; private Link _link; + // ----- / Fields required to support new address syntax ------- @@ -280,6 +287,9 @@ public abstract class AMQDestination implements Destination, Referenceable { _exchangeName = binding.getExchangeName(); _exchangeClass = binding.getExchangeClass(); + _exchangeDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + _exchangeAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + _exchangeInternal = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); @@ -358,6 +368,10 @@ public abstract class AMQDestination implements Destination, Referenceable _destSyntax = DestSyntax.BURL; _browseOnly = browseOnly; _rejectBehaviour = null; + _exchangeAutoDelete = false; + _exchangeDurable = false; + _exchangeInternal = false; + if (_logger.isDebugEnabled()) { _logger.debug("Based on " + toString() + " the selected destination syntax is " + _destSyntax); @@ -412,6 +426,21 @@ public abstract class AMQDestination implements Destination, Referenceable return _exchangeClass; } + public boolean isExchangeDurable() + { + return _exchangeDurable; + } + + public boolean isExchangeAutoDelete() + { + return _exchangeAutoDelete; + } + + public boolean isExchangeInternal() + { + return _exchangeInternal; + } + public boolean isTopic() { return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass); @@ -579,6 +608,27 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); } + if (_exchangeDurable) + { + sb.append(BindingURL.OPTION_EXCHANGE_DURABLE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + if (_exchangeAutoDelete) + { + sb.append(BindingURL.OPTION_EXCHANGE_AUTODELETE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + if (_exchangeInternal) + { + sb.append(BindingURL.OPTION_EXCHANGE_INTERNAL); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + //removeKey the last char '?' if there is no options , ',' if there are. sb.deleteCharAt(sb.length() - 1); url = sb.toString(); @@ -935,6 +985,4 @@ public abstract class AMQDestination implements Destination, Referenceable { return _rejectBehaviour; } - - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d4e6ec16e4..f258404e2d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -49,13 +49,11 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; @@ -649,12 +647,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public abstract void acknowledgeMessage(long deliveryTag, boolean multiple); - public MethodRegistry getMethodRegistry() - { - MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); - return methodRegistry; - } - /** * Binds the named queue, with the specified routing key, to the named exchange. * @@ -1550,7 +1542,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException { - declareExchange(name, type, getProtocolHandler(), nowait); + declareExchange(name, type, nowait, false, false, false); } abstract public void sync() throws AMQException; @@ -1690,8 +1682,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws AMQException { - AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); + declareExchange(amqd, false); AMQShortString queueName = declareQueue(amqd, false); bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } @@ -2582,11 +2573,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Register to consume from the queue. - * * @param queueName */ - private void consumeFromQueue(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException + private void consumeFromQueue(C consumer, AMQShortString queueName, boolean nowait) throws AMQException, FailoverException { int tagId = _nextTag++; @@ -2603,7 +2592,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - sendConsume(consumer, queueName, protocolHandler, nowait, tagId); + sendConsume(consumer, queueName, nowait, tagId); } catch (AMQException e) { @@ -2614,7 +2603,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendConsume(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException; + boolean nowait, int tag) throws AMQException, FailoverException; private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) throws JMSException @@ -2648,9 +2637,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract P createMessageProducer(final Destination destination, final Boolean mandatory, final Boolean immediate, final long producerId) throws JMSException; - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + private void declareExchange(AMQDestination amqd, boolean nowait) throws AMQException { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(), + amqd.isExchangeAutoDelete(), amqd.isExchangeInternal()); } /** @@ -2707,33 +2697,29 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * * @param name The name of the exchange to declare. * @param type The type of the exchange to declare. - * @param protocolHandler The protocol handler to process the communication through. * @param nowait - * + * @param durable + * @param autoDelete + * @param internal * @throws AMQException If the exchange cannot be declared for any reason. * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException + final boolean nowait, final boolean durable, + final boolean autoDelete, final boolean internal) throws AMQException { new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendExchangeDeclare(name, type, protocolHandler, nowait); + sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal); return null; } }, _connection).execute(); } - public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; - - - void declareQueuePassive(AMQDestination queue) throws AMQException - { - declareQueue(queue,false,false,true); - } + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; /** * Declares a queue for a JMS destination. @@ -2768,31 +2754,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return declareQueue(amqd, noLocal, nowait, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, - final boolean noLocal, final boolean nowait, final boolean passive) - throws AMQException - { - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new FailoverNoopSupport<AMQShortString, AMQException>( - new FailoverProtectedOperation<AMQShortString, AMQException>() - { - public AMQShortString execute() throws AMQException, FailoverException - { - // Generate the queue name if the destination indicates that a client generated name is to be used. - if (amqd.isNameRequired()) - { - amqd.setQueueName(protocolHandler.generateQueueName()); - } - - sendQueueDeclare(amqd, protocolHandler, nowait, passive); - - return amqd.getAMQQueueName(); - } - }, _connection).execute(); - } - - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) throws AMQException, FailoverException; + protected abstract AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException; /** * Undeclares the specified queue. @@ -2845,21 +2808,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return ++_nextProducerId; } - protected AMQProtocolHandler getProtocolHandler() - { - return _connection.getProtocolHandler(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolHandler().getProtocolMajorVersion(); - } - - public byte getProtocolMinorVersion() - { - return getProtocolHandler().getProtocolMinorVersion(); - } - protected boolean hasMessageListeners() { return _hasMessageListeners; @@ -2918,8 +2866,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { AMQDestination amqd = consumer.getDestination(); - AMQProtocolHandler protocolHandler = getProtocolHandler(); - if (amqd.getDestSyntax() == DestSyntax.ADDR) { handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait); @@ -2928,7 +2874,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (_declareExchanges) { - declareExchange(amqd, protocolHandler, nowait); + declareExchange(amqd, nowait); } if (_delareQueues || amqd.isNameRequired()) @@ -2973,7 +2919,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait); + consumeFromQueue(consumer, queueName, nowait); } catch (FailoverException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8a7c6b1a01..dcbdadf46d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -46,12 +46,12 @@ import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; + import static org.apache.qpid.transport.Option.BATCH; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; @@ -523,11 +523,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, - getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh, - prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose); + getMessageFactoryRegistry(), this, rawSelector, prefetchHigh, prefetchLow, + exclusive, getAcknowledgeMode(), noConsume, autoClose); } /** @@ -591,7 +589,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * This method is invoked when a consumer is created * Registers the consumer with the broker */ - public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, + public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException { @@ -653,7 +651,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, - getProtocolHandler(), producerId, immediate, mandatory); + producerId, immediate, mandatory); } catch (AMQException e) { @@ -673,26 +671,25 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * creates an exchange if it does not already exist */ - public void sendExchangeDeclare(final AMQShortString name, - final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) - throws AMQException, FailoverException + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { - sendExchangeDeclare(name.asString(), type.asString(), null, null, - nowait); + //The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it + sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete); } public void sendExchangeDeclare(final String name, final String type, final String alternateExchange, final Map<String, Object> args, - final boolean nowait) throws AMQException + final boolean nowait, boolean durable, boolean autoDelete) throws AMQException { getQpidSession().exchangeDeclare( name, type, alternateExchange, args, - name.toString().startsWith("amq.") ? Option.PASSIVE - : Option.NONE); + name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE, + durable ? Option.DURABLE : Option.NONE, + autoDelete ? Option.AUTO_DELETE : Option.NONE); // We need to sync so that we get notify of an error. if (!nowait) { @@ -717,18 +714,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Declare a queue with the given queueName */ - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) - throws AMQException, FailoverException - { - // do nothing this is only used by 0_8 - } - - /** - * Declare a queue with the given queueName - */ - public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait, boolean passive) + public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal, + final boolean nowait, boolean passive) throws AMQException { AMQShortString queueName; @@ -925,12 +912,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return getCurrentException(); } + @Override protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -947,7 +933,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive); + return send0_10QueueDeclare(amqd, noLocal, nowait, passive); } }, getAMQConnection()).execute(); } @@ -1217,7 +1203,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else if(createNode) { setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,noLocal,noWait, false); + send0_10QueueDeclare(dest,noLocal,noWait,false); sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), null,dest.getExchangeName(),dest, false); break; @@ -1244,7 +1230,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.getExchangeClass().asString(), dest.getTargetNode().getAlternateExchange(), dest.getTargetNode().getDeclareArgs(), - false); + false, false, false); if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) { createSubscriptionQueue(dest,noLocal); @@ -1323,7 +1309,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } node.setExclusive(true); node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,noLocal,true, false); + send0_10QueueDeclare(dest,noLocal,true,false); getQpidSession().exchangeBind(dest.getQueueName(), dest.getAddressName(), dest.getSubject(), diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 8ab23a240e..9cd5eb1491 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; @@ -359,9 +360,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } - @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, + @Override + public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException { @@ -380,27 +381,29 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (nowait) { - protocolHandler.writeFrame(jmsConsume); + getProtocolHandler().writeFrame(jmsConsume); } else { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + getProtocolHandler().syncWrite(jmsConsume, BasicConsumeOkBody.class); } } - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException + @Override + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, name.toString().startsWith("amq."), - false,false,false,false,null); + durable, autoDelete, internal, false, null); AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) throws AMQException, FailoverException + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(), @@ -414,7 +417,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe AMQFrame queueDeclare = body.generateFrame(getChannelId()); - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + } + + @Override + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, + final boolean nowait, final boolean passive) throws AMQException + { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new FailoverNoopSupport<AMQShortString, AMQException>( + new FailoverProtectedOperation<AMQShortString, AMQException>() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) + { + amqd.setQueueName(protocolHandler.generateQueueName()); + } + + sendQueueDeclare(amqd, passive); + + return amqd.getAMQQueueName(); + } + }, getAMQConnection()).execute(); } public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException @@ -440,10 +468,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException { - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, - getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow, + getMessageFactoryRegistry(),this, arguments, prefetchHigh, prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose); } @@ -662,14 +688,23 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe queueName == null ? null : new AMQShortString(queueName), bindingKey == null ? null : new AMQShortString(bindingKey)); } - + + private AMQProtocolHandler getProtocolHandler() + { + return getAMQConnection().getProtocolHandler(); + } + + public MethodRegistry getMethodRegistry() + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + return methodRegistry; + } public AMQException getLastException() { // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for - AMQStateManager manager = getAMQConnection().getProtocolHandler() - .getStateManager(); + AMQStateManager manager = getProtocolHandler().getStateManager(); Exception e = manager.getLastException(); if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0f8b5717d6..f8e837cd34 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -31,7 +31,6 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQShortString; @@ -87,8 +86,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa private final AMQSession _session; - private final AMQProtocolHandler _protocolHandler; - /** * We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ @@ -140,9 +137,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession session, AMQProtocolHandler protocolHandler, - FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException + AMQSession session, FieldTable rawSelector, + int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; _connection = connection; @@ -150,7 +147,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _destination = destination; _messageFactory = messageFactory; _session = session; - _protocolHandler = protocolHandler; _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; @@ -1042,10 +1038,4 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { return _messageFactory; } - - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 26bb51b821..ca5b1ac9c1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -28,7 +28,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.Session; @@ -82,13 +81,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession<?,?> session, AMQProtocolHandler protocolHandler, - FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) + AMQSession<?,?> session, FieldTable rawSelector, + int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { - super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); + super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector, + prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index b00f9dd98a..fc7eacc760 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -29,7 +29,6 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_8; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; @@ -52,12 +51,12 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session, - AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException + FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, - protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive, - acknowledgeMode, browseOnly, autoClose); + rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, + browseOnly, autoClose); final FieldTable consumerArguments = getArguments(); if (isAutoClose()) { @@ -93,13 +92,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe } } + @Override + public AMQSession_0_8 getSession() + { + return (AMQSession_0_8) super.getSession(); + } + void sendCancel() throws AMQException, FailoverException { BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false); final AMQFrame cancelFrame = body.generateFrame(getChannelId()); - getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); + getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); if (_logger.isDebugEnabled()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 9b3b2ce0e9..5cd596108a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.client; -import java.io.UnsupportedEncodingException; import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; @@ -36,12 +35,10 @@ import javax.jms.Topic; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -71,18 +68,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private AMQDestination _destination; /** - * Default encoding used for messages produced by this producer. - */ - private String _encoding; - - /** - * Default encoding used for message produced by this producer. - */ - private String _mimeType; - - private AMQProtocolHandler _protocolHandler; - - /** * True if this producer was created from a transacted session */ private boolean _transacted; @@ -135,14 +120,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - Boolean immediate, Boolean mandatory) throws AMQException + AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { _logger = logger; _connection = connection; _destination = destination; _transacted = transacted; - _protocolHandler = protocolHandler; _channelId = channelId; _session = session; _producerId = producerId; @@ -163,6 +146,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac setPublishMode(); } + protected AMQConnection getConnection() + { + return _connection; + } + void setPublishMode() { // Publish mode could be configured at destination level as well. @@ -558,18 +546,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - public void setMimeType(String mimeType) throws JMSException - { - checkNotClosed(); - _mimeType = mimeType; - } - - public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException - { - checkNotClosed(); - _encoding = encoding; - } - private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException { checkNotClosed(); @@ -645,16 +621,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _destination = destination; } - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - - protected void setProtocolHandler(AMQProtocolHandler protocolHandler) - { - _protocolHandler = protocolHandler; - } - protected int getChannelId() { return _channelId; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index a3a1e9c28b..3b4f642d4c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -27,7 +27,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -60,10 +59,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer private byte[] userIDBytes; BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - Boolean immediate, Boolean mandatory) throws AMQException + AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); + super(_logger, connection, destination, transacted, channelId, session, producerId, immediate, mandatory); userIDBytes = Strings.toUTF8(getUserID()); } @@ -79,7 +77,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer (name, destination.getExchangeClass().toString(), null, null, - name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE, + destination.isExchangeDurable() ? Option.DURABLE : Option.NONE, + destination.isExchangeAutoDelete() ? Option.AUTO_DELETE : Option.NONE); } } else diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 04cc876b76..bb270b0878 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -50,7 +50,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); + super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory); } void declareDestination(AMQDestination destination) @@ -63,14 +63,14 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer destination.getExchangeName(), destination.getExchangeClass(), destination.getExchangeName().toString().startsWith("amq."), - false, - false, - false, + destination.isExchangeDurable(), + destination.isExchangeAutoDelete(), + destination.isExchangeInternal(), true, null); AMQFrame declare = body.generateFrame(getChannelId()); - getProtocolHandler().writeFrame(declare); + getConnection().getProtocolHandler().writeFrame(declare); } } @@ -171,7 +171,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer throw jmse; } - getProtocolHandler().writeFrame(compositeFrame); + getConnection().getProtocolHandler().writeFrame(compositeFrame); } /** @@ -233,4 +233,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer return frameCount; } + @Override + public AMQSession_0_8 getSession() + { + return (AMQSession_0_8) super.getSession(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java index bec8b0917d..82c2b88c30 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java @@ -23,25 +23,11 @@ package org.apache.qpid.jms; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import java.io.UnsupportedEncodingException; /** */ public interface MessageProducer extends javax.jms.MessageProducer { - /** - * Set the default MIME type for messages produced by this producer. This reduces the overhead of each message. - * @param mimeType - */ - void setMimeType(String mimeType) throws JMSException; - - /** - * Set the default encoding for messages produced by this producer. This reduces the overhead of each message. - * @param encoding the encoding as understood by XXXX how do I specify this?? RG - * @throws UnsupportedEncodingException if the encoding is not understood - */ - void setEncoding(String encoding) throws UnsupportedEncodingException, JMSException; - void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean immediate) throws JMSException; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 028e2d5cc3..d0cd24adf6 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -276,7 +276,7 @@ public class AMQSession_0_10Test extends QpidTestCase { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, null, null, false, true); - session.sendConsume(consumer, new AMQShortString("test"), null, true, 1); + session.sendConsume(consumer, new AMQShortString("test"), true, 1); } catch (Exception e) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java index 722cbd0752..066ece7ed1 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java @@ -48,7 +48,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase TestAMQSession testSession = new TestAMQSession(conn); BasicMessageConsumer_0_8 consumer = - new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour()); } @@ -68,7 +68,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase final TestAMQSession testSession = new TestAMQSession(conn); final BasicMessageConsumer_0_8 consumer = - new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); } @@ -94,7 +94,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase TestAMQSession testSession = new TestAMQSession(conn); BasicMessageConsumer_0_8 consumer = - new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java index 9addb0ee71..8f578e6a2f 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java @@ -193,6 +193,126 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("test:testQueueD")); } + public void testExchangeOptionsNotPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertFalse(dest.isExchangeAutoDelete()); + assertFalse(dest.isExchangeDurable()); + assertFalse(dest.isExchangeInternal()); + } + + public void testExchangeAutoDeleteOptionPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_AUTODELETE + "='true'"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertTrue(dest.isExchangeAutoDelete()); + assertFalse(dest.isExchangeDurable()); + assertFalse(dest.isExchangeInternal()); + } + + public void testExchangeDurableOptionPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_DURABLE + "='true'"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertTrue(dest.isExchangeDurable()); + assertFalse(dest.isExchangeAutoDelete()); + assertFalse(dest.isExchangeInternal()); + } + + public void testExchangeInternalOptionPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_INTERNAL + "='true'"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertTrue(dest.isExchangeInternal()); + assertFalse(dest.isExchangeDurable()); + assertFalse(dest.isExchangeAutoDelete()); + } + public void testRejectBehaviourPresent() throws URISyntaxException { String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'"; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index f199961b6f..751066abbc 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -124,7 +124,7 @@ public class TestAMQSession extends AMQSession_0_8 return false; } - public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException + public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException { } @@ -139,13 +139,13 @@ public class TestAMQSession extends AMQSession_0_8 return null; } - public void sendExchangeDeclare(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException + public void sendExchangeDeclare(AMQShortString name, AMQShortString type, boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { } public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler, - boolean nowait, boolean passive) throws AMQException, FailoverException + boolean passive) throws AMQException, FailoverException { } |
