diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-09-20 18:59:30 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-09-20 18:59:30 +0000 |
| commit | c70bf3ea28cdf6bafd8571690d3e5c466a0658a2 (patch) | |
| tree | 68b24940e433f3f9c278b054d9ea1622389bd332 /qpid/java/client | |
| parent | fcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e (diff) | |
| download | qpid-python-c70bf3ea28cdf6bafd8571690d3e5c466a0658a2.tar.gz | |
QPID-4984: WIP - Merge from trunk r.1525056
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
16 files changed, 353 insertions, 183 deletions
diff --git a/qpid/java/client/src/main/java/client.bnd b/qpid/java/client/src/main/java/client.bnd index 1d78bee554..5cc63b8d51 100755 --- a/qpid/java/client/src/main/java/client.bnd +++ b/qpid/java/client/src/main/java/client.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.23.0 +ver: 0.25.0 Bundle-SymbolicName: qpid-client Bundle-Version: ${ver} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 74c9878a8e..0dc5cc68c1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -126,7 +126,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** The virtual path to connect to on the AMQ server */ private String _virtualHost; - private ExceptionListener _exceptionListener; + /** The exception listener for this connection object. */ + private volatile ExceptionListener _exceptionListener; private ConnectionListener _connectionListener; @@ -151,10 +152,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private QpidConnectionMetaData _connectionMetaData; - private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; - private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; - private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; - private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + private AMQShortString _defaultTopicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME); + private AMQShortString _defaultQueueExchangeName = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME); + private AMQShortString _temporaryTopicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME); + private AMQShortString _temporaryQueueExchangeName = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME); /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); @@ -784,13 +785,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); - - return _exceptionListener; + return getExceptionListenerNoCheck(); } public void setExceptionListener(ExceptionListener listener) throws JMSException { checkNotClosed(); + _exceptionListener = listener; } @@ -1307,45 +1308,56 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().notifyError(je); } - // get the failover mutex before trying to close - synchronized (getFailoverMutex()) + try { - // decide if we are going to close the session - if (hardError(cause)) + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { - closer = (!setClosed()) || closer; + // decide if we are going to close the session + if (hardError(cause)) { - _logger.info("Closing AMQConnection due to :" + cause); + closer = (!setClosed()) || closer; + { + _logger.info("Closing AMQConnection due to :" + cause); + } } - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause); - } - - // deliver the exception if there is a listener - if (_exceptionListener != null) - { - _exceptionListener.onException(je); - } - else - { - _logger.error("Throwable Received but no listener set: " + cause); - } - - // if we are closing the connection, close sessions first - if (closer) - { - try + else { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + _logger.info("Not a hard-error connection not closing: " + cause); } - catch (JMSException e) + + // if we are closing the connection, close sessions first + if (closer) { - _logger.error("Error closing all sessions: " + e, e); + try + { + closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + } + catch (JMSException e) + { + _logger.error("Error closing all sessions: " + e, e); + } } } } + finally + { + deliverJMSExceptionToExceptionListenerOrLog(je, cause); + } + } + + private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause) + { + // deliver the exception if there is a listener + ExceptionListener exceptionListener = getExceptionListenerNoCheck(); + if (exceptionListener != null) + { + exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } } private boolean hardError(Throwable cause) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index fc7762e77d..91c23ff384 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class AMQDestination implements Destination, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class); - + private AMQShortString _exchangeName; private AMQShortString _exchangeClass; @@ -65,7 +65,7 @@ public abstract class AMQDestination implements Destination, Referenceable private boolean _isAutoDelete; private boolean _browseOnly; - + private AtomicLong _addressResolved = new AtomicLong(0); private AMQShortString _queueName; @@ -113,10 +113,10 @@ public abstract class AMQDestination implements Destination, Referenceable } // ----- Fields required to support new address syntax ------- - - public enum DestSyntax { + + public enum DestSyntax { BURL,ADDR; - + public static DestSyntax getSyntaxType(String s) { if (("BURL").equals(s)) @@ -133,11 +133,11 @@ public abstract class AMQDestination implements Destination, Referenceable " should be one of {BURL|ADDR}"); } } - } - - public enum AddressOption { - ALWAYS, NEVER, SENDER, RECEIVER; - + } + + public enum AddressOption { + ALWAYS, NEVER, SENDER, RECEIVER; + public static AddressOption getOption(String str) { if ("always".equals(str)) @@ -162,9 +162,9 @@ public abstract class AMQDestination implements Destination, Referenceable } } } - + private final static DestSyntax defaultDestSyntax; - + private DestSyntax _destSyntax = DestSyntax.ADDR; private AddressHelper _addrHelper; @@ -179,25 +179,25 @@ public abstract class AMQDestination implements Destination, Referenceable private Node _node; private Link _link; - + // ----- / Fields required to support new address syntax ------- - + static { defaultDestSyntax = DestSyntax.getSyntaxType( System.getProperty(ClientProperties.DEST_SYNTAX, DestSyntax.ADDR.toString())); - - + + } - + public static DestSyntax getDefaultDestSyntax() { return defaultDestSyntax; } protected AMQDestination() - { + { } protected AMQDestination(Address address) throws Exception @@ -207,10 +207,10 @@ public abstract class AMQDestination implements Destination, Referenceable _destSyntax = DestSyntax.ADDR; _logger.debug("Based on " + address + " the selected destination syntax is " + _destSyntax); } - + public static DestSyntax getDestType(String str) { - if (str.startsWith("BURL:") || + if (str.startsWith("BURL:") || (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL)) { return DestSyntax.BURL; @@ -220,7 +220,7 @@ public abstract class AMQDestination implements Destination, Referenceable return DestSyntax.ADDR; } } - + public static String stripSyntaxPrefix(String str) { if (str.startsWith("BURL:") || str.startsWith("ADDR:")) @@ -232,7 +232,7 @@ public abstract class AMQDestination implements Destination, Referenceable return str; } } - + protected AMQDestination(String str) throws URISyntaxException { parseDestinationString(str); @@ -243,8 +243,8 @@ public abstract class AMQDestination implements Destination, Referenceable _destSyntax = getDestType(str); str = stripSyntaxPrefix(str); if (_destSyntax == DestSyntax.BURL) - { - getInfoFromBindingURL(new AMQBindingURL(str)); + { + getInfoFromBindingURL(new AMQBindingURL(str)); } else { @@ -262,7 +262,7 @@ public abstract class AMQDestination implements Destination, Referenceable } _logger.debug("Based on " + str + " the selected destination syntax is " + _destSyntax); } - + //retained for legacy support protected AMQDestination(BindingURL binding) { @@ -331,8 +331,8 @@ public abstract class AMQDestination implements Destination, Referenceable protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys, boolean browseOnly) { - if ( (ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(exchangeClass) || - ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exchangeClass)) + if ( (AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(exchangeClass) || + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS).equals(exchangeClass)) && routingKey == null) { throw new IllegalArgumentException("routing/binding key must not be null"); @@ -376,16 +376,16 @@ public abstract class AMQDestination implements Destination, Referenceable return toString(); } - public DestSyntax getDestSyntax() + public DestSyntax getDestSyntax() { return _destSyntax; } - + protected void setDestSyntax(DestSyntax syntax) { _destSyntax = syntax; } - + public AMQShortString getEncodedName() { if(_urlAsShortString == null) @@ -431,12 +431,12 @@ public abstract class AMQDestination implements Destination, Referenceable public boolean isTopic() { - return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass); + return AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS).equals(_exchangeClass); } public boolean isQueue() { - return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass); + return AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(_exchangeClass); } public String getQueueName() @@ -481,7 +481,7 @@ public abstract class AMQDestination implements Destination, Referenceable { return _isExclusive; } - + public boolean isAutoDelete() { return _isAutoDelete; @@ -720,15 +720,15 @@ public abstract class AMQDestination implements Destination, Referenceable { AMQShortString type = binding.getExchangeClass(); - if (type.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + if (type.equals(AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))) { return new AMQQueue(binding); } - else if (type.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + else if (type.equals(AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))) { return new AMQTopic(binding); } - else if (type.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) + else if (type.equals(AMQShortString.valueOf(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))) { return new AMQHeadersExchange(binding); } @@ -743,8 +743,8 @@ public abstract class AMQDestination implements Destination, Referenceable DestSyntax syntax = getDestType(str); str = stripSyntaxPrefix(str); if (syntax == DestSyntax.BURL) - { - return createDestination(new AMQBindingURL(str)); + { + return createDestination(new AMQBindingURL(str)); } else { @@ -752,16 +752,16 @@ public abstract class AMQDestination implements Destination, Referenceable return new AMQAnyDestination(address); } } - + // ----- new address syntax ----------- - + public static class Binding { private String exchange; private String bindingKey; private String queue; private Map<String,Object> args; - + public Binding(String exchange, String queue, String bindingKey, @@ -772,36 +772,36 @@ public abstract class AMQDestination implements Destination, Referenceable this.bindingKey = bindingKey; this.args = args; } - - public String getExchange() + + public String getExchange() { return exchange; } - public String getQueue() + public String getQueue() { return queue; } - - public String getBindingKey() + + public String getBindingKey() { return bindingKey; } - - public Map<String, Object> getArgs() + + public Map<String, Object> getArgs() { return args; } } - + public Address getAddress() { return _address; } - + protected void setAddress(Address addr) { _address = addr; } - + public int getAddressType(){ return _addressType; } @@ -809,11 +809,11 @@ public abstract class AMQDestination implements Destination, Referenceable public void setAddressType(int addressType){ _addressType = addressType; } - + public String getAddressName() { return _name; } - + public void setAddressName(String name){ _name = name; } @@ -825,15 +825,15 @@ public abstract class AMQDestination implements Destination, Referenceable public void setSubject(String subject) { _subject = subject; } - + public AddressOption getCreate() { return _create; } - + public void setCreate(AddressOption option) { _create = option; } - + public AddressOption getAssert() { return _assert; } @@ -841,7 +841,7 @@ public abstract class AMQDestination implements Destination, Referenceable public void setAssert(AddressOption option) { _assert = option; } - + public AddressOption getDelete() { return _delete; } @@ -869,22 +869,22 @@ public abstract class AMQDestination implements Destination, Referenceable { _link = link; } - + public void setExchangeName(AMQShortString name) { this._exchangeName = name; } - + public void setExchangeClass(AMQShortString type) { this._exchangeClass = type; } - + public void setRoutingKey(AMQShortString rk) { this._routingKey = rk; } - + public boolean isAddressResolved() { return _addressResolved.get() > 0; @@ -894,80 +894,80 @@ public abstract class AMQDestination implements Destination, Referenceable { _addressResolved.set(addressResolved); } - + private static Address createAddressFromString(String str) { return Address.parse(str); } - + private void getInfoFromAddress() throws Exception { _name = _address.getName(); _subject = _address.getSubject(); - + _addrHelper = new AddressHelper(_address); - + _create = _addrHelper.getCreate() != null ? AddressOption.getOption(_addrHelper.getCreate()):AddressOption.NEVER; - + _assert = _addrHelper.getAssert() != null ? AddressOption.getOption(_addrHelper.getAssert()):AddressOption.NEVER; _delete = _addrHelper.getDelete() != null ? AddressOption.getOption(_addrHelper.getDelete()):AddressOption.NEVER; - + _browseOnly = _addrHelper.isBrowseOnly(); - + _addressType = _addrHelper.getNodeType(); _node = _addrHelper.getNode(); - _link = _addrHelper.getLink(); + _link = _addrHelper.getLink(); } - - // ----- / new address syntax ----------- + + // ----- / new address syntax ----------- public boolean isBrowseOnly() { return _browseOnly; } - + private void setBrowseOnly(boolean b) { _browseOnly = b; } - + public AMQDestination copyDestination() { - AMQDestination dest = + AMQDestination dest = new AMQAnyDestination(_exchangeName, _exchangeClass, _routingKey, - _isExclusive, + _isExclusive, _isAutoDelete, - _queueName, + _queueName, _isDurable, _bindingKeys ); - + dest.setDestSyntax(_destSyntax); dest.setAddress(_address); dest.setAddressName(_name); dest.setSubject(_subject); - dest.setCreate(_create); - dest.setAssert(_assert); - dest.setDelete(_delete); + dest.setCreate(_create); + dest.setAssert(_assert); + dest.setDelete(_delete); dest.setBrowseOnly(_browseOnly); dest.setAddressType(_addressType); dest.setNode(_node); dest.setLink(_link); dest.setAddressResolved(_addressResolved.get()); - return dest; + return dest; } - + protected void setAutoDelete(boolean b) { _isAutoDelete = b; } - + protected void setDurable(boolean b) { _isDurable = b; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index 922cc1e2a7..714c38d37b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -41,7 +41,7 @@ public class AMQHeadersExchange extends AMQDestination public AMQHeadersExchange(AMQShortString queueName) { - super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null); + super(queueName, AMQShortString.valueOf(ExchangeDefaults.HEADERS_EXCHANGE_CLASS), queueName, true, true, null); } public boolean isNameRequired() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 4e9b53c814..0f375e4921 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -38,7 +38,7 @@ public class AMQQueue extends AMQDestination implements Queue { super(address); } - + /** * Create a reference to a non temporary queue using a BindingURL object. * Note this does not actually imply the queue exists. @@ -70,13 +70,13 @@ public class AMQQueue extends AMQDestination implements Queue public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) { - super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, + super(exchangeName, AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS), routingKey, false, false, queueName, false); } public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) { - super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, + super(exchangeName, AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS), routingKey, false, false, queueName, false,bindingKeys); } @@ -149,7 +149,7 @@ public class AMQQueue extends AMQDestination implements Queue public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys) { - super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive, + super(exchangeName, AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS), routingKey, exclusive, autoDelete, queueName, durable, bindingKeys); } @@ -167,9 +167,8 @@ public class AMQQueue extends AMQDestination implements Queue public boolean isNameRequired() { - //If the name is null, we require one to be generated by the client so that it will# - //remain valid if we failover (see BLZ-24) - return getQueueName() == null; + AMQShortString queueName = getAMQQueueName(); + return queueName == null || AMQShortString.EMPTY_STRING.equals(queueName); } @Override diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 6b87316e87..9657a49d98 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1380,7 +1380,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setExchangeName(new AMQShortString(dest.getAddressName())); Node node = dest.getNode(); dest.setExchangeClass(node.getExchangeType() == null? - ExchangeDefaults.TOPIC_EXCHANGE_CLASS: + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): new AMQShortString(node.getExchangeType())); dest.setRoutingKey(new AMQShortString(dest.getSubject())); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index d78e725a5d..4cc32022dc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -60,12 +60,12 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) { - super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); + super(exchange, AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), routingKey, true, true, queueName, false); } public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) { - super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys); + super(exchange, AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), routingKey, true, true, queueName, false,bindingKeys); } public AMQTopic(AMQConnection conn, String routingKey) @@ -73,6 +73,10 @@ public class AMQTopic extends AMQDestination implements Topic this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey)); } + public AMQTopic(String exchangeName, String routingKey) + { + this(AMQShortString.valueOf(exchangeName), new AMQShortString(routingKey)); + } public AMQTopic(AMQShortString exchangeName, String routingKey) { @@ -86,7 +90,7 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { - super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); + super(exchangeName, AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), name, true, isAutoDelete, queueName, isDurable); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 98fa6de675..5acaa5c543 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -440,7 +440,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { if (!(destination instanceof AMQDestination)) { - throw new JMSException("Unsupported destination class: " + throw new InvalidDestinationException("Unsupported destination class: " + ((destination != null) ? destination.getClass() : null)); } @@ -453,7 +453,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } catch(Exception e) { - JMSException ex = new JMSException("Error validating destination"); + JMSException ex = new InvalidDestinationException("Error validating destination"); ex.initCause(e); ex.setLinkedException(e); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index ef92a9281e..b26d67783d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -125,7 +125,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate String subject = null; if (isStrictJMS && messageProps != null && messageProps.getApplicationHeaders() != null) { - System.out.println("%%%% Going to set subject"); subject = (String)messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT); if (subject != null) { @@ -207,7 +206,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } else { - throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID"); + throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be prefixed with 'ID:'"); } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 1395f39b99..ad19b0e620 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -46,7 +46,7 @@ import java.util.concurrent.ConcurrentHashMap; public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { - private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>(); + private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>(); private static Map<String,ExchangeInfo> _exchangeMap = new ConcurrentHashMap<String, ExchangeInfo>(); /** @@ -55,32 +55,32 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate static { _exchangeTypeToDestinationType.put("", AMQDestination.QUEUE_TYPE); - _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE); - _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS.toString(), AMQDestination.TOPIC_TYPE); - _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(), AMQDestination.TOPIC_TYPE); - _exchangeTypeToDestinationType.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE); - + _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, AMQDestination.QUEUE_TYPE); + _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, AMQDestination.TOPIC_TYPE); + _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, AMQDestination.TOPIC_TYPE); + _exchangeTypeToDestinationType.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, AMQDestination.QUEUE_TYPE); + _exchangeMap.put("", new ExchangeInfo("","",AMQDestination.QUEUE_TYPE)); - - _exchangeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), - new ExchangeInfo(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), - ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), + + _exchangeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, + new ExchangeInfo(ExchangeDefaults.DIRECT_EXCHANGE_NAME, + ExchangeDefaults.DIRECT_EXCHANGE_CLASS, AMQDestination.QUEUE_TYPE)); - - _exchangeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), - new ExchangeInfo(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), - ExchangeDefaults.TOPIC_EXCHANGE_CLASS.toString(), + + _exchangeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, + new ExchangeInfo(ExchangeDefaults.TOPIC_EXCHANGE_NAME, + ExchangeDefaults.TOPIC_EXCHANGE_CLASS, AMQDestination.TOPIC_TYPE)); - - _exchangeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), - new ExchangeInfo(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), - ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(), + + _exchangeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, + new ExchangeInfo(ExchangeDefaults.FANOUT_EXCHANGE_NAME, + ExchangeDefaults.FANOUT_EXCHANGE_CLASS, AMQDestination.TOPIC_TYPE)); - - _exchangeMap.put(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(), - new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(), - ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(), - AMQDestination.QUEUE_TYPE)); + + _exchangeMap.put(ExchangeDefaults.HEADERS_EXCHANGE_NAME, + new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME, + ExchangeDefaults.HEADERS_EXCHANGE_CLASS, + AMQDestination.QUEUE_TYPE)); } /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ @@ -115,19 +115,19 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { AMQDestination dest; ExchangeInfo exchangeInfo = _exchangeMap.get(exchange.asString()); - + if (exchangeInfo == null) { exchangeInfo = new ExchangeInfo(exchange.asString(),"",AMQDestination.UNKNOWN_TYPE); } - + if ("topic".equals(exchangeInfo.getExchangeType())) { dest = new AMQTopic(exchange, routingKey, null); } else if ("direct".equals(exchangeInfo.getExchangeType())) { - dest = new AMQQueue(exchange, routingKey, routingKey); + dest = new AMQQueue(exchange, routingKey, routingKey); } else { @@ -161,7 +161,7 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { type = AMQDestination.UNKNOWN_TYPE; } - + _exchangeMap.put(exchange, new ExchangeInfo(exchange,newtype,type)); } @@ -226,7 +226,7 @@ class ExchangeInfo private String exchangeName; private String exchangeType; private int destType = AMQDestination.QUEUE_TYPE; - + public ExchangeInfo(String exchangeName, String exchangeType, int destType) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index f17fb9b5f5..8c23ddad5e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -274,11 +274,11 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { if (value instanceof AMQShortString) { - return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, (AMQShortString) value); + return new AMQQueue(AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME), (AMQShortString) value); } else if (value instanceof String) { - return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString((String) value)); + return new AMQQueue(AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME), new AMQShortString((String) value)); } else if (value instanceof BindingURL) { @@ -295,7 +295,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { if (value instanceof AMQShortString) { - return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, (AMQShortString) value); + return new AMQTopic(AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), (AMQShortString) value); } else if (value instanceof String) { @@ -309,7 +309,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } // The Destination has a dual nature. If this was used for a producer the key is used // for the routing key. If it was used for the consumer it becomes the bindingKey - return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings); + return new AMQTopic(AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),bindings[0],null,bindings); } else if (value instanceof BindingURL) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java new file mode 100644 index 0000000000..d9caa68ef8 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.amqp_0_91.QueueDeclareOkBodyImpl; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.TestNetworkConnection; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.url.AMQBindingURL; + +public class AMQSession_0_8Test extends QpidTestCase +{ + private AMQConnection _connection; + + public void setUp() throws Exception + { + _connection = new MockAMQConnection("amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'"); + NetworkConnection network = new TestNetworkConnection(); + _connection.getProtocolHandler().setNetworkConnection(network); + } + + public void testQueueNameIsGeneratedOnDeclareQueueWithEmptyQueueName() throws Exception + { + final AMQShortString testQueueName = AMQShortString.valueOf("tmp_127_0_0_1_1_1"); + + _connection.setConnectionListener(new ConnectionListenerSupport() + { + @Override + public void bytesSent(long count) + { + try + { + _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBodyImpl(testQueueName, 0, 0)); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + }); + + AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1); + + AMQBindingURL bindingURL = new AMQBindingURL("topic://amq.topic//?routingkey='testTopic'"); + AMQQueue queue = new AMQQueue(bindingURL); + + assertEquals("Unexpected queue name", AMQShortString.EMPTY_STRING, queue.getAMQQueueName()); + + session.declareQueue(queue, true); + + assertEquals("Unexpected queue name", testQueueName, queue.getAMQQueueName()); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/ConnectionListenerSupport.java b/qpid/java/client/src/test/java/org/apache/qpid/client/ConnectionListenerSupport.java new file mode 100644 index 0000000000..fc66e60bc0 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/ConnectionListenerSupport.java @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.jms.ConnectionListener; + +public class ConnectionListenerSupport implements ConnectionListener +{ + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + @Override + public boolean preResubscribe() + { + return false; + } + + @Override + public void failoverComplete() + { + } + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java index 5fff6b6b35..3afeb79ac3 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java @@ -124,7 +124,6 @@ public class AMQMessageDelegate_0_10Test extends QpidTestCase for (Enumeration props = delegate.getPropertyNames(); props.hasMoreElements();) { String key = (String)props.nextElement(); - System.out.println("PropName : " + key); if (key.equals("JMS_" + QpidMessageProperties.QPID_SUBJECT)) { propFound = true; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java b/qpid/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java index ce9e681eaf..2914244103 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java @@ -115,7 +115,7 @@ public class PropertiesFileInitialContextFactoryTest extends QpidTestCase setTestSystemProperty(ClientProperties.DEST_SYNTAX, "ADDR"); setTestSystemProperty(InitialContext.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); - setTestSystemProperty(InitialContext.PROVIDER_URL, "file://" + f.getCanonicalPath()); + setTestSystemProperty(InitialContext.PROVIDER_URL, f.toURI().toURL().toString()); InitialContext context = new InitialContext(); Destination dest = (Destination) context.lookup("topicExchange"); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java index e13180424b..9c9664931a 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.RejectBehaviour; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; @@ -138,7 +139,7 @@ public class DestinationURLTest extends TestCase AMQBindingURL dest = new AMQBindingURL(url); - assertTrue(dest.getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); + assertTrue(dest.getExchangeClass().equals(AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))); assertTrue(dest.getExchangeName().equalsCharSequence("")); assertTrue(dest.getDestinationName().equalsCharSequence("")); assertTrue(dest.getQueueName().equalsCharSequence("IBMPerfQueue1")); @@ -180,18 +181,6 @@ public class DestinationURLTest extends TestCase assertTrue("Failed to throw an URISyntaxException when both the bindingkey and routingkey is specified",exceptionThrown); } - - public void testDestinationWithDurableTopic() throws URISyntaxException - { - - String url = "topic://amq.topic//testTopicD?durable='true'&autodelete='true'&clientid='test'&subscription='testQueueD'"; - - AMQBindingURL dest = new AMQBindingURL(url); - - assertTrue(dest.getExchangeClass().equalsCharSequence("topic")); - assertTrue(dest.getExchangeName().equalsCharSequence("amq.topic")); - assertTrue(dest.getQueueName().equalsCharSequence("test:testQueueD")); - } public void testExchangeOptionsNotPresent() throws URISyntaxException { @@ -374,6 +363,46 @@ public class DestinationURLTest extends TestCase assertNull("Reject behaviour is unexpected", dest.getRejectBehaviour()); } + public void testBindingUrlWithoutDestinationAndQueueName() throws Exception + { + AMQBindingURL bindingURL = new AMQBindingURL("topic://amq.topic//?routingkey='testTopic'"); + assertEquals("Unexpected queue name", AMQShortString.EMPTY_STRING, bindingURL.getQueueName()); + assertEquals("Unexpected destination", AMQShortString.EMPTY_STRING, bindingURL.getDestinationName()); + assertEquals("Unexpected routing key", AMQShortString.valueOf("testTopic"), bindingURL.getRoutingKey()); + } + + public void testBindingUrlWithoutDestinationAndMissedQueueName() throws Exception + { + AMQBindingURL bindingURL = new AMQBindingURL("topic://amq.topic/?routingkey='testTopic'"); + assertEquals("Unexpected queue name", AMQShortString.EMPTY_STRING, bindingURL.getQueueName()); + assertEquals("Unexpected destination", AMQShortString.EMPTY_STRING, bindingURL.getDestinationName()); + assertEquals("Unexpected routing key", AMQShortString.valueOf("testTopic"), bindingURL.getRoutingKey()); + } + + public void testBindingUrlWithoutQueueName() throws Exception + { + AMQBindingURL bindingURL = new AMQBindingURL("topic://amq.topic/destination/?routingkey='testTopic'"); + assertEquals("Unexpected queue name", AMQShortString.EMPTY_STRING, bindingURL.getQueueName()); + assertEquals("Unexpected destination", AMQShortString.valueOf("destination"), bindingURL.getDestinationName()); + assertEquals("Unexpected routing key", AMQShortString.valueOf("testTopic"), bindingURL.getRoutingKey()); + } + + public void testBindingUrlWithQueueNameWithoutDestination() throws Exception + { + AMQBindingURL bindingURL = new AMQBindingURL("topic://amq.topic//queueName?routingkey='testTopic'"); + assertEquals("Unexpected queue name", AMQShortString.valueOf("queueName"), bindingURL.getQueueName()); + assertEquals("Unexpected destination", AMQShortString.EMPTY_STRING, bindingURL.getDestinationName()); + assertEquals("Unexpected routing key", AMQShortString.valueOf("testTopic"), bindingURL.getRoutingKey()); + } + + public void testBindingUrlWithQueueNameAndDestination() throws Exception + { + AMQBindingURL bindingURL = new AMQBindingURL("topic://amq.topic/destination/queueName?routingkey='testTopic'"); + assertEquals("Unexpected queue name", AMQShortString.valueOf("queueName"), bindingURL.getQueueName()); + assertEquals("Unexpected destination", AMQShortString.valueOf("destination"), bindingURL.getDestinationName()); + assertEquals("Unexpected routing key", AMQShortString.valueOf("testTopic"), bindingURL.getRoutingKey()); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(DestinationURLTest.class); |
