summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-09-01 12:09:54 +0000
committerRobert Gemmell <robbie@apache.org>2012-09-01 12:09:54 +0000
commit44ad7ccacaab3d56c0a7e29ba13dcc3db57ebdf8 (patch)
tree5e7455fe999a28b30c87baa59fa79706b9e382a7 /qpid/java/client
parent1430d822998d25832fd4c4266859cc2369f56c58 (diff)
downloadqpid-python-44ad7ccacaab3d56c0a7e29ba13dcc3db57ebdf8.tar.gz
QPID-4261: extend BindingURLs to allow specifying exchange durable/autodelete/internal options, use the values when sending exchange declares during producer and consumer creation. Fix ExchangeDeclareHandler to set auto-delete properly (though we dont actually support it, and it was removed from the protocol in 0-9-1).
Isolate AMQProtocolHandler use to the 0-8/0-9/0-9-1 specific Session/Producer/Consumer implementations that actually need it instead of letting it bleed through the abstraction and 0-10 implementations that dont use it. Add some other clarifying comments. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1379748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java52
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java94
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java54
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java69
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java17
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java46
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java17
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java14
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java6
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java120
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java6
15 files changed, 310 insertions, 224 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 530186b1f9..096738f9ad 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -52,6 +52,12 @@ public abstract class AMQDestination implements Destination, Referenceable
private AMQShortString _exchangeClass;
+ private boolean _exchangeAutoDelete;
+
+ private boolean _exchangeDurable;
+
+ private boolean _exchangeInternal;
+
private boolean _isDurable;
private boolean _isExclusive;
@@ -184,6 +190,7 @@ public abstract class AMQDestination implements Destination, Referenceable
private Node _sourceNode;
private Link _targetLink;
private Link _link;
+
// ----- / Fields required to support new address syntax -------
@@ -280,6 +287,9 @@ public abstract class AMQDestination implements Destination, Referenceable
{
_exchangeName = binding.getExchangeName();
_exchangeClass = binding.getExchangeClass();
+ _exchangeDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+ _exchangeAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ _exchangeInternal = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
@@ -358,6 +368,10 @@ public abstract class AMQDestination implements Destination, Referenceable
_destSyntax = DestSyntax.BURL;
_browseOnly = browseOnly;
_rejectBehaviour = null;
+ _exchangeAutoDelete = false;
+ _exchangeDurable = false;
+ _exchangeInternal = false;
+
if (_logger.isDebugEnabled())
{
_logger.debug("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
@@ -412,6 +426,21 @@ public abstract class AMQDestination implements Destination, Referenceable
return _exchangeClass;
}
+ public boolean isExchangeDurable()
+ {
+ return _exchangeDurable;
+ }
+
+ public boolean isExchangeAutoDelete()
+ {
+ return _exchangeAutoDelete;
+ }
+
+ public boolean isExchangeInternal()
+ {
+ return _exchangeInternal;
+ }
+
public boolean isTopic()
{
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass);
@@ -579,6 +608,27 @@ public abstract class AMQDestination implements Destination, Referenceable
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
+ if (_exchangeDurable)
+ {
+ sb.append(BindingURL.OPTION_EXCHANGE_DURABLE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_exchangeAutoDelete)
+ {
+ sb.append(BindingURL.OPTION_EXCHANGE_AUTODELETE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_exchangeInternal)
+ {
+ sb.append(BindingURL.OPTION_EXCHANGE_INTERNAL);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
//removeKey the last char '?' if there is no options , ',' if there are.
sb.deleteCharAt(sb.length() - 1);
url = sb.toString();
@@ -935,6 +985,4 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return _rejectBehaviour;
}
-
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d4e6ec16e4..f258404e2d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -49,13 +49,11 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
@@ -649,12 +647,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
- public MethodRegistry getMethodRegistry()
- {
- MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
- return methodRegistry;
- }
-
/**
* Binds the named queue, with the specified routing key, to the named exchange.
*
@@ -1550,7 +1542,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
{
- declareExchange(name, type, getProtocolHandler(), nowait);
+ declareExchange(name, type, nowait, false, false, false);
}
abstract public void sync() throws AMQException;
@@ -1690,8 +1682,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throws
AMQException
{
- AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
+ declareExchange(amqd, false);
AMQShortString queueName = declareQueue(amqd, false);
bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
}
@@ -2582,11 +2573,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Register to consume from the queue.
- *
* @param queueName
*/
- private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException
+ private void consumeFromQueue(C consumer, AMQShortString queueName, boolean nowait) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2603,7 +2592,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait, tagId);
+ sendConsume(consumer, queueName, nowait, tagId);
}
catch (AMQException e)
{
@@ -2614,7 +2603,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
+ boolean nowait, int tag) throws AMQException, FailoverException;
private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
throws JMSException
@@ -2648,9 +2637,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
final Boolean immediate, final long producerId) throws JMSException;
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
+ private void declareExchange(AMQDestination amqd, boolean nowait) throws AMQException
{
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
+ declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(),
+ amqd.isExchangeAutoDelete(), amqd.isExchangeInternal());
}
/**
@@ -2707,33 +2697,29 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*
* @param name The name of the exchange to declare.
* @param type The type of the exchange to declare.
- * @param protocolHandler The protocol handler to process the communication through.
* @param nowait
- *
+ * @param durable
+ * @param autoDelete
+ * @param internal
* @throws AMQException If the exchange cannot be declared for any reason.
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+ final boolean nowait, final boolean durable,
+ final boolean autoDelete, final boolean internal) throws AMQException
{
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendExchangeDeclare(name, type, protocolHandler, nowait);
+ sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal);
return null;
}
}, _connection).execute();
}
- public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException;
-
-
- void declareQueuePassive(AMQDestination queue) throws AMQException
- {
- declareQueue(queue,false,false,true);
- }
+ public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException;
/**
* Declares a queue for a JMS destination.
@@ -2768,31 +2754,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return declareQueue(amqd, noLocal, nowait, false);
}
- protected AMQShortString declareQueue(final AMQDestination amqd,
- final boolean noLocal, final boolean nowait, final boolean passive)
- throws AMQException
- {
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new FailoverNoopSupport<AMQShortString, AMQException>(
- new FailoverProtectedOperation<AMQShortString, AMQException>()
- {
- public AMQShortString execute() throws AMQException, FailoverException
- {
- // Generate the queue name if the destination indicates that a client generated name is to be used.
- if (amqd.isNameRequired())
- {
- amqd.setQueueName(protocolHandler.generateQueueName());
- }
-
- sendQueueDeclare(amqd, protocolHandler, nowait, passive);
-
- return amqd.getAMQQueueName();
- }
- }, _connection).execute();
- }
-
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive) throws AMQException, FailoverException;
+ protected abstract AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException;
/**
* Undeclares the specified queue.
@@ -2845,21 +2808,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return ++_nextProducerId;
}
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _connection.getProtocolHandler();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolHandler().getProtocolMajorVersion();
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolHandler().getProtocolMinorVersion();
- }
-
protected boolean hasMessageListeners()
{
return _hasMessageListeners;
@@ -2918,8 +2866,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
AMQDestination amqd = consumer.getDestination();
- AMQProtocolHandler protocolHandler = getProtocolHandler();
-
if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
@@ -2928,7 +2874,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (_declareExchanges)
{
- declareExchange(amqd, protocolHandler, nowait);
+ declareExchange(amqd, nowait);
}
if (_delareQueues || amqd.isNameRequired())
@@ -2973,7 +2919,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+ consumeFromQueue(consumer, queueName, nowait);
}
catch (FailoverException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 8a7c6b1a01..dcbdadf46d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -46,12 +46,12 @@ import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
+
import static org.apache.qpid.transport.Option.BATCH;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
@@ -523,11 +523,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
final FieldTable rawSelector, final boolean noConsume,
final boolean autoClose) throws JMSException
{
-
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
- getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh,
- prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose);
+ getMessageFactoryRegistry(), this, rawSelector, prefetchHigh, prefetchLow,
+ exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
/**
@@ -591,7 +589,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* This method is invoked when a consumer is created
* Registers the consumer with the broker
*/
- public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName,
boolean nowait, int tag)
throws AMQException, FailoverException
{
@@ -653,7 +651,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this,
- getProtocolHandler(), producerId, immediate, mandatory);
+ producerId, immediate, mandatory);
}
catch (AMQException e)
{
@@ -673,26 +671,25 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* creates an exchange if it does not already exist
*/
- public void sendExchangeDeclare(final AMQShortString name,
- final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait)
- throws AMQException, FailoverException
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
- sendExchangeDeclare(name.asString(), type.asString(), null, null,
- nowait);
+ //The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it
+ sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete);
}
public void sendExchangeDeclare(final String name, final String type,
final String alternateExchange, final Map<String, Object> args,
- final boolean nowait) throws AMQException
+ final boolean nowait, boolean durable, boolean autoDelete) throws AMQException
{
getQpidSession().exchangeDeclare(
name,
type,
alternateExchange,
args,
- name.toString().startsWith("amq.") ? Option.PASSIVE
- : Option.NONE);
+ name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE,
+ durable ? Option.DURABLE : Option.NONE,
+ autoDelete ? Option.AUTO_DELETE : Option.NONE);
// We need to sync so that we get notify of an error.
if (!nowait)
{
@@ -717,18 +714,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Declare a queue with the given queueName
*/
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive)
- throws AMQException, FailoverException
- {
- // do nothing this is only used by 0_8
- }
-
- /**
- * Declare a queue with the given queueName
- */
- public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait, boolean passive)
+ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal,
+ final boolean nowait, boolean passive)
throws AMQException
{
AMQShortString queueName;
@@ -925,12 +912,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return getCurrentException();
}
+ @Override
protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
-
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -947,7 +933,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
+ return send0_10QueueDeclare(amqd, noLocal, nowait, passive);
}
}, getAMQConnection()).execute();
}
@@ -1217,7 +1203,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
else if(createNode)
{
setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,noLocal,noWait, false);
+ send0_10QueueDeclare(dest,noLocal,noWait,false);
sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
null,dest.getExchangeName(),dest, false);
break;
@@ -1244,7 +1230,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.getExchangeClass().asString(),
dest.getTargetNode().getAlternateExchange(),
dest.getTargetNode().getDeclareArgs(),
- false);
+ false, false, false);
if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
{
createSubscriptionQueue(dest,noLocal);
@@ -1323,7 +1309,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
node.setExclusive(true);
node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,noLocal,true, false);
+ send0_10QueueDeclare(dest,noLocal,true,false);
getQpidSession().exchangeBind(dest.getQueueName(),
dest.getAddressName(),
dest.getSubject(),
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 8ab23a240e..9cd5eb1491 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
@@ -359,9 +360,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
- @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+ @Override
+ public void sendConsume(BasicMessageConsumer_0_8 consumer,
AMQShortString queueName,
- AMQProtocolHandler protocolHandler,
boolean nowait,
int tag) throws AMQException, FailoverException
{
@@ -380,27 +381,29 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if (nowait)
{
- protocolHandler.writeFrame(jmsConsume);
+ getProtocolHandler().writeFrame(jmsConsume);
}
else
{
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ getProtocolHandler().syncWrite(jmsConsume, BasicConsumeOkBody.class);
}
}
- public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException
+ @Override
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
name.toString().startsWith("amq."),
- false,false,false,false,null);
+ durable, autoDelete, internal, false, null);
AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
- protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive) throws AMQException, FailoverException
+ private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
{
QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -414,7 +417,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
AMQFrame queueDeclare = body.generateFrame(getChannelId());
- protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ }
+
+ @Override
+ protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal,
+ final boolean nowait, final boolean passive) throws AMQException
+ {
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+ return new FailoverNoopSupport<AMQShortString, AMQException>(
+ new FailoverProtectedOperation<AMQShortString, AMQException>()
+ {
+ public AMQShortString execute() throws AMQException, FailoverException
+ {
+ // Generate the queue name if the destination indicates that a client generated name is to be used.
+ if (amqd.isNameRequired())
+ {
+ amqd.setQueueName(protocolHandler.generateQueueName());
+ }
+
+ sendQueueDeclare(amqd, passive);
+
+ return amqd.getAMQQueueName();
+ }
+ }, getAMQConnection()).execute();
}
public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
@@ -440,10 +468,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException
{
-
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
- getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+ getMessageFactoryRegistry(),this, arguments, prefetchHigh, prefetchLow,
exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
@@ -662,14 +688,23 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
queueName == null ? null : new AMQShortString(queueName),
bindingKey == null ? null : new AMQShortString(bindingKey));
}
-
+
+ private AMQProtocolHandler getProtocolHandler()
+ {
+ return getAMQConnection().getProtocolHandler();
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ return methodRegistry;
+ }
public AMQException getLastException()
{
// if the Connection has closed then we should throw any exception that
// has occurred that we were not waiting for
- AMQStateManager manager = getAMQConnection().getProtocolHandler()
- .getStateManager();
+ AMQStateManager manager = getProtocolHandler().getStateManager();
Exception e = manager.getLastException();
if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 0f8b5717d6..f8e837cd34 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -31,7 +31,6 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQShortString;
@@ -87,8 +86,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
private final AMQSession _session;
- private final AMQProtocolHandler _protocolHandler;
-
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
*/
@@ -140,9 +137,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelector, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
+ AMQSession session, FieldTable rawSelector,
+ int prefetchHigh, int prefetchLow, boolean exclusive,
+ int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
_channelId = channelId;
_connection = connection;
@@ -150,7 +147,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_destination = destination;
_messageFactory = messageFactory;
_session = session;
- _protocolHandler = protocolHandler;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
@@ -1042,10 +1038,4 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
return _messageFactory;
}
-
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _protocolHandler;
- }
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 26bb51b821..ca5b1ac9c1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -28,7 +28,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.Session;
@@ -82,13 +81,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelector, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
+ AMQSession<?,?> session, FieldTable rawSelector,
+ int prefetchHigh, int prefetchLow, boolean exclusive,
+ int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
- super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
+ super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector,
+ prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
_serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index b00f9dd98a..fc7eacc760 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -29,7 +29,6 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
@@ -52,12 +51,12 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive,
+ int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
- acknowledgeMode, browseOnly, autoClose);
+ rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode,
+ browseOnly, autoClose);
final FieldTable consumerArguments = getArguments();
if (isAutoClose())
{
@@ -93,13 +92,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
}
}
+ @Override
+ public AMQSession_0_8 getSession()
+ {
+ return (AMQSession_0_8) super.getSession();
+ }
+
void sendCancel() throws AMQException, FailoverException
{
BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
final AMQFrame cancelFrame = body.generateFrame(getChannelId());
- getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
+ getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
if (_logger.isDebugEnabled())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 9b3b2ce0e9..5cd596108a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client;
-import java.io.UnsupportedEncodingException;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -36,12 +35,10 @@ import javax.jms.Topic;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -71,18 +68,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private AMQDestination _destination;
/**
- * Default encoding used for messages produced by this producer.
- */
- private String _encoding;
-
- /**
- * Default encoding used for message produced by this producer.
- */
- private String _mimeType;
-
- private AMQProtocolHandler _protocolHandler;
-
- /**
* True if this producer was created from a transacted session
*/
private boolean _transacted;
@@ -135,14 +120,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- Boolean immediate, Boolean mandatory) throws AMQException
+ AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
_logger = logger;
_connection = connection;
_destination = destination;
_transacted = transacted;
- _protocolHandler = protocolHandler;
_channelId = channelId;
_session = session;
_producerId = producerId;
@@ -163,6 +146,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
setPublishMode();
}
+ protected AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
void setPublishMode()
{
// Publish mode could be configured at destination level as well.
@@ -558,18 +546,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- public void setMimeType(String mimeType) throws JMSException
- {
- checkNotClosed();
- _mimeType = mimeType;
- }
-
- public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException
- {
- checkNotClosed();
- _encoding = encoding;
- }
-
private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
{
checkNotClosed();
@@ -645,16 +621,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_destination = destination;
}
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _protocolHandler;
- }
-
- protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
- {
- _protocolHandler = protocolHandler;
- }
-
protected int getChannelId()
{
return _channelId;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index a3a1e9c28b..3b4f642d4c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -27,7 +27,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -60,10 +59,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
private byte[] userIDBytes;
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- Boolean immediate, Boolean mandatory) throws AMQException
+ AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
- super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger, connection, destination, transacted, channelId, session, producerId, immediate, mandatory);
userIDBytes = Strings.toUTF8(getUserID());
}
@@ -79,7 +77,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
(name,
destination.getExchangeClass().toString(),
null, null,
- name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE,
+ destination.isExchangeDurable() ? Option.DURABLE : Option.NONE,
+ destination.isExchangeAutoDelete() ? Option.AUTO_DELETE : Option.NONE);
}
}
else
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 04cc876b76..bb270b0878 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -50,7 +50,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
- super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory);
}
void declareDestination(AMQDestination destination)
@@ -63,14 +63,14 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
destination.getExchangeName(),
destination.getExchangeClass(),
destination.getExchangeName().toString().startsWith("amq."),
- false,
- false,
- false,
+ destination.isExchangeDurable(),
+ destination.isExchangeAutoDelete(),
+ destination.isExchangeInternal(),
true,
null);
AMQFrame declare = body.generateFrame(getChannelId());
- getProtocolHandler().writeFrame(declare);
+ getConnection().getProtocolHandler().writeFrame(declare);
}
}
@@ -171,7 +171,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
throw jmse;
}
- getProtocolHandler().writeFrame(compositeFrame);
+ getConnection().getProtocolHandler().writeFrame(compositeFrame);
}
/**
@@ -233,4 +233,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
return frameCount;
}
+ @Override
+ public AMQSession_0_8 getSession()
+ {
+ return (AMQSession_0_8) super.getSession();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
index bec8b0917d..82c2b88c30 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
@@ -23,25 +23,11 @@ package org.apache.qpid.jms;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-import java.io.UnsupportedEncodingException;
/**
*/
public interface MessageProducer extends javax.jms.MessageProducer
{
- /**
- * Set the default MIME type for messages produced by this producer. This reduces the overhead of each message.
- * @param mimeType
- */
- void setMimeType(String mimeType) throws JMSException;
-
- /**
- * Set the default encoding for messages produced by this producer. This reduces the overhead of each message.
- * @param encoding the encoding as understood by XXXX how do I specify this?? RG
- * @throws UnsupportedEncodingException if the encoding is not understood
- */
- void setEncoding(String encoding) throws UnsupportedEncodingException, JMSException;
-
void send(Destination destination, Message message, int deliveryMode,
int priority, long timeToLive, boolean immediate)
throws JMSException;
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 028e2d5cc3..d0cd24adf6 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -276,7 +276,7 @@ public class AMQSession_0_10Test extends QpidTestCase
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
null, null, false, true);
- session.sendConsume(consumer, new AMQShortString("test"), null, true, 1);
+ session.sendConsume(consumer, new AMQShortString("test"), true, 1);
}
catch (Exception e)
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
index 722cbd0752..066ece7ed1 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
@@ -48,7 +48,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase
TestAMQSession testSession = new TestAMQSession(conn);
BasicMessageConsumer_0_8 consumer =
- new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour());
}
@@ -68,7 +68,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase
final TestAMQSession testSession = new TestAMQSession(conn);
final BasicMessageConsumer_0_8 consumer =
- new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
}
@@ -94,7 +94,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase
TestAMQSession testSession = new TestAMQSession(conn);
BasicMessageConsumer_0_8 consumer =
- new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
index 9addb0ee71..8f578e6a2f 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
@@ -193,6 +193,126 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getQueueName().equals("test:testQueueD"));
}
+ public void testExchangeOptionsNotPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertFalse(dest.isExchangeAutoDelete());
+ assertFalse(dest.isExchangeDurable());
+ assertFalse(dest.isExchangeInternal());
+ }
+
+ public void testExchangeAutoDeleteOptionPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_AUTODELETE + "='true'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertTrue(dest.isExchangeAutoDelete());
+ assertFalse(dest.isExchangeDurable());
+ assertFalse(dest.isExchangeInternal());
+ }
+
+ public void testExchangeDurableOptionPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_DURABLE + "='true'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertTrue(dest.isExchangeDurable());
+ assertFalse(dest.isExchangeAutoDelete());
+ assertFalse(dest.isExchangeInternal());
+ }
+
+ public void testExchangeInternalOptionPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_INTERNAL + "='true'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertTrue(dest.isExchangeInternal());
+ assertFalse(dest.isExchangeDurable());
+ assertFalse(dest.isExchangeAutoDelete());
+ }
+
public void testRejectBehaviourPresent() throws URISyntaxException
{
String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'";
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index f199961b6f..751066abbc 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -124,7 +124,7 @@ public class TestAMQSession extends AMQSession_0_8
return false;
}
- public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException
+ public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException
{
}
@@ -139,13 +139,13 @@ public class TestAMQSession extends AMQSession_0_8
return null;
}
- public void sendExchangeDeclare(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException
+ public void sendExchangeDeclare(AMQShortString name, AMQShortString type, boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
}
public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler,
- boolean nowait, boolean passive) throws AMQException, FailoverException
+ boolean passive) throws AMQException, FailoverException
{
}