diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-02-27 12:47:15 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-02-27 12:47:15 +0000 |
| commit | 499cea57dbf11e4c73cdcd56ac268889ff36b880 (patch) | |
| tree | 8b2cf69d9c98ea5a79244bc1d3bc85482fe4618a /java/client | |
| parent | e7d09b6a5e3293021857541bab7aa4ae163ab05c (diff) | |
| download | qpid-python-499cea57dbf11e4c73cdcd56ac268889ff36b880.tar.gz | |
QPID-509 : [Java Client] Messages sent to topics should not default to mandatory
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1294135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
6 files changed, 150 insertions, 129 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e7e937b689..30f1dcf8b7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -120,18 +120,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; /** - * The default value for immediate flag used by producers created by this session is false. That is, a consumer does - * not need to be attached to a queue. - */ - private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); - - /** - * The default value for mandatory flag used by producers created by this session is true. That is, server will not - * silently drop messages where no queue is connected to the exchange for the message. - */ - private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); - - /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked */ @@ -1198,12 +1186,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public P createProducer(Destination destination) throws JMSException { - return createProducerImpl(destination, _defaultMandatoryValue, _defaultImmediateValue); + return createProducerImpl(destination, null, null); } public P createProducer(Destination destination, boolean immediate) throws JMSException { - return createProducerImpl(destination, _defaultMandatoryValue, immediate); + return createProducerImpl(destination, null, immediate); } public P createProducer(Destination destination, boolean mandatory, boolean immediate) @@ -2613,7 +2601,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void sendConsume(C consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException; - private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate) + private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) throws JMSException { return new FailoverRetrySupport<P, JMSException>( @@ -2642,8 +2630,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic }, _connection).execute(); } - public abstract P createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final long producerId) throws JMSException; + public abstract P createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, final long producerId) throws JMSException; private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 816ad1f222..36dbad1928 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,9 +17,20 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import javax.jms.Destination; +import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -27,7 +38,6 @@ import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; -import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; @@ -42,28 +52,14 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; -import org.apache.qpid.util.Serial; -import org.apache.qpid.util.Strings; - import static org.apache.qpid.transport.Option.BATCH; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.UNRELIABLE; - -import javax.jms.Destination; -import javax.jms.JMSException; -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.qpid.util.Serial; +import org.apache.qpid.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a 0.10 Session @@ -654,8 +650,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Create an 0_10 message producer */ - public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final long producerId) throws JMSException + public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, final long producerId) throws JMSException { try { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 29f1925cbc..b56da5c2ec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -441,8 +441,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } - public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, long producerId) throws JMSException + public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, long producerId) throws JMSException { try { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 75f198e1fa..56739f17f9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,18 +20,8 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.util.UUIDGen; -import org.apache.qpid.util.UUIDs; - +import java.io.UnsupportedEncodingException; +import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -42,74 +32,19 @@ import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; -import java.io.UnsupportedEncodingException; -import java.util.UUID; +import javax.jms.Topic; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { - /** - * If true, messages will not get a timestamp. - */ - protected boolean isDisableTimestamps() - { - return _disableTimestamps; - } - - protected void setDisableTimestamps(boolean disableTimestamps) - { - _disableTimestamps = disableTimestamps; - } - - protected void setDestination(AMQDestination destination) - { - _destination = destination; - } - - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - - protected void setProtocolHandler(AMQProtocolHandler protocolHandler) - { - _protocolHandler = protocolHandler; - } - - protected int getChannelId() - { - return _channelId; - } - - protected void setChannelId(int channelId) - { - _channelId = channelId; - } - - protected void setSession(AMQSession session) - { - _session = session; - } - - protected String getUserID() - { - return _userID; - } - - protected void setUserID(String userID) - { - _userID = userID; - } - - protected PublishMode getPublishMode() - { - return publishMode; - } - - protected void setPublishMode(PublishMode publishMode) - { - this.publishMode = publishMode; - } - enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; private final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -166,7 +101,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private final boolean _immediate; - private final boolean _mandatory; + private final Boolean _mandatory; private boolean _disableMessageId; @@ -174,12 +109,34 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private String _userID; // ref user id used in the connection. - private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + + /** + * The default value for immediate flag used this producer is false. That is, a consumer does + * not need to be attached to a queue. + */ + private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + + /** + * The default value for mandatory flag used by this producer is true. That is, server will not + * silently drop messages where no queue is connected to the exchange for the message. + */ + private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + + /** + * The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server + * will silently drop messages where no queue is connected to the exchange for the message. + */ + private final boolean _defaultMandatoryTopicValue = + Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic", + System.getProperties().containsKey("qpid.default_mandatory") + ? System.getProperty("qpid.default_mandatory") + : "false")); private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, + Boolean immediate, Boolean mandatory) throws AMQException { _connection = connection; _destination = destination; @@ -193,8 +150,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac declareDestination(destination); } - _immediate = immediate; - _mandatory = mandatory; + _immediate = immediate == null ? _defaultImmediateValue : immediate; + _mandatory = mandatory == null + ? destination == null ? null + : destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : mandatory; + _userID = connection.getUsername(); setPublishMode(); } @@ -381,7 +344,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, + sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, + _mandatory == null + ? destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : _mandatory, _immediate); } } @@ -394,7 +362,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, + _mandatory == null + ? destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : _mandatory, + _immediate); } } @@ -646,6 +620,69 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } + /** + * If true, messages will not get a timestamp. + */ + protected boolean isDisableTimestamps() + { + return _disableTimestamps; + } + + protected void setDisableTimestamps(boolean disableTimestamps) + { + _disableTimestamps = disableTimestamps; + } + + protected void setDestination(AMQDestination destination) + { + _destination = destination; + } + + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + protected void setProtocolHandler(AMQProtocolHandler protocolHandler) + { + _protocolHandler = protocolHandler; + } + + protected int getChannelId() + { + return _channelId; + } + + protected void setChannelId(int channelId) + { + _channelId = channelId; + } + + protected void setSession(AMQSession session) + { + _session = session; + } + + protected String getUserID() + { + return _userID; + } + + protected void setUserID(String userID) + { + _userID = userID; + } + + protected PublishMode getPublishMode() + { + return publishMode; + } + + protected void setPublishMode(PublishMode publishMode) + { + this.publishMode = publishMode; + } + Logger getLogger() { return _logger; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 024219cfd6..db5baed586 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -61,7 +61,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - boolean immediate, boolean mandatory) throws AMQException + Boolean immediate, Boolean mandatory) throws AMQException { super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 3b5e361f97..7fd8feef54 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -44,7 +44,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer { BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); } |
