From 4df139e950abc64014511b7714734966539eb0de Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Sat, 1 Sep 2012 12:09:54 +0000 Subject: QPID-4261: extend BindingURLs to allow specifying exchange durable/autodelete/internal options, use the values when sending exchange declares during producer and consumer creation. Fix ExchangeDeclareHandler to set auto-delete properly (though we dont actually support it, and it was removed from the protocol in 0-9-1). Isolate AMQProtocolHandler use to the 0-8/0-9/0-9-1 specific Session/Producer/Consumer implementations that actually need it instead of letting it bleed through the abstraction and 0-10 implementations that dont use it. Add some other clarifying comments. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1379748 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/handler/ExchangeDeclareHandler.java | 2 +- .../org/apache/qpid/client/AMQDestination.java | 52 ++++++++- .../java/org/apache/qpid/client/AMQSession.java | 94 ++++------------ .../org/apache/qpid/client/AMQSession_0_10.java | 54 ++++------ .../org/apache/qpid/client/AMQSession_0_8.java | 69 +++++++++--- .../apache/qpid/client/BasicMessageConsumer.java | 16 +-- .../qpid/client/BasicMessageConsumer_0_10.java | 11 +- .../qpid/client/BasicMessageConsumer_0_8.java | 17 +-- .../apache/qpid/client/BasicMessageProducer.java | 46 ++------ .../qpid/client/BasicMessageProducer_0_10.java | 10 +- .../qpid/client/BasicMessageProducer_0_8.java | 17 +-- .../java/org/apache/qpid/jms/MessageProducer.java | 14 --- .../apache/qpid/client/AMQSession_0_10Test.java | 2 +- .../qpid/client/BasicMessageConsumer_0_8_Test.java | 6 +- .../client/destinationurl/DestinationURLTest.java | 120 +++++++++++++++++++++ .../qpid/test/unit/message/TestAMQSession.java | 6 +- .../main/java/org/apache/qpid/url/BindingURL.java | 3 + .../client/DynamicQueueExchangeCreateTest.java | 81 ++++++++++++-- .../close/JavaServerCloseRaceConditionTest.java | 4 +- 19 files changed, 391 insertions(+), 233 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 8756409f64..eed0cd6020 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -101,7 +101,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener(new FailoverProtectedOperation() { public Object execute() throws AMQException, FailoverException { - sendExchangeDeclare(name, type, protocolHandler, nowait); + sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal); return null; } }, _connection).execute(); } - public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; - - - void declareQueuePassive(AMQDestination queue) throws AMQException - { - declareQueue(queue,false,false,true); - } + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; /** * Declares a queue for a JMS destination. @@ -2768,31 +2754,8 @@ public abstract class AMQSession( - new FailoverProtectedOperation() - { - public AMQShortString execute() throws AMQException, FailoverException - { - // Generate the queue name if the destination indicates that a client generated name is to be used. - if (amqd.isNameRequired()) - { - amqd.setQueueName(protocolHandler.generateQueueName()); - } - - sendQueueDeclare(amqd, protocolHandler, nowait, passive); - - return amqd.getAMQQueueName(); - } - }, _connection).execute(); - } - - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) throws AMQException, FailoverException; + protected abstract AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException; /** * Undeclares the specified queue. @@ -2845,21 +2808,6 @@ public abstract class AMQSession args, - final boolean nowait) throws AMQException + final boolean nowait, boolean durable, boolean autoDelete) throws AMQException { getQpidSession().exchangeDeclare( name, type, alternateExchange, args, - name.toString().startsWith("amq.") ? Option.PASSIVE - : Option.NONE); + name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE, + durable ? Option.DURABLE : Option.NONE, + autoDelete ? Option.AUTO_DELETE : Option.NONE); // We need to sync so that we get notify of an error. if (!nowait) { @@ -717,18 +714,8 @@ public class AMQSession_0_10 extends AMQSession( new FailoverProtectedOperation() { @@ -947,7 +933,7 @@ public class AMQSession_0_10 extends AMQSession( + new FailoverProtectedOperation() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) + { + amqd.setQueueName(protocolHandler.generateQueueName()); + } + + sendQueueDeclare(amqd, passive); + + return amqd.getAMQQueueName(); + } + }, getAMQConnection()).execute(); } public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException @@ -440,10 +468,8 @@ public class AMQSession_0_8 extends AMQSession extends Closeable implements Messa private final AMQSession _session; - private final AMQProtocolHandler _protocolHandler; - /** * We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ @@ -140,9 +137,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession session, AMQProtocolHandler protocolHandler, - FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException + AMQSession session, FieldTable rawSelector, + int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; _connection = connection; @@ -150,7 +147,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa _destination = destination; _messageFactory = messageFactory; _session = session; - _protocolHandler = protocolHandler; _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; @@ -1042,10 +1038,4 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { return _messageFactory; } - - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 26bb51b821..ca5b1ac9c1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -28,7 +28,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.Session; @@ -82,13 +81,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer session, AMQProtocolHandler protocolHandler, - FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) + AMQSession session, FieldTable rawSelector, + int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { - super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); + super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector, + prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index b00f9dd98a..fc7eacc760 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -29,7 +29,6 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_8; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; @@ -52,12 +51,12 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer