diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /java/client/src | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
51 files changed, 3101 insertions, 964 deletions
diff --git a/java/client/src/main/java/client.bnd b/java/client/src/main/java/client.bnd index 4b9b191520..0a47b30c72 100755 --- a/java/client/src/main/java/client.bnd +++ b/java/client/src/main/java/client.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.19.0 +ver: 0.21.0 Bundle-SymbolicName: qpid-client Bundle-Version: ${ver} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 89273599b9..597096db57 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -344,7 +344,14 @@ public class AMQBrokerDetails implements BrokerDetails optionsURL.append("='"); - optionsURL.append(_options.get(key)); + if (OPTIONS_TRUST_STORE_PASSWORD.equals(key) || OPTIONS_KEY_STORE_PASSWORD.equals(key)) + { + optionsURL.append("********"); + } + else + { + optionsURL.append(_options.get(key)); + } optionsURL.append("'"); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d80858a7a1..9612417266 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -179,6 +179,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // new amqp-0-10 encoded format. private boolean _useLegacyMapMessageFormat; + // Indicates whether to use the old stream message format or the + // new amqp-0-10 list encoded format. + private boolean _useLegacyStreamMessageFormat; + + // When sending to a Queue destination for the first time, check that the queue is bound + private final boolean _validateQueueOnSend; + //used to track the last failover time for //Address resolution purposes private volatile long _lastFailoverTime = 0; @@ -294,6 +301,30 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT); } + if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT) != null) + { + _useLegacyStreamMessageFormat = Boolean.parseBoolean( + connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT)); + } + else + { + // use the default value set for all connections + _useLegacyStreamMessageFormat = System.getProperty(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT) == null ? + true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT); + } + + if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null) + { + _validateQueueOnSend = Boolean.parseBoolean( + connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)); + } + else + { + _validateQueueOnSend = + Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); + } + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) { @@ -1080,7 +1111,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _started; } - protected final boolean isConnected() + public final boolean isConnected() { return _connected; } @@ -1425,7 +1456,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _delegate.getProtocolVersion(); } - + public String getBrokerUUID() { if(getProtocolVersion().equals(ProtocolVersion.v0_10)) @@ -1498,6 +1529,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _useLegacyMapMessageFormat; } + public boolean isUseLegacyStreamMessageFormat() + { + return _useLegacyStreamMessageFormat; + } + private void verifyClientID() throws AMQException { if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID)) @@ -1539,4 +1575,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect + localAddress + " to " + remoteAddress); } } + + void setHeartbeatListener(HeartbeatListener listener) + { + _delegate.setHeartbeatListener(listener); + } + + public boolean validateQueueOnSend() + { + return _validateQueueOnSend; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index b6f25a2cef..a8fdaeb65c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -78,4 +78,6 @@ public interface AMQConnectionDelegate * @return true if the feature is supported by the server */ boolean isSupportedServerFeature(final String featureName); + + void setHeartbeatListener(HeartbeatListener listener); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 51e7e4153d..69e79d42a0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -33,6 +33,7 @@ import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; +import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.Session; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; @@ -214,7 +215,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec + "********"); } - ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail); + ConnectionSettings conSettings = retrieveConnectionSettings(brokerDetail); + _qpidConnection.setConnectionDelegate(new ClientConnectionDelegate(conSettings, _conn.getConnectionURL())); _qpidConnection.connect(conSettings); @@ -420,7 +422,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return featureSupported; } - private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail) + @Override + public void setHeartbeatListener(HeartbeatListener listener) + { + ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener); + } + + private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail) { ConnectionSettings conSettings = brokerDetail.buildConnectionSettings(); @@ -442,6 +450,24 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail)); + //Check connection-level ssl override setting + String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL); + if(connectionSslOption != null) + { + boolean connUseSsl = Boolean.parseBoolean(connectionSslOption); + boolean brokerlistUseSsl = conSettings.isUseSSL(); + + if( connUseSsl != brokerlistUseSsl) + { + conSettings.setUseSSL(connUseSsl); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl ); + } + } + } + return conSettings; } @@ -464,10 +490,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>"); } - else + else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null) { heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); } + else + { + heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT); + } return heartbeat; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index e1bf007e83..67d7c2a78c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -40,6 +40,7 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; +import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.network.NetworkConnection; @@ -90,42 +91,43 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException { + if (_logger.isDebugEnabled()) + { + _logger.debug("Connecting to broker:" + brokerDetail); + } final Set<AMQState> openOrClosedStates = EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); - - StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); - ConnectionSettings settings = brokerDetail.buildConnectionSettings(); settings.setProtocol(brokerDetail.getTransport()); - SSLContext sslContext = null; - if (settings.isUseSSL()) + //Check connection-level ssl override setting + String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL); + if(connectionSslOption != null) { - try - { - sslContext = SSLContextFactory.buildClientContext( - settings.getTrustStorePath(), - settings.getTrustStorePassword(), - settings.getTrustStoreType(), - settings.getTrustManagerFactoryAlgorithm(), - settings.getKeyStorePath(), - settings.getKeyStorePassword(), - settings.getKeyStoreType(), - settings.getKeyManagerFactoryAlgorithm(), - settings.getCertAlias()); - } - catch (GeneralSecurityException e) + boolean connUseSsl = Boolean.parseBoolean(connectionSslOption); + boolean brokerlistUseSsl = settings.isUseSSL(); + + if( connUseSsl != brokerlistUseSsl) { - throw new AMQException("Unable to create SSLContext: " + e.getMessage(), e); + settings.setUseSSL(connUseSsl); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl ); + } } } SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext); + + NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), + _conn.getProtocolHandler()); _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); + + StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); _conn.getProtocolHandler().getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up @@ -376,4 +378,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate // we just hardcode JMS selectors as supported. return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName); } + + @Override + public void setHeartbeatListener(HeartbeatListener listener) + { + _conn.getProtocolHandler().setHeartbeatListener(listener); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index 8bc815d98e..a2d4b5ee17 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -140,7 +140,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF { try { - ConnectionURL connectionDetails = new AMQConnectionURL(_connectionDetails.toString()); + ConnectionURL connectionDetails = new AMQConnectionURL(_connectionDetails.getURL()); connectionDetails.setUsername(userName); connectionDetails.setPassword(password); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 530186b1f9..f14b6d810b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -52,6 +52,12 @@ public abstract class AMQDestination implements Destination, Referenceable private AMQShortString _exchangeClass; + private boolean _exchangeAutoDelete; + + private boolean _exchangeDurable; + + private boolean _exchangeInternal; + private boolean _isDurable; private boolean _isExclusive; @@ -106,16 +112,6 @@ public abstract class AMQDestination implements Destination, Referenceable _name = name; } - protected Link getTargetLink() - { - return _targetLink; - } - - protected void setTargetLink(Link targetLink) - { - _targetLink = targetLink; - } - // ----- Fields required to support new address syntax ------- public enum DestSyntax { @@ -180,10 +176,9 @@ public abstract class AMQDestination implements Destination, Referenceable private AddressOption _assert = AddressOption.NEVER; private AddressOption _delete = AddressOption.NEVER; - private Node _targetNode; - private Node _sourceNode; - private Link _targetLink; + private Node _node; private Link _link; + // ----- / Fields required to support new address syntax ------- @@ -280,6 +275,9 @@ public abstract class AMQDestination implements Destination, Referenceable { _exchangeName = binding.getExchangeName(); _exchangeClass = binding.getExchangeClass(); + _exchangeDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + _exchangeAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + _exchangeInternal = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); @@ -358,6 +356,10 @@ public abstract class AMQDestination implements Destination, Referenceable _destSyntax = DestSyntax.BURL; _browseOnly = browseOnly; _rejectBehaviour = null; + _exchangeAutoDelete = false; + _exchangeDurable = false; + _exchangeInternal = false; + if (_logger.isDebugEnabled()) { _logger.debug("Based on " + toString() + " the selected destination syntax is " + _destSyntax); @@ -412,6 +414,21 @@ public abstract class AMQDestination implements Destination, Referenceable return _exchangeClass; } + public boolean isExchangeDurable() + { + return _exchangeDurable; + } + + public boolean isExchangeAutoDelete() + { + return _exchangeAutoDelete; + } + + public boolean isExchangeInternal() + { + return _exchangeInternal; + } + public boolean isTopic() { return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass); @@ -579,6 +596,27 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); } + if (_exchangeDurable) + { + sb.append(BindingURL.OPTION_EXCHANGE_DURABLE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + if (_exchangeAutoDelete) + { + sb.append(BindingURL.OPTION_EXCHANGE_AUTODELETE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + if (_exchangeInternal) + { + sb.append(BindingURL.OPTION_EXCHANGE_INTERNAL); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + //removeKey the last char '?' if there is no options , ',' if there are. sb.deleteCharAt(sb.length() - 1); url = sb.toString(); @@ -773,24 +811,14 @@ public abstract class AMQDestination implements Destination, Referenceable _delete = option; } - public Node getTargetNode() + public Node getNode() { - return _targetNode; + return _node; } - public void setTargetNode(Node node) + public void setNode(Node node) { - _targetNode = node; - } - - public Node getSourceNode() - { - return _sourceNode; - } - - public void setSourceNode(Node node) - { - _sourceNode = node; + _node = node; } public Link getLink() @@ -851,21 +879,11 @@ public abstract class AMQDestination implements Destination, Referenceable _browseOnly = _addrHelper.isBrowseOnly(); - _addressType = _addrHelper.getTargetNodeType(); - _targetNode = _addrHelper.getTargetNode(_addressType); - _sourceNode = _addrHelper.getSourceNode(_addressType); + _addressType = _addrHelper.getNodeType(); + _node = _addrHelper.getNode(); _link = _addrHelper.getLink(); } - // This method is needed if we didn't know the node type at the beginning. - // Therefore we have to query the broker to figure out the type. - // Once the type is known we look for the necessary properties. - public void rebuildTargetAndSourceNodes(int addressType) - { - _targetNode = _addrHelper.getTargetNode(addressType); - _sourceNode = _addrHelper.getSourceNode(addressType); - } - // ----- / new address syntax ----------- public boolean isBrowseOnly() @@ -900,8 +918,7 @@ public abstract class AMQDestination implements Destination, Referenceable dest.setDelete(_delete); dest.setBrowseOnly(_browseOnly); dest.setAddressType(_addressType); - dest.setTargetNode(_targetNode); - dest.setSourceNode(_sourceNode); + dest.setNode(_node); dest.setLink(_link); dest.setAddressResolved(_addressResolved.get()); return dest; @@ -935,6 +952,4 @@ public abstract class AMQDestination implements Destination, Referenceable { return _rejectBehaviour; } - - } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 1468e90c4e..91a6389214 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client; -import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; -import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; -import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; -import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +35,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.client.message.AMQPEncodedListMessage; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.JMSBytesMessage; @@ -49,14 +45,13 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.jms.ListMessage; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; @@ -122,27 +117,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - /** - * The period to wait while flow controlled before sending a log message confirming that the session is still - * waiting on flow control being revoked - */ - private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, - DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); - - /** - * The period to wait while flow controlled before declaring a failure - */ - private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, - DEFAULT_FLOW_CONTROL_WAIT_FAILURE); - private final boolean _delareQueues = - Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); + Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true")); private final boolean _declareExchanges = - Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); + Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true")); private final boolean _useAMQPEncodedMapMessage; + private final boolean _useAMQPEncodedStreamMessage; + /** * Flag indicating to start dispatcher as a daemon thread */ @@ -265,11 +249,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Has failover occured on this session with outstanding actions to commit? */ private boolean _failedOverDirty; - /** Flow control */ - private FlowControlIndicator _flowControl = new FlowControlIndicator(); - - - /** Holds the highest received delivery tag. */ protected AtomicLong getHighestDeliveryTag() { @@ -408,22 +387,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - private static final class FlowControlIndicator - { - private volatile boolean _flowControl = true; - - public synchronized void setFlowControl(boolean flowControl) - { - _flowControl = flowControl; - notify(); - } - - public boolean getFlowControl() - { - return _flowControl; - } - } - /** * Creates a new session on a connection. * @@ -439,6 +402,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat(); + _useAMQPEncodedStreamMessage = con == null ? false : !con.isUseLegacyStreamMessageFormat(); _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); @@ -649,12 +613,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public abstract void acknowledgeMessage(long deliveryTag, boolean multiple); - public MethodRegistry getMethodRegistry() - { - MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); - return methodRegistry; - } - /** * Binds the named queue, with the specified routing key, to the named exchange. * @@ -1041,12 +999,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { try { - handleAddressBasedDestination(dest,false,noLocal,true); + resolveAddress(dest,false,noLocal); if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) { throw new JMSException("Durable subscribers can only be created for Topics"); } - dest.getSourceNode().setDurable(true); } catch(AMQException e) { @@ -1158,6 +1115,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + public ListMessage createListMessage() throws JMSException + { + checkNotClosed(); + AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; + } + public MapMessage createMapMessage() throws JMSException { checkNotClosed(); @@ -1400,17 +1365,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public StreamMessage createStreamMessage() throws JMSException { - // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived - // calls through connection.closeAllSessions which is also called by the public connection.close() - // with a null cause - // When we are closing the Session due to a protocol session error we simply create a new AMQException - // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. - // We need to determin here if the connection should be - - synchronized (getFailoverMutex()) + checkNotClosed(); + if (_useAMQPEncodedStreamMessage) + { + AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; + } + else { - checkNotClosed(); - JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory()); msg.setAMQSession(this); return msg; @@ -1550,7 +1513,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException { - declareExchange(name, type, getProtocolHandler(), nowait); + declareExchange(name, type, nowait, false, false, false); } abstract public void sync() throws AMQException; @@ -1690,8 +1653,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws AMQException { - AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); + declareExchange(amqd, false); AMQShortString queueName = declareQueue(amqd, false); bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } @@ -2582,11 +2544,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Register to consume from the queue. - * * @param queueName */ - private void consumeFromQueue(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException + private void consumeFromQueue(C consumer, AMQShortString queueName, boolean nowait) throws AMQException, FailoverException { int tagId = _nextTag++; @@ -2603,7 +2563,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - sendConsume(consumer, queueName, protocolHandler, nowait, tagId); + sendConsume(consumer, queueName, nowait, tagId); } catch (AMQException e) { @@ -2614,7 +2574,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendConsume(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException; + boolean nowait, int tag) throws AMQException, FailoverException; private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) throws JMSException @@ -2648,9 +2608,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract P createMessageProducer(final Destination destination, final Boolean mandatory, final Boolean immediate, final long producerId) throws JMSException; - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + private void declareExchange(AMQDestination amqd, boolean nowait) throws AMQException { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(), + amqd.isExchangeAutoDelete(), amqd.isExchangeInternal()); } /** @@ -2707,33 +2668,29 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * * @param name The name of the exchange to declare. * @param type The type of the exchange to declare. - * @param protocolHandler The protocol handler to process the communication through. * @param nowait - * + * @param durable + * @param autoDelete + * @param internal * @throws AMQException If the exchange cannot be declared for any reason. * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException + final boolean nowait, final boolean durable, + final boolean autoDelete, final boolean internal) throws AMQException { new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendExchangeDeclare(name, type, protocolHandler, nowait); + sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal); return null; } }, _connection).execute(); } - public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; - - - void declareQueuePassive(AMQDestination queue) throws AMQException - { - declareQueue(queue,false,false,true); - } + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; /** * Declares a queue for a JMS destination. @@ -2768,31 +2725,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return declareQueue(amqd, noLocal, nowait, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, - final boolean noLocal, final boolean nowait, final boolean passive) - throws AMQException - { - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new FailoverNoopSupport<AMQShortString, AMQException>( - new FailoverProtectedOperation<AMQShortString, AMQException>() - { - public AMQShortString execute() throws AMQException, FailoverException - { - // Generate the queue name if the destination indicates that a client generated name is to be used. - if (amqd.isNameRequired()) - { - amqd.setQueueName(protocolHandler.generateQueueName()); - } - - sendQueueDeclare(amqd, protocolHandler, nowait, passive); - - return amqd.getAMQQueueName(); - } - }, _connection).execute(); - } - - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) throws AMQException, FailoverException; + protected abstract AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException; /** * Undeclares the specified queue. @@ -2845,21 +2779,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return ++_nextProducerId; } - protected AMQProtocolHandler getProtocolHandler() - { - return _connection.getProtocolHandler(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolHandler().getProtocolMajorVersion(); - } - - public byte getProtocolMinorVersion() - { - return getProtocolHandler().getProtocolMinorVersion(); - } - protected boolean hasMessageListeners() { return _hasMessageListeners; @@ -2918,17 +2837,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { AMQDestination amqd = consumer.getDestination(); - AMQProtocolHandler protocolHandler = getProtocolHandler(); - if (amqd.getDestSyntax() == DestSyntax.ADDR) { - handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait); + resolveAddress(amqd,true,consumer.isNoLocal()); } else { if (_declareExchanges) { - declareExchange(amqd, protocolHandler, nowait); + declareExchange(amqd, nowait); } if (_delareQueues || amqd.isNameRequired()) @@ -2973,7 +2890,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait); + consumeFromQueue(consumer, queueName, nowait); } catch (FailoverException e) { @@ -2981,10 +2898,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - public abstract void handleAddressBasedDestination(AMQDestination dest, + public abstract void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException; + boolean noLocal) throws AMQException; private void registerProducer(long producerId, MessageProducer producer) { @@ -3141,47 +3057,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _ticket = ticket; } - public boolean isFlowBlocked() - { - synchronized (_flowControl) - { - return !_flowControl.getFlowControl(); - } - } - - public void setFlowControl(final boolean active) - { - _flowControl.setFlowControl(active); - if (_logger.isInfoEnabled()) - { - _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); - } - } - - public void checkFlowControl() throws InterruptedException, JMSException - { - long expiryTime = 0L; - synchronized (_flowControl) - { - while (!_flowControl.getFlowControl() && - (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure) - : expiryTime) >= System.currentTimeMillis() ) - { - - _flowControl.wait(_flowControlWaitPeriod); - if (_logger.isInfoEnabled()) - { - _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); - } - } - if(!_flowControl.getFlowControl()) - { - _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); - throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); - } - } + /** + * Tests whether flow to this session is blocked. + * + * @return true if flow is blocked or false otherwise. + */ + public abstract boolean isFlowBlocked(); - } + public abstract void setFlowControl(final boolean active); public interface Dispatchable { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8a7c6b1a01..8490a724bf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,6 +17,11 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.transport.Option.BATCH; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; + import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; @@ -29,8 +34,10 @@ import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; + import javax.jms.Destination; import javax.jms.JMSException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -44,18 +51,32 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; +import org.apache.qpid.client.messaging.address.Node; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.*; -import static org.apache.qpid.transport.Option.BATCH; -import static org.apache.qpid.transport.Option.NONE; -import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; import org.slf4j.Logger; @@ -347,15 +368,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { + // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. List<Binding> bindings = new ArrayList<Binding>(); - bindings.addAll(destination.getSourceNode().getBindings()); - bindings.addAll(destination.getTargetNode().getBindings()); + bindings.addAll(destination.getNode().getBindings()); String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; for (Binding binding: bindings) { + // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. + // The null check below is a way to side step that issue while fixing QPID-4146 + // Note this issue only affects producers. + if (binding.getQueue() == null && queueName == null) + { + continue; + } String queue = binding.getQueue() == null? queueName.asString(): binding.getQueue(); @@ -523,11 +551,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, - getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh, - prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose); + getMessageFactoryRegistry(), this, rawSelector, prefetchHigh, prefetchLow, + exclusive, getAcknowledgeMode(), noConsume, autoClose); } /** @@ -558,7 +584,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic rk = routingKey.toString(); } - return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null); + return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null); } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) @@ -591,10 +617,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * This method is invoked when a consumer is created * Registers the consumer with the broker */ - public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, + public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException - { + { + if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) + { + if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) + { + String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); + + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); + queueName = consumer.getDestination().getAMQQueueName(); + consumer.setQueuename(queueName); + } + handleLinkCreation(consumer.getDestination()); + } boolean preAcquire = consumer.isPreAcquire(); AMQDestination destination = consumer.getDestination(); @@ -637,11 +675,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic capacity, Option.UNRELIABLE); } - - if (!nowait) - { - sync(); - } + sync(); } /** @@ -653,7 +687,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, - getProtocolHandler(), producerId, immediate, mandatory); + producerId, immediate, mandatory); } catch (AMQException e) { @@ -673,26 +707,25 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * creates an exchange if it does not already exist */ - public void sendExchangeDeclare(final AMQShortString name, - final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) - throws AMQException, FailoverException + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { - sendExchangeDeclare(name.asString(), type.asString(), null, null, - nowait); + //The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it + sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete); } public void sendExchangeDeclare(final String name, final String type, final String alternateExchange, final Map<String, Object> args, - final boolean nowait) throws AMQException + final boolean nowait, boolean durable, boolean autoDelete) throws AMQException { getQpidSession().exchangeDeclare( name, type, alternateExchange, args, - name.toString().startsWith("amq.") ? Option.PASSIVE - : Option.NONE); + name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE, + durable ? Option.DURABLE : Option.NONE, + autoDelete ? Option.AUTO_DELETE : Option.NONE); // We need to sync so that we get notify of an error. if (!nowait) { @@ -717,18 +750,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Declare a queue with the given queueName */ - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) - throws AMQException, FailoverException - { - // do nothing this is only used by 0_8 - } - - /** - * Declare a queue with the given queueName - */ - public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait, boolean passive) + public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal, + final boolean nowait, boolean passive) throws AMQException { AMQShortString queueName; @@ -759,7 +782,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - QueueNode node = (QueueNode)amqd.getSourceNode(); + // This code is here to ensure address based destination work with the declareQueue public method in AMQSession.java + Node node = amqd.getNode(); Map<String,Object> arguments = new HashMap<String,Object>(); arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs()); if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) @@ -925,12 +949,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return getCurrentException(); } + @Override protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -947,7 +970,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive); + return send0_10QueueDeclare(amqd, noLocal, nowait, passive); } }, getAMQConnection()).execute(); } @@ -1072,11 +1095,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } - public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode) + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); match = !result.getNotFound(); + Node node = dest.getNode(); if (match) { @@ -1086,16 +1110,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (node.getExchangeType() != null && node.getExchangeType().equals(result.getType())) && (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (node.getExchangeType() != null) - { - // even if assert is false, better to verify this - match = node.getExchangeType().equals(result.getType()); - if (!match) - { - _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() + - " actual " + result.getType()); - } } else { @@ -1104,18 +1118,27 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setExchangeClass(new AMQShortString(result.getType())); } } - + + if (assertNode) + { + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +", Result was : " + result); + } + } + return match; } - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { boolean match = true; try { QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); match = dest.getAddressName().equals(result.getQueue()); - + Node node = dest.getNode(); + if (match && assertNode) { match = (result.getDurable() == node.isDurable()) && @@ -1123,9 +1146,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (result.getExclusive() == node.isExclusive()) && (matchProps(result.getArguments(),node.getDeclareArgs())); } - else if (match) + + if (assertNode) { - // should I use the queried details to update the local data structure. + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +", Result was : " + result); + } } } catch(SessionException e) @@ -1140,7 +1167,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic "Error querying queue",e); } } - return match; } @@ -1179,17 +1205,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ @SuppressWarnings("deprecation") - public void handleAddressBasedDestination(AMQDestination dest, + public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException + boolean noLocal) throws AMQException { if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) { - if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) - { - createSubscriptionQueue(dest,noLocal); - } + return; } else { @@ -1209,46 +1231,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { case AMQDestination.QUEUE_TYPE: { - if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) + if(createNode) { - setLegacyFiledsForQueueType(dest); + setLegacyFieldsForQueueType(dest); + handleQueueNodeCreation(dest,noLocal); break; } - else if(createNode) + else if (isQueueExist(dest,assertNode)) { - setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,noLocal,noWait, false); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); + setLegacyFieldsForQueueType(dest); break; - } + } } case AMQDestination.TOPIC_TYPE: { - if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) + if(createNode) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest, noLocal); - } + handleExchangeNodeCreation(dest); break; } - else if(createNode) + else if (isExchangeExist(dest,assertNode)) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - sendExchangeDeclare(dest.getAddressName(), - dest.getExchangeClass().asString(), - dest.getTargetNode().getAlternateExchange(), - dest.getTargetNode().getDeclareArgs(), - false); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest,noLocal); - } break; } } @@ -1287,7 +1295,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic throw new AMQException("Ambiguous address, please specify queue or topic as node type"); } dest.setAddressType(type); - dest.rebuildTargetAndSourceNodes(type); return type; } } @@ -1309,30 +1316,45 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } } - - private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException + + void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException { - QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null - - if (dest.getQueueName() == null) + Link link = dest.getLink(); + String queueName = dest.getQueueName(); + + if (queueName == null) { - if (dest.getLink() != null && dest.getLink().getName() != null) - { - dest.setQueueName(new AMQShortString(dest.getLink().getName())); - } + queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); + dest.setQueueName(new AMQShortString(queueName)); + } + + SubscriptionQueue queueProps = link.getSubscriptionQueue(); + Map<String,Object> arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + if (link.isDurable() && queueName.startsWith("TempQueue")) + { + throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); } - node.setExclusive(true); - node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,noLocal,true, false); - getQpidSession().exchangeBind(dest.getQueueName(), - dest.getAddressName(), - dest.getSubject(), - Collections.<String,Object>emptyMap()); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); + + getQpidSession().queueDeclare(queueName, + queueProps.getAlternateExchange(), arguments, + queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + link.isDurable() ? Option.DURABLE : Option.NONE, + queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + Map<String,Object> bindingArguments = new HashMap<String, Object>(); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + getQpidSession().exchangeBind(queueName, + dest.getAddressName(), + dest.getSubject(), + bindingArguments); } - - public void setLegacyFiledsForQueueType(AMQDestination dest) + + public void setLegacyFieldsForQueueType(AMQDestination dest) { // legacy support dest.setQueueName(new AMQShortString(dest.getAddressName())); @@ -1345,7 +1367,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { // legacy support dest.setExchangeName(new AMQShortString(dest.getAddressName())); - ExchangeNode node = (ExchangeNode)dest.getTargetNode(); + Node node = dest.getNode(); dest.setExchangeClass(node.getExchangeType() == null? ExchangeDefaults.TOPIC_EXCHANGE_CLASS: new AMQShortString(node.getExchangeType())); @@ -1424,6 +1446,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return _qpidSession.isFlowBlocked(); } + @Override + public void setFlowControl(boolean active) + { + // Supported by 0-8..0-9-1 only + throw new UnsupportedOperationException("Operation not supported by this protocol"); + } + private void cancelTimerTask() { if (flushTask != null) @@ -1432,5 +1461,148 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic flushTask = null; } } -} + private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + { + Node node = dest.getNode(); + Map<String,Object> arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + getQpidSession().queueDeclare(dest.getAddressName(), + node.getAlternateExchange(), arguments, + node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + node.isDurable() ? Option.DURABLE : Option.NONE, + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + sendExchangeDeclare(dest.getAddressName(), + node.getExchangeType(), + node.getAlternateExchange(), + node.getDeclareArgs(), + false, + node.isDurable(), + node.isAutoDelete()); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleLinkCreation(AMQDestination dest) throws AMQException + { + createBindings(dest, dest.getLink().getBindings()); + } + + void createBindings(AMQDestination dest, List<Binding> bindings) + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeUnbind(queue, exchange, + binding.getBindingKey()); + } + } + + void deleteSubscriptionQueue(AMQDestination dest) throws AMQException + { + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest, false)) + { + getQpidSession().queueDelete(dest.getQueueName()); + } + } + + void handleNodeDelete(AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + getQpidSession().exchangeDelete(dest.getAddressName()); + } + } + else + { + if (isQueueExist(dest,false)) + { + getQpidSession().queueDelete(dest.getAddressName()); + } + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 8ab23a240e..3097b33da3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,12 +21,18 @@ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; @@ -58,6 +64,27 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); + public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack"; + + private final boolean _syncAfterClientAck = + Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true")); + + /** + * The period to wait while flow controlled before sending a log message confirming that the session is still + * waiting on flow control being revoked + */ + private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); + + /** + * The period to wait while flow controlled before declaring a failure + */ + private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, + DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + + /** Flow control */ + private FlowControlIndicator _flowControl = new FlowControlIndicator(); + /** * Creates a new session on a connection. * @@ -98,8 +125,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return getProtocolHandler().getProtocolVersion(); } - protected void acknowledgeImpl() + protected void acknowledgeImpl() throws JMSException { + boolean syncRequired = false; while (true) { Long tag = getUnacknowledgedMessageTags().poll(); @@ -109,6 +137,19 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } acknowledgeMessage(tag, false); + syncRequired = true; + } + + try + { + if (syncRequired && _syncAfterClientAck) + { + sync(); + } + } + catch (AMQException a) + { + throw new JMSAMQException("Failed to sync after acknowledge", a); } } @@ -359,9 +400,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } - @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, + @Override + public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException { @@ -380,27 +421,29 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (nowait) { - protocolHandler.writeFrame(jmsConsume); + getProtocolHandler().writeFrame(jmsConsume); } else { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + getProtocolHandler().syncWrite(jmsConsume, BasicConsumeOkBody.class); } } - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException + @Override + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, name.toString().startsWith("amq."), - false,false,false,false,null); + durable, autoDelete, internal, false, null); AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) throws AMQException, FailoverException + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(), @@ -414,7 +457,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe AMQFrame queueDeclare = body.generateFrame(getChannelId()); - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + } + + @Override + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, + final boolean nowait, final boolean passive) throws AMQException + { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new FailoverNoopSupport<AMQShortString, AMQException>( + new FailoverProtectedOperation<AMQShortString, AMQException>() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) + { + amqd.setQueueName(protocolHandler.generateQueueName()); + } + + sendQueueDeclare(amqd, passive); + + return amqd.getAMQQueueName(); + } + }, getAMQConnection()).execute(); } public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException @@ -440,10 +508,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException { - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, - getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow, + getMessageFactoryRegistry(),this, arguments, prefetchHigh, prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose); } @@ -629,12 +695,11 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } - public void handleAddressBasedDestination(AMQDestination dest, + public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException + boolean noLocal) throws AMQException { - throw new UnsupportedOperationException("The new addressing based sytanx is " + throw new UnsupportedOperationException("The new addressing based syntax is " + "not supported for AMQP 0-8/0-9 versions"); } @@ -662,14 +727,23 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe queueName == null ? null : new AMQShortString(queueName), bindingKey == null ? null : new AMQShortString(bindingKey)); } - + + private AMQProtocolHandler getProtocolHandler() + { + return getAMQConnection().getProtocolHandler(); + } + + public MethodRegistry getMethodRegistry() + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + return methodRegistry; + } public AMQException getLastException() { // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for - AMQStateManager manager = getAMQConnection().getProtocolHandler() - .getStateManager(); + AMQStateManager manager = getProtocolHandler().getStateManager(); Exception e = manager.getLastException(); if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) @@ -693,6 +767,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } + public boolean isFlowBlocked() + { + synchronized (_flowControl) + { + return !_flowControl.getFlowControl(); + } + } + + public void setFlowControl(final boolean active) + { + _flowControl.setFlowControl(active); + if (_logger.isInfoEnabled()) + { + _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); + } + } + + void checkFlowControl() throws InterruptedException, JMSException + { + long expiryTime = 0L; + synchronized (_flowControl) + { + while (!_flowControl.getFlowControl() && + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure) + : expiryTime) >= System.currentTimeMillis() ) + { + + _flowControl.wait(_flowControlWaitPeriod); + if (_logger.isInfoEnabled()) + { + _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); + } + } + if(!_flowControl.getFlowControl()) + { + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); + } + } + } + + + public abstract static class DestinationCache<T extends AMQDestination> { private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>(); @@ -740,6 +857,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } + private static final class FlowControlIndicator + { + private volatile boolean _flowControl = true; + + public synchronized void setFlowControl(boolean flowControl) + { + _flowControl = flowControl; + notify(); + } + + public boolean getFlowControl() + { + return _flowControl; + } + } + private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache(); private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index f09ef5e01d..51b6c7e478 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -114,8 +114,8 @@ public class AMQTopic extends AMQDestination implements Topic AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); // link is never null if dest was created using an address string. t.getLink().setName(queueName.asString()); - t.getSourceNode().setAutoDelete(false); - t.getSourceNode().setDurable(true); + t.getLink().getSubscriptionQueue().setAutoDelete(false); + t.getLink().setDurable(true); // The legacy fields are also populated just in case. t.setQueueName(queueName); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0f8b5717d6..b5e008da5a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -31,7 +31,6 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQShortString; @@ -87,8 +86,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa private final AMQSession _session; - private final AMQProtocolHandler _protocolHandler; - /** * We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ @@ -140,9 +137,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession session, AMQProtocolHandler protocolHandler, - FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException + AMQSession session, FieldTable rawSelector, + int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; _connection = connection; @@ -150,7 +147,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _destination = destination; _messageFactory = messageFactory; _session = session; - _protocolHandler = protocolHandler; _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; @@ -597,7 +593,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { sendCancel(); } - cleanupQueue(); } } catch (AMQException e) @@ -635,8 +630,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } abstract void sendCancel() throws AMQException, FailoverException; - - abstract void cleanupQueue() throws AMQException, FailoverException; /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has @@ -1042,10 +1035,4 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { return _messageFactory; } - - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 26bb51b821..ef7b8cc217 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -28,7 +28,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.Session; @@ -82,13 +81,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession<?,?> session, AMQProtocolHandler protocolHandler, - FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) + AMQSession<?,?> session, FieldTable rawSelector, + int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { - super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); + super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector, + prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); @@ -96,6 +95,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _capacity = evaluateCapacity(destination); + // This is due to the Destination carrying the temporary subscription name which is incorrect. if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; @@ -164,6 +164,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM @Override void sendCancel() throws AMQException { _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + postSubscription(); try { _0_10session.getQpidSession().sync(); @@ -500,7 +501,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - void cleanupQueue() throws AMQException, FailoverException + void postSubscription() throws AMQException { AMQDestination dest = this.getDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) @@ -508,9 +509,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - this.getDestination().getQueueName()); + ((AMQSession_0_10) getSession()).handleNodeDelete(dest); + ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest); } + // Subscription queue is handled as part of linkDelete method. + ((AMQSession_0_10) getSession()).handleLinkDelete(dest); } } @@ -560,4 +563,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } -} +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index b00f9dd98a..f733e6bbca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -29,7 +29,6 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_8; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; @@ -52,12 +51,12 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session, - AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException + FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, - protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive, - acknowledgeMode, browseOnly, autoClose); + rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, + browseOnly, autoClose); final FieldTable consumerArguments = getArguments(); if (isAutoClose()) { @@ -93,13 +92,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe } } + @Override + public AMQSession_0_8 getSession() + { + return (AMQSession_0_8) super.getSession(); + } + void sendCancel() throws AMQException, FailoverException { BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false); final AMQFrame cancelFrame = body.generateFrame(getChannelId()); - getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); + getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); if (_logger.isDebugEnabled()) { @@ -122,11 +127,6 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe return receive(); } - void cleanupQueue() throws AMQException, FailoverException - { - - } - public RejectBehaviour getRejectBehaviour() { return _rejectBehaviour; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 9b3b2ce0e9..98fa6de675 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.client; -import java.io.UnsupportedEncodingException; import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; @@ -36,15 +35,15 @@ import javax.jms.Topic; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; private final Logger _logger ; @@ -71,18 +70,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private AMQDestination _destination; /** - * Default encoding used for messages produced by this producer. - */ - private String _encoding; - - /** - * Default encoding used for message produced by this producer. - */ - private String _mimeType; - - private AMQProtocolHandler _protocolHandler; - - /** * True if this producer was created from a transacted session */ private boolean _transacted; @@ -135,14 +122,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - Boolean immediate, Boolean mandatory) throws AMQException + AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { _logger = logger; _connection = connection; _destination = destination; _transacted = transacted; - _protocolHandler = protocolHandler; _channelId = channelId; _session = session; _producerId = producerId; @@ -163,6 +148,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac setPublishMode(); } + protected AMQConnection getConnection() + { + return _connection; + } + void setPublishMode() { // Publish mode could be configured at destination level as well. @@ -303,7 +293,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac checkPreConditions(); checkInitialDestination(); - synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); @@ -467,7 +456,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac JMSException ex = new JMSException("Error validating destination"); ex.initCause(e); ex.setLinkedException(e); - + throw ex; } amqDestination.setExchangeExistsChecked(true); @@ -558,19 +547,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - public void setMimeType(String mimeType) throws JMSException - { - checkNotClosed(); - _mimeType = mimeType; - } - - public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException - { - checkNotClosed(); - _encoding = encoding; - } - - private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException + private void checkPreConditions() throws JMSException { checkNotClosed(); @@ -584,15 +561,16 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - private void checkInitialDestination() + private void checkInitialDestination() throws JMSException { if (_destination == null) { throw new UnsupportedOperationException("Destination is null"); } + checkValidQueue(); } - private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException + private void checkDestination(Destination suppliedDestination) throws JMSException { if ((_destination != null) && (suppliedDestination != null)) { @@ -600,6 +578,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } + if(suppliedDestination instanceof AMQQueue) + { + AMQQueue destination = (AMQQueue) suppliedDestination; + checkValidQueue(destination); + } if (suppliedDestination == null) { throw new InvalidDestinationException("Supplied Destination was invalid"); @@ -607,6 +590,43 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } + private void checkValidQueue() throws JMSException + { + if(_destination instanceof AMQQueue) + { + checkValidQueue((AMQQueue) _destination); + } + } + + private void checkValidQueue(AMQQueue destination) throws JMSException + { + if (!destination.isCheckedForQueueBinding() && validateQueueOnSend()) + { + if (getSession().isStrictAMQP()) + { + getLogger().warn("AMQP does not support destination validation before publish"); + destination.setCheckedForQueueBinding(true); + } + else + { + if (isBound(destination)) + { + destination.setCheckedForQueueBinding(true); + } + else + { + throw new InvalidDestinationException("Queue: " + destination.getQueueName() + + " is not a valid destination (no binding on server)"); + } + } + } + } + + private boolean validateQueueOnSend() + { + return _connection.validateQueueOnSend(); + } + /** * The session used to create this producer */ @@ -645,16 +665,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _destination = destination; } - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - - protected void setProtocolHandler(AMQProtocolHandler protocolHandler) - { - _protocolHandler = protocolHandler; - } - protected int getChannelId() { return _channelId; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index a3a1e9c28b..f717ca4655 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -27,7 +27,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -60,10 +59,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer private byte[] userIDBytes; BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - Boolean immediate, Boolean mandatory) throws AMQException + AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); + super(_logger, connection, destination, transacted, channelId, session, producerId, immediate, mandatory); userIDBytes = Strings.toUTF8(getUserID()); } @@ -79,14 +77,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer (name, destination.getExchangeClass().toString(), null, null, - name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE, + destination.isExchangeDurable() ? Option.DURABLE : Option.NONE, + destination.isExchangeAutoDelete() ? Option.AUTO_DELETE : Option.NONE); } } else { try { - getSession().handleAddressBasedDestination(destination,false,false,false); + getSession().resolveAddress(destination,false,false); + ((AMQSession_0_10)getSession()).handleLinkCreation(destination); + ((AMQSession_0_10)getSession()).sync(); } catch(Exception e) { @@ -251,25 +253,35 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer return getSession().isQueueBound(destination); } + // We should have a close and closed method to distinguish between normal close + // and a close due to session or connection error. @Override public void close() throws JMSException { super.close(); AMQDestination dest = getAMQDestination(); - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + AMQSession_0_10 ssn = (AMQSession_0_10) getSession(); + if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) + try { - try - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - getAMQDestination().getQueueName()); - } - catch(TransportException e) + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.SENDER ) { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + ssn.handleNodeDelete(dest); } + ssn.handleLinkDelete(dest); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + catch (AMQException e) + { + JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 21ff6c877a..bb270b0878 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -50,29 +50,28 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); + super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory); } void declareDestination(AMQDestination destination) { - - final MethodRegistry methodRegistry = getSession().getMethodRegistry(); - ExchangeDeclareBody body = + if(getSession().isDeclareExchanges()) + { + final MethodRegistry methodRegistry = getSession().getMethodRegistry(); + ExchangeDeclareBody body = methodRegistry.createExchangeDeclareBody(getSession().getTicket(), destination.getExchangeName(), destination.getExchangeClass(), destination.getExchangeName().toString().startsWith("amq."), - false, - false, - false, + destination.isExchangeDurable(), + destination.isExchangeAutoDelete(), + destination.isExchangeInternal(), true, null); - // Declare the exchange - // Note that the durable and internal arguments are ignored since passive is set to false - - AMQFrame declare = body.generateFrame(getChannelId()); + AMQFrame declare = body.generateFrame(getChannelId()); - getProtocolHandler().writeFrame(declare); + getConnection().getProtocolHandler().writeFrame(declare); + } } void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, @@ -172,7 +171,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer throw jmse; } - getProtocolHandler().writeFrame(compositeFrame); + getConnection().getProtocolHandler().writeFrame(compositeFrame); } /** @@ -234,4 +233,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer return frameCount; } + @Override + public AMQSession_0_8 getSession() + { + return (AMQSession_0_8) super.getSession(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java b/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java new file mode 100644 index 0000000000..32a7cb0b73 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +public interface HeartbeatListener +{ + void heartbeatReceived(); + + void heartbeatSent(); + + static final HeartbeatListener DEFAULT = new HeartbeatListener() + { + public void heartbeatReceived() + { + } + + public void heartbeatSent() + { + } + }; +} diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 05bd121bbd..e01ec8578d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -88,7 +88,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public void createSession() { - _qpidDtxSession = getQpidConnection().createSession(0); + _qpidDtxSession = getQpidConnection().createSession(0,true); _qpidDtxSession.setSessionListener(this); _qpidDtxSession.dtxSelect(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 2cf7b089eb..f038fc6e4f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.handler; +import java.nio.ByteBuffer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> { @@ -91,18 +95,24 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co } finally { + Sender<ByteBuffer> sender = session.getSender(); if (error != null) { session.notifyError(error); - } - - // Close the protocol Session, including any open TCP connections - session.closeProtocolSession(); + } - // Closing the session should not introduce a race condition as this thread will continue to propgate any - // exception in to the exceptionCaught method of the SessionHandler. - // Any sessionClosed event should occur after this. + // Close the open TCP connection + try + { + sender.close(); + } + catch(TransportException e) + { + //Ignore, they are already logged by the Sender and this + //is a connection-close being processed by the IoReceiver + //which will as it closes initiate failover if necessary. + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index a0c3914127..94b835dd1a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -91,6 +91,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate private MessageProperties _messageProps; private DeliveryProperties _deliveryProps; + private String _messageID; protected AMQMessageDelegate_0_10() { @@ -171,8 +172,12 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate public String getJMSMessageID() throws JMSException { - UUID id = _messageProps.getMessageId(); - return id == null ? null : "ID:" + id; + if (_messageID == null && _messageProps.getMessageId() != null) + { + UUID id = _messageProps.getMessageId(); + _messageID = "ID:" + id; + } + return _messageID; } public void setJMSMessageID(String messageId) throws JMSException @@ -185,14 +190,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { if(messageId.startsWith("ID:")) { - try - { - _messageProps.setMessageId(UUID.fromString(messageId.substring(3))); - } - catch(IllegalArgumentException ex) - { - throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID"); - } + _messageID = messageId; } else { @@ -201,6 +199,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } } + /* Used by the internal implementation */ public void setJMSMessageID(UUID messageId) throws JMSException { if(messageId == null) @@ -344,7 +343,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd); if (type == AMQDestination.QUEUE_TYPE) { - ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd); + ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessage.java new file mode 100644 index 0000000000..1d2cb43322 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessage.java @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.AMQException; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.MessageEOFException; +import java.lang.NumberFormatException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class AMQPEncodedListMessage extends JMSStreamMessage implements + org.apache.qpid.jms.ListMessage, javax.jms.MapMessage +{ + private static final Logger _logger = LoggerFactory + .getLogger(AMQPEncodedListMessage.class); + + public static final String MIME_TYPE = "amqp/list"; + + private List<Object> _list = new ArrayList<Object>(); + + public AMQPEncodedListMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException + { + super(delegateFactory); + currentIndex = 0; + } + + AMQPEncodedListMessage(AMQMessageDelegate delegate, ByteBuffer data) + throws AMQException + { + super(delegate, data); + if (data != null) + { + try + { + populateListFromData(data); + } + catch (JMSException je) + { + throw new AMQException(null, + "Error populating ListMessage from ByteBuffer", je); + } + } + currentIndex = 0; + } + + public String toBodyString() throws JMSException + { + return _list == null ? "" : _list.toString(); + } + + protected String getMimeType() + { + return MIME_TYPE; + } + + /* ListMessage Implementation. */ + public boolean add(Object a) throws JMSException + { + checkWritable(); + checkAllowedValue(a); + try + { + return _list.add(a); + } + catch (Exception e) + { + MessageFormatException ex = new MessageFormatException("Error adding to ListMessage"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + + } + } + + public void add(int index, Object element) throws JMSException + { + checkWritable(); + checkAllowedValue(element); + try + { + _list.add(index, element); + } + catch (Exception e) + { + MessageFormatException ex = new MessageFormatException("Error adding to ListMessage at " + + index); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public boolean contains(Object o) throws JMSException + { + try + { + return _list.contains(o); + } + catch (Exception e) + { + JMSException ex = new JMSException("Error when looking up object"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public Object get(int index) throws JMSException + { + try + { + return _list.get(index); + } + catch (IndexOutOfBoundsException e) + { + MessageFormatException ex = new MessageFormatException( + "Error getting ListMessage element at " + index); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public int indexOf(Object o) + { + return _list.indexOf(o); + } + + public Iterator iterator() + { + return _list.iterator(); + } + + public Object remove(int index) throws JMSException + { + checkWritable(); + try + { + return _list.remove(index); + } + catch (IndexOutOfBoundsException e) + { + MessageFormatException ex = new MessageFormatException( + "Error removing ListMessage element at " + index); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public boolean remove(Object o) throws JMSException + { + checkWritable(); + return _list.remove(o); + } + + public Object set(int index, Object element) throws JMSException + { + checkWritable(); + checkAllowedValue(element); + try + { + return _list.set(index, element); + } + catch (Exception e) + { + MessageFormatException ex = new MessageFormatException( + "Error setting ListMessage element at " + index); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public int size() + { + return _list.size(); + } + + public Object[] toArray() + { + return _list.toArray(); + } + + /* MapMessage Implementation */ + private boolean isValidIndex(int index) + { + if (index >= 0 && index < size()) + return true; + + return false; + } + + private int getValidIndex(String indexStr) throws JMSException + { + if ((indexStr == null) || indexStr.equals("")) + { + throw new IllegalArgumentException( + "Property name cannot be null, or the empty String."); + } + + int index = 0; + try + { + index = Integer.parseInt(indexStr); + } + catch (NumberFormatException e) + { + JMSException ex = new JMSException("Invalid index string"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + if (isValidIndex(index)) + return index; + + throw new MessageFormatException("Property " + indexStr + + " should be a valid index into the list of size " + size()); + } + + private void setGenericForMap(String propName, Object o) + throws JMSException + { + checkWritable(); + int index = 0; + try + { + index = Integer.parseInt(propName); + } + catch (NumberFormatException e) + { + JMSException ex = new JMSException("The property name should be a valid index"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + + if (isValidIndex(index)) + remove(index); + add(index, o); + } + + public boolean getBoolean(String propName) throws JMSException + { + return getBooleanImpl(getValidIndex(propName)); + } + + public byte getByte(String propName) throws JMSException + { + return getByteImpl(getValidIndex(propName)); + } + + public short getShort(String propName) throws JMSException + { + return getShortImpl(getValidIndex(propName)); + } + + public int getInt(String propName) throws JMSException + { + return getIntImpl(getValidIndex(propName)); + } + + public long getLong(String propName) throws JMSException + { + return getLongImpl(getValidIndex(propName)); + } + + public char getChar(String propName) throws JMSException + { + return getCharImpl(getValidIndex(propName)); + + } + + public float getFloat(String propName) throws JMSException + { + return getFloatImpl(getValidIndex(propName)); + } + + public double getDouble(String propName) throws JMSException + { + return getDoubleImpl(getValidIndex(propName)); + } + + public String getString(String propName) throws JMSException + { + return getStringImpl(getValidIndex(propName)); + } + + public byte[] getBytes(String propName) throws JMSException + { + return getBytesImpl(getValidIndex(propName)); + } + + public Object getObject(String propName) throws JMSException + { + return get(getValidIndex(propName)); + } + + public Enumeration getMapNames() throws JMSException + { + List<String> names = new ArrayList<String>(); + int i = 0; + + while (i < size()) + names.add(Integer.toString(i++)); + + return Collections.enumeration(names); + } + + public void setBoolean(String propName, boolean b) throws JMSException + { + setGenericForMap(propName, b); + } + + public void setByte(String propName, byte b) throws JMSException + { + setGenericForMap(propName, b); + } + + public void setShort(String propName, short i) throws JMSException + { + setGenericForMap(propName, i); + } + + public void setChar(String propName, char c) throws JMSException + { + setGenericForMap(propName, c); + } + + public void setInt(String propName, int i) throws JMSException + { + setGenericForMap(propName, i); + } + + public void setLong(String propName, long l) throws JMSException + { + setGenericForMap(propName, l); + } + + public void setFloat(String propName, float v) throws JMSException + { + setGenericForMap(propName, v); + } + + public void setDouble(String propName, double v) throws JMSException + { + setGenericForMap(propName, v); + } + + public void setString(String propName, String string1) throws JMSException + { + setGenericForMap(propName, string1); + } + + public void setBytes(String propName, byte[] bytes) throws JMSException + { + setGenericForMap(propName, bytes); + } + + public void setBytes(String propName, byte[] bytes, int offset, int length) + throws JMSException + { + if ((offset == 0) && (length == bytes.length)) + { + setBytes(propName, bytes); + } + else + { + byte[] newBytes = new byte[length]; + System.arraycopy(bytes, offset, newBytes, 0, length); + setBytes(propName, newBytes); + } + } + + public void setObject(String propName, Object value) throws JMSException + { + checkAllowedValue(value); + setGenericForMap(propName, value); + } + + public boolean itemExists(String propName) throws JMSException + { + return isValidIndex(Integer.parseInt(propName)); + } + + // StreamMessage methods + + private int currentIndex; + + private static final String MESSAGE_EOF_EXCEPTION = "End of Stream (ListMessage) at index: "; + + private void setGenericForStream(Object o) throws JMSException + { + checkWritable(); + add(o); + currentIndex++; + } + + @Override + public boolean readBoolean() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getBooleanImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public byte readByte() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getByteImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public int readBytes(byte[] value) throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + { + ByteBuffer res = ByteBuffer.wrap(getBytesImpl(currentIndex++)); + res.get(value); + return value.length; + } + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public char readChar() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getCharImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public double readDouble() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getDoubleImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public float readFloat() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getFloatImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public int readInt() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getIntImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public long readLong() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getLongImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public Object readObject() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return get(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public short readShort() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getShortImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public String readString() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getStringImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public void writeBoolean(boolean value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeByte(byte value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeBytes(byte[] value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeBytes(byte[] value, int offset, int length) + throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeChar(char value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeDouble(double value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeFloat(float value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeInt(int value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeLong(long value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeObject(Object value) throws JMSException + { + checkAllowedValue(value); + setGenericForStream(value); + } + + @Override + public void writeShort(short value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeString(String value) throws JMSException + { + setGenericForStream(value); + } + + // Common methods + + private void checkAllowedValue(Object value) throws MessageFormatException + { + if (((value instanceof Boolean) || (value instanceof Byte) + || (value instanceof Short) || (value instanceof Integer) + || (value instanceof Long) || (value instanceof Character) + || (value instanceof Float) || (value instanceof Double) + || (value instanceof String) || (value instanceof byte[]) + || (value instanceof List) || (value instanceof Map) + || (value instanceof UUID) || (value == null)) == false) + { + throw new MessageFormatException("Invalid value " + value + + "of type " + value.getClass().getName() + "."); + } + } + + @Override + public void reset() + { + currentIndex = 0; + setReadable(true); + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + _list.clear(); + currentIndex = 0; + setReadable(false); + } + + private boolean getBooleanImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Boolean) + { + return ((Boolean) value).booleanValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Boolean.valueOf((String) value); + } + catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + + " cannot be converted to boolean."); + } + + private byte getByteImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Byte) + { + return ((Byte) value).byteValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Byte.valueOf((String) value).byteValue(); + } catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to byte."); + } + + private short getShortImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Short) + { + return ((Short) value).shortValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).shortValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Short.valueOf((String) value).shortValue(); + } catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to short."); + } + + private int getIntImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Integer) + { + return ((Integer) value).intValue(); + } + + if (value instanceof Short) + { + return ((Short) value).intValue(); + } + + if (value instanceof Byte) + { + return ((Byte) value).intValue(); + } + + if ((value instanceof String) || (value == null)) + { + try + { + return Integer.valueOf((String) value).intValue(); + } + catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to int."); + } + + private long getLongImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Long) + { + return ((Long) value).longValue(); + } else if (value instanceof Integer) + { + return ((Integer) value).longValue(); + } + + if (value instanceof Short) + { + return ((Short) value).longValue(); + } + + if (value instanceof Byte) + { + return ((Byte) value).longValue(); + } else if ((value instanceof String) || (value == null)) + { + try + { + return Long.valueOf((String) value).longValue(); + } catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to long."); + } + + private char getCharImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Character) + { + return ((Character) value).charValue(); + } else if (value == null) + { + throw new NullPointerException("Property at " + index + + " has null value and therefore cannot " + + "be converted to char."); + } else + { + throw new MessageFormatException("Property at " + index + + " of type " + value.getClass().getName() + + " cannot be converted to a char."); + } + } + + private float getFloatImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Float) + { + return ((Float) value).floatValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Float.valueOf((String) value).floatValue(); + } + catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to float."); + } + + private double getDoubleImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Double) + { + return ((Double) value).doubleValue(); + } + else if (value instanceof Float) + { + return ((Float) value).doubleValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Double.valueOf((String) value).doubleValue(); + } + catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + + " cannot be converted to double."); + } + + private String getStringImpl(int index) throws JMSException + { + Object value = get(index); + + if ((value instanceof String) || (value == null)) + { + return (String) value; + } else if (value instanceof byte[]) + { + throw new MessageFormatException("Property at " + index + + " of type byte[] " + "cannot be converted to String."); + } else + { + return value.toString(); + } + } + + private byte[] getBytesImpl(int index) throws JMSException + { + Object value = get(index); + + if ((value instanceof byte[]) || (value == null)) + { + return (byte[]) value; + } + else + { + throw new MessageFormatException("Property at " + index + + " of type " + value.getClass().getName() + + " cannot be converted to byte[]."); + } + } + + protected void populateListFromData(ByteBuffer data) throws JMSException + { + if (data != null) + { + data.rewind(); + BBDecoder decoder = new BBDecoder(); + decoder.init(data); + _list = decoder.readList(); + } + else + { + _list.clear(); + } + } + + public ByteBuffer getData() throws JMSException + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList(_list); + return encoder.segment(); + } + + public void setList(List<Object> l) + { + _list = l; + } + + public List<Object> asList() + { + return _list; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java new file mode 100644 index 0000000000..b503dccb91 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java @@ -0,0 +1,44 @@ +package org.apache.qpid.client.message; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; +import java.nio.ByteBuffer; + +public class AMQPEncodedListMessageFactory extends AbstractJMSMessageFactory +{ + @Override + protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, + ByteBuffer data) throws AMQException + { + return new AMQPEncodedListMessage(delegate,data); + } + + + public AbstractJMSMessage createMessage( + AMQMessageDelegateFactory delegateFactory) throws JMSException + { + return new AMQPEncodedListMessage(delegateFactory); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 509fc9f7f1..c2a919c1c5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -196,7 +196,14 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag if (data != null && data.hasRemaining()) { ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new ByteBufferInputStream(data)); - result = (Serializable) in.readObject(); + try + { + result = (Serializable) in.readObject(); + } + finally + { + in.close(); + } } return result; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index b958d89515..b1af262580 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -44,7 +44,12 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } + JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws AMQException + { + super(delegateFactory, data!=null); + _typedBytesContentWriter = new TypedBytesContentWriter(); + } JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index fa39b4c93c..4154003b23 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -66,6 +66,7 @@ public class MessageFactoryRegistry mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory()); + mf.registerFactory(AMQPEncodedListMessage.MIME_TYPE, new AMQPEncodedListMessageFactory()); mf.registerFactory(null, mf._default); return mf; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 318fe32d36..72fc74e19c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -20,21 +20,20 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Link.Subscription; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.configuration.Accessor; import org.apache.qpid.configuration.Accessor.MapAccessor; import org.apache.qpid.messaging.Address; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - /** * Utility class for extracting information from the address class */ @@ -68,58 +67,56 @@ public class AddressHelper public static final String ARGUMENTS = "arguments"; public static final String RELIABILITY = "reliability"; - private Address address; - private Accessor addressProps; - private Accessor nodeProps; - private Accessor linkProps; + private Address _address; + private Accessor _addressPropAccess; + private Accessor _nodePropAccess; + private Accessor _linkPropAccess; + private Map _addressPropMap; + private Map _nodePropMap; + private Map _linkPropMap; public AddressHelper(Address address) { - this.address = address; - addressProps = new MapAccessor(address.getOptions()); - Map node_props = address.getOptions() == null + this._address = address; + this._addressPropMap = address.getOptions(); + this._addressPropAccess = new MapAccessor(_addressPropMap); + this._nodePropMap = address.getOptions() == null || address.getOptions().get(NODE) == null ? null : (Map) address.getOptions().get(NODE); - if (node_props != null) + if (_nodePropMap != null) { - nodeProps = new MapAccessor(node_props); + _nodePropAccess = new MapAccessor(_nodePropMap); } - Map link_props = address.getOptions() == null + this._linkPropMap = address.getOptions() == null || address.getOptions().get(LINK) == null ? null : (Map) address.getOptions().get(LINK); - if (link_props != null) + if (_linkPropMap != null) { - linkProps = new MapAccessor(link_props); + _linkPropAccess = new MapAccessor(_linkPropMap); } } public String getCreate() { - return addressProps.getString(CREATE); + return _addressPropAccess.getString(CREATE); } public String getAssert() { - return addressProps.getString(ASSERT); + return _addressPropAccess.getString(ASSERT); } public String getDelete() { - return addressProps.getString(DELETE); - } - - public boolean isNoLocal() - { - Boolean b = nodeProps.getBoolean(NO_LOCAL); - return b == null ? false : b; + return _addressPropAccess.getString(DELETE); } public boolean isBrowseOnly() { - String mode = addressProps.getString(MODE); + String mode = _addressPropAccess.getString(MODE); return mode != null && mode.equals(BROWSE) ? true : false; } @@ -127,7 +124,7 @@ public class AddressHelper public List<Binding> getBindings(Map props) { List<Binding> bindings = new ArrayList<Binding>(); - List<Map> bindingList = (List<Map>) props.get(X_BINDINGS); + List<Map> bindingList = (props == null) ? Collections.EMPTY_LIST : (List<Map>) props.get(X_BINDINGS); if (bindingList != null) { for (Map bindingMap : bindingList) @@ -157,117 +154,70 @@ public class AddressHelper } } - public int getTargetNodeType() throws Exception + public int getNodeType() throws Exception { - if (nodeProps == null || nodeProps.getString(TYPE) == null) + if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null) { // need to query and figure out return AMQDestination.UNKNOWN_TYPE; - } else if (nodeProps.getString(TYPE).equals("queue")) + } + else if (_nodePropAccess.getString(TYPE).equals("queue")) { return AMQDestination.QUEUE_TYPE; - } else if (nodeProps.getString(TYPE).equals("topic")) + } + else if (_nodePropAccess.getString(TYPE).equals("topic")) { return AMQDestination.TOPIC_TYPE; - } else + } + else { throw new Exception("unkown exchange type"); } } - public Node getTargetNode(int addressType) + public Node getNode() { - // target node here is the default exchange - if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) - { - return new ExchangeNode(); - } else if (addressType == AMQDestination.TOPIC_TYPE) - { - Map node = (Map) address.getOptions().get(NODE); - return createExchangeNode(node); - } else + Node node = new Node(_address.getName()); + if (_nodePropAccess != null) { - // don't know yet - return null; - } - } - - private Node createExchangeNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - ExchangeNode node = new ExchangeNode(); - node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap - .getString(TYPE)); - fillInCommonNodeArgs(node, parent, argsMap); - return node; - } + Map xDeclareMap = getDeclareArgs(_nodePropMap); + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); - private Node createQueueNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - QueueNode node = new QueueNode(); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false - : argsMap.getBoolean(EXCLUSIVE)); - fillInCommonNodeArgs(node, parent, argsMap); - - return node; - } - - private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap) - { - node.setDurable(getDurability(parent)); - node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false - : argsMap.getBoolean(AUTO_DELETE)); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setBindings(getBindings(parent)); - if (getDeclareArgs(parent).containsKey(ARGUMENTS)) - { - node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS)); + node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false)); + node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false)); + node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false)); + node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + if (xDeclareMapAccessor.getString(TYPE) != null) + { + node.setExchangeType(xDeclareMapAccessor.getString(TYPE)); + } + node.setBindings(getBindings(_nodePropMap)); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + node.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); + } } - } - - private boolean getDurability(Map map) - { - Accessor access = new MapAccessor(map); - Boolean result = access.getBoolean(DURABLE); - return (result == null) ? false : result.booleanValue(); + return node; } - /** - * if the type == queue x-declare args from the node props is used. if the - * type == exchange x-declare args from the link props is used else just - * create a default temp queue. - */ - public Node getSourceNode(int addressType) + // This should really be in the Accessor interface + private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue) { - if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) - { - return createQueueNode((Map) address.getOptions().get(NODE)); - } - if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) - { - return createQueueNode((Map) address.getOptions().get(LINK)); - } else - { - // need to query the info - return new QueueNode(); - } + Boolean result = access.getBoolean(propName); + return (result == null) ? defaultValue : result.booleanValue(); } public Link getLink() throws Exception { Link link = new Link(); link.setSubscription(new Subscription()); - if (linkProps != null) + link.setSubscriptionQueue(new SubscriptionQueue()); + if (_linkPropAccess != null) { - link.setDurable(linkProps.getBoolean(DURABLE) == null ? false - : linkProps.getBoolean(DURABLE)); - link.setName(linkProps.getString(NAME)); + link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false)); + link.setName(_linkPropAccess.getString(NAME)); - String reliability = linkProps.getString(RELIABILITY); + String reliability = _linkPropAccess.getString(RELIABILITY); if ( reliability != null) { if (reliability.equalsIgnoreCase("unreliable")) @@ -283,13 +233,12 @@ public class AddressHelper throw new Exception("The reliability mode '" + reliability + "' is not yet supported"); } - } - if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) + if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { MapAccessor capacityProps = new MapAccessor( - (Map) ((Map) address.getOptions().get(LINK)) + (Map) ((Map) _address.getOptions().get(LINK)) .get(CAPACITY)); link .setConsumerCapacity(capacityProps @@ -302,17 +251,19 @@ public class AddressHelper } else { - int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps + int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess .getInt(CAPACITY); link.setConsumerCapacity(cap); link.setProducerCapacity(cap); } - link.setFilter(linkProps.getString(FILTER)); + link.setFilter(_linkPropAccess.getString(FILTER)); // so far filter type not used - if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + Map linkMap = (Map) _address.getOptions().get(LINK); + + if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE)) { - Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); if (x_subscribe.containsKey(ARGUMENTS)) { @@ -324,6 +275,18 @@ public class AddressHelper link.getSubscription().setExclusive(exclusive); } + + link.setBindings(getBindings(linkMap)); + Map xDeclareMap = getDeclareArgs(linkMap); + SubscriptionQueue queue = link.getSubscriptionQueue(); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); + queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true)); + queue.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true)); + queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); + } } return link; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 41f6725c8f..40a84ebd02 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpid.client.AMQDestination.Binding; + public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } @@ -36,10 +41,11 @@ public class Link private boolean _isDurable; private int _consumerCapacity = 0; private int _producerCapacity = 0; - private Node node; private Subscription subscription; private Reliability reliability = Reliability.AT_LEAST_ONCE; - + private List<Binding> _bindings = new ArrayList<Binding>(); + private SubscriptionQueue _subscriptionQueue; + public Reliability getReliability() { return reliability; @@ -50,21 +56,11 @@ public class Link this.reliability = reliability; } - public Node getNode() - { - return node; - } - - public void setNode(Node node) - { - this.node = node; - } - public boolean isDurable() { return _isDurable; } - + public void setDurable(boolean durable) { _isDurable = durable; @@ -139,6 +135,74 @@ public class Link { this.subscription = subscription; } + + public List<Binding> getBindings() + { + return _bindings; + } + + public void setBindings(List<Binding> bindings) + { + _bindings = bindings; + } + + public SubscriptionQueue getSubscriptionQueue() + { + return _subscriptionQueue; + } + + public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue) + { + this._subscriptionQueue = subscriptionQueue; + } + + public static class SubscriptionQueue + { + private Map<String,Object> _declareArgs = new HashMap<String,Object>(); + private boolean _isAutoDelete = true; + private boolean _isExclusive = true; + private String _alternateExchange; + + public Map<String,Object> getDeclareArgs() + { + return _declareArgs; + } + + public void setDeclareArgs(Map<String,Object> options) + { + _declareArgs = options; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + _isAutoDelete = autoDelete; + } + + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + public String getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(String altExchange) + { + _alternateExchange = altExchange; + } + } public static class Subscription { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java index 0da0327885..005f98f344 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestination.Binding; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -public abstract class Node +public class Node { private int _nodeType = AMQDestination.UNKNOWN_TYPE; + private String _name; private boolean _isDurable; private boolean _isAutoDelete; + private boolean _isExclusive; private String _alternateExchange; + private String _exchangeType = "topic"; // used when node is an exchange instead of a queue. private List<Binding> _bindings = new ArrayList<Binding>(); - private Map<String,Object> _declareArgs = Collections.emptyMap(); + private Map<String,Object> _declareArgs = new HashMap<String,Object>(); - protected Node(int nodeType) + protected Node(String name) + { + _name = name; + } + + public String getName() + { + return _name; + } + + public void setNodeType(int nodeType) { _nodeType = nodeType; } @@ -58,6 +72,16 @@ public abstract class Node _isDurable = durable; } + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + public boolean isAutoDelete() { return _isAutoDelete; @@ -100,56 +124,15 @@ public abstract class Node public void setDeclareArgs(Map<String,Object> options) { _declareArgs = options; - } - - public static class QueueNode extends Node - { - private boolean _isExclusive; - private QpidQueueOptions _queueOptions = new QpidQueueOptions(); - - public QueueNode() - { - super(AMQDestination.QUEUE_TYPE); - } - - public boolean isExclusive() - { - return _isExclusive; - } - - public void setExclusive(boolean exclusive) - { - _isExclusive = exclusive; - } } - - public static class ExchangeNode extends Node - { - private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); - private String _exchangeType; - - public ExchangeNode() - { - super(AMQDestination.TOPIC_TYPE); - } - - public String getExchangeType() - { - return _exchangeType; - } - - public void setExchangeType(String exchangeType) - { - _exchangeType = exchangeType; - } - + + public void setExchangeType(String type) + { + _exchangeType = type; } - - public static class UnknownNodeType extends Node + + public String getExchangeType() { - public UnknownNodeType() - { - super(AMQDestination.UNKNOWN_TYPE); - } + return _exchangeType; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index b314453e31..816caac824 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.protocol; +import org.apache.qpid.client.HeartbeatListener; import org.apache.qpid.util.BytesDataOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import java.io.IOException; @@ -177,6 +179,9 @@ public class AMQProtocolHandler implements ProtocolEngine private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private long _lastReadTime = System.currentTimeMillis(); + private long _lastWriteTime = System.currentTimeMillis(); + private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -210,48 +215,67 @@ public class AMQProtocolHandler implements ProtocolEngine } else { - _logger.debug("Session closed called with failover state currently " + _failoverState); - - // reconnetablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. - - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) + // Use local variable to keep flag whether fail-over allowed or not, + // in order to execute AMQConnection#exceptionRecievedout out of synchronization block, + // otherwise it might deadlock with failover mutex + boolean failoverNotAllowed = false; + synchronized (this) { - _logger.debug("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - startFailoverThread(); - } - else - { - _logger.debug("Not starting failover as state currently " + _failoverState); - } - } - else - { - _logger.debug("Failover not allowed by policy."); // or already in progress? - if (_logger.isDebugEnabled()) { - _logger.debug(_connection.getFailoverPolicy().toString()); + _logger.debug("Session closed called with failover state " + _failoverState); } - if (_failoverState != FailoverState.IN_PROGRESS) + // reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + if (_failoverState == FailoverState.NOT_STARTED) { - _logger.debug("sessionClose() not allowed to failover"); - _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", - _stateManager.getLastException())); + // close the sender + try + { + _sender.close(); + } + catch (Exception e) + { + _logger.warn("Exception occured on closing the sender", e); + } + if (_connection.failoverAllowed()) + { + _failoverState = FailoverState.IN_PROGRESS; + + _logger.debug("FAILOVER STARTING"); + startFailoverThread(); + } + else if (_connection.isConnected()) + { + failoverNotAllowed = true; + if (_logger.isDebugEnabled()) + { + _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy()); + } + } + else + { + _logger.debug("We are in process of establishing the initial connection"); + } } else { - _logger.debug("sessionClose() failover in progress"); + _logger.debug("Not starting the failover thread as state currently " + _failoverState); } } + + if (failoverNotAllowed) + { + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection not permitted.", _stateManager.getLastException())); + } } - _logger.debug("Protocol Session [" + this + "] closed"); + if (_logger.isDebugEnabled()) + { + _logger.debug("Protocol Session [" + this + "] closed"); + } } /** See {@link FailoverHandler} to see rationale for separate thread. */ @@ -280,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("Protocol Session [" + this + "] idle: reader"); // failover: - HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); _network.close(); } @@ -289,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("Protocol Session [" + this + "] idle: reader"); writeFrame(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); + _heartbeatListener.heartbeatSent(); } /** @@ -297,14 +320,29 @@ public class AMQProtocolHandler implements ProtocolEngine */ public void exception(Throwable cause) { - if (_failoverState == FailoverState.NOT_STARTED) + boolean causeIsAConnectionProblem = + cause instanceof AMQConnectionClosedException || + cause instanceof IOException || + cause instanceof TransportException; + + if (causeIsAConnectionProblem) { - if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) + //ensure the IoSender and IoReceiver are closed + try { - _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attempt failover _network.close(); - closed(); + } + catch (Exception e) + { + //ignore + } + } + FailoverState state = getFailoverState(); + if (state == FailoverState.NOT_STARTED) + { + if (causeIsAConnectionProblem) + { + _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause); } else { @@ -319,7 +357,7 @@ public class AMQProtocolHandler implements ProtocolEngine } // we reach this point if failover was attempted and failed therefore we need to let the calling app // know since we cannot recover the situation - else if (_failoverState == FailoverState.FAILED) + else if (state == FailoverState.FAILED) { _logger.error("Exception caught by protocol handler: " + cause, cause); @@ -329,6 +367,10 @@ public class AMQProtocolHandler implements ProtocolEngine propagateExceptionToAllWaiters(amqe); _connection.exceptionReceived(cause); } + else + { + _logger.warn("Exception caught by protocol handler: " + cause, cause); + } } /** @@ -403,6 +445,7 @@ public class AMQProtocolHandler implements ProtocolEngine public void received(ByteBuffer msg) { _readBytes += msg.remaining(); + _lastReadTime = System.currentTimeMillis(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); @@ -431,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine final AMQBody bodyFrame = frame.getBodyFrame(); - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - bodyFrame.handle(frame.getChannel(), _protocolSession); _connection.bytesReceived(_readBytes); @@ -521,6 +562,7 @@ public class AMQProtocolHandler implements ProtocolEngine public synchronized void writeFrame(AMQDataBlock frame, boolean flush) { final ByteBuffer buf = asByteBuffer(frame); + _lastWriteTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); if(flush) @@ -792,14 +834,14 @@ public class AMQProtocolHandler implements ProtocolEngine return _protocolSession; } - FailoverState getFailoverState() + synchronized FailoverState getFailoverState() { return _failoverState; } - public void setFailoverState(FailoverState failoverState) + public synchronized void setFailoverState(FailoverState failoverState) { - _failoverState = failoverState; + _failoverState= failoverState; } public byte getProtocolMajorVersion() @@ -843,6 +885,23 @@ public class AMQProtocolHandler implements ProtocolEngine _sender = sender; } + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + + protected Sender<ByteBuffer> getSender() + { + return _sender; + } + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { @@ -850,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine { _network.setMaxWriteIdle(delay); _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } @@ -865,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine } + public void setHeartbeatListener(HeartbeatListener listener) + { + _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; + } + public void heartbeatBodyReceived() + { + _heartbeatListener.heartbeatReceived(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index af57fd98fc..aed10cf15f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -48,6 +48,8 @@ import org.apache.qpid.transport.TransportException; import javax.jms.JMSException; import javax.security.sasl.SaslClient; + +import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -265,7 +267,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException { - + _protocolHandler.heartbeatBodyReceived(); } /** @@ -372,6 +374,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } + public Sender<ByteBuffer> getSender() + { + return _protocolHandler.getSender(); + } + public void failover(String host, int port) { _protocolHandler.failover(host, port); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java deleted file mode 100644 index d387a8ba93..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -class HeartbeatDiagnostics -{ - private static final Diagnostics _impl = init(); - - private HeartbeatDiagnostics() - { - } - - private static Diagnostics init() - { - return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off(); - } - - static void sent() - { - _impl.sent(); - } - - static void timeout() - { - _impl.timeout(); - } - - static void received(boolean heartbeat) - { - _impl.received(heartbeat); - } - - static void init(int delay, int timeout) - { - _impl.init(delay, timeout); - } - - private static interface Diagnostics - { - void sent(); - void timeout(); - void received(boolean heartbeat); - void init(int delay, int timeout); - } - - private static class On implements Diagnostics - { - private final String[] messages = new String[50]; - private int i; - - private void save(String msg) - { - messages[i++] = msg; - if(i >= messages.length){ - i = 0;//i.e. a circular buffer - } - } - - public void sent() - { - save(System.currentTimeMillis() + ": sent heartbeat"); - } - - public void timeout() - { - for(int i = 0; i < messages.length; i++) - { - if(messages[i] != null) - { - System.out.println(messages[i]); - } - } - System.out.println(System.currentTimeMillis() + ": timed out"); - } - - public void received(boolean heartbeat) - { - save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data")); - } - - public void init(int delay, int timeout) - { - System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout); - } - } - - private static class Off implements Diagnostics - { - public void sent() - { - - } - public void timeout() - { - - } - public void received(boolean heartbeat) - { - - } - - public void init(int delay, int timeout) - { - - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java b/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java index 9198903408..b43229292f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java +++ b/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java @@ -28,8 +28,10 @@ import org.apache.qpid.util.FileUtils; import javax.security.sasl.SaslClientFactory; import java.io.IOException; import java.io.InputStream; +import java.security.Provider; import java.security.Security; import java.util.Enumeration; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.TreeMap; @@ -67,10 +69,10 @@ public class DynamicSaslRegistrar } /** Reads the properties file, and creates a dynamic security provider to register the SASL implementations with. */ - public static void registerSaslProviders() + public static ProviderRegistrationResult registerSaslProviders() { _logger.debug("public static void registerSaslProviders(): called"); - + ProviderRegistrationResult result = ProviderRegistrationResult.FAILED; // Open the SASL properties file, using the default name is one is not specified. String filename = System.getProperty(FILE_PROPERTY); InputStream is = @@ -89,22 +91,45 @@ public class DynamicSaslRegistrar if (factories.size() > 0) { // Ensure we are used before the defaults - if (Security.insertProviderAt(new JCAProvider(factories), 1) == -1) + JCAProvider qpidProvider = new JCAProvider(factories); + if (Security.insertProviderAt(qpidProvider, 1) == -1) { - _logger.error("Unable to load custom SASL providers."); + Provider registeredProvider = findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME); + if (registeredProvider == null) + { + result = ProviderRegistrationResult.FAILED; + _logger.error("Unable to load custom SASL providers."); + } + else if (registeredProvider.equals(qpidProvider)) + { + result = ProviderRegistrationResult.EQUAL_ALREADY_REGISTERED; + _logger.debug("Custom SASL provider is already registered with equal properties."); + } + else + { + result = ProviderRegistrationResult.DIFFERENT_ALREADY_REGISTERED; + _logger.warn("Custom SASL provider was already registered with different properties."); + if (_logger.isDebugEnabled()) + { + _logger.debug("Custom SASL provider " + registeredProvider + " properties: " + new HashMap<Object, Object>(registeredProvider)); + } + } } else { + result = ProviderRegistrationResult.SUCCEEDED; _logger.info("Additional SASL providers successfully registered."); } } else { - _logger.warn("No additional SASL providers registered."); + result = ProviderRegistrationResult.NO_SASL_FACTORIES; + _logger.warn("No additional SASL factories found to register."); } } catch (IOException e) { + result = ProviderRegistrationResult.FAILED; _logger.error("Error reading properties: " + e, e); } finally @@ -122,6 +147,22 @@ public class DynamicSaslRegistrar } } } + return result; + } + + static Provider findProvider(String name) + { + Provider[] providers = Security.getProviders(); + Provider registeredProvider = null; + for (Provider provider : providers) + { + if (name.equals(provider.getName())) + { + registeredProvider = provider; + break; + } + } + return registeredProvider; } /** @@ -158,15 +199,24 @@ public class DynamicSaslRegistrar continue; } - _logger.debug("Registering class "+ clazz.getName() +" for mechanism "+mechanism); + _logger.debug("Found class "+ clazz.getName() +" for mechanism "+mechanism); factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz); } catch (Exception ex) { - _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping"); + _logger.error("Error instantiating SaslClientFactory class " + className + " - skipping"); } } return factoriesToRegister; } + + public static enum ProviderRegistrationResult + { + SUCCEEDED, + EQUAL_ALREADY_REGISTERED, + DIFFERENT_ALREADY_REGISTERED, + NO_SASL_FACTORIES, + FAILED; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java b/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java index 4a91f805f6..c9bcaf0d15 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java +++ b/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java @@ -39,6 +39,11 @@ import java.util.Map; */ public class JCAProvider extends Provider { + static final String QPID_CLIENT_SASL_PROVIDER_NAME = "AMQSASLProvider-Client"; + static final String QPID_CLIENT_SASL_PROVIDER_INFO = "A JCA provider that registers all " + + "AMQ SASL providers that want to be registered"; + static final double QPID_CLIENT_SASL_PROVIDER_VERSION = 1.0; + private static final Logger log = LoggerFactory.getLogger(JCAProvider.class); /** @@ -48,8 +53,7 @@ public class JCAProvider extends Provider */ public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap) { - super("AMQSASLProvider-Client", 1.0, "A JCA provider that registers all " - + "AMQ SASL providers that want to be registered"); + super(QPID_CLIENT_SASL_PROVIDER_NAME, QPID_CLIENT_SASL_PROVIDER_VERSION, QPID_CLIENT_SASL_PROVIDER_INFO); register(providerMap); } @@ -63,7 +67,7 @@ public class JCAProvider extends Provider for (Map.Entry<String, Class<? extends SaslClientFactory>> me : providerMap.entrySet()) { put( "SaslClientFactory."+me.getKey(), me.getValue().getName()); - log.debug("Registered SASL Client factory for " + me.getKey() + " as " + me.getValue().getName()); + log.debug("Recording SASL Client factory for " + me.getKey() + " as " + me.getValue().getName()); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 0b6217ffce..ed75e1f4c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -157,12 +157,15 @@ public class AMQStateManager implements AMQMethodListener if (_waiters.size() == 0) { - _logger.error("No Waiters for error saving as last error:" + error.getMessage()); + _logger.info("No Waiters for error. Saving as last error:" + error.getMessage()); _lastException = error; } for (StateWaiter waiter : _waiters) { - _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage()); + if(_logger.isDebugEnabled()) + { + _logger.debug("Notifying waiter " + waiter + " for error:" + error.getMessage()); + } waiter.error(error); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java index 3c9a6e1500..4789dd0ed7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.transport; +import org.apache.qpid.client.HeartbeatListener; +import org.apache.qpid.transport.ConnectionHeartbeat; import org.ietf.jgss.GSSContext; import org.ietf.jgss.GSSException; import org.ietf.jgss.GSSManager; @@ -70,6 +72,7 @@ public class ClientConnectionDelegate extends ClientDelegate } private final ConnectionURL _connectionURL; + private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; /** * @param settings @@ -165,4 +168,19 @@ public class ClientConnectionDelegate extends ClientDelegate return null; } + + @Override + public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) + { + // ClientDelegate simply responds to heartbeats with heartbeats + _heartbeatListener.heartbeatReceived(); + super.connectionHeartbeat(conn, hearbeat); + _heartbeatListener.heartbeatSent(); + } + + + public void setHeartbeatListener(HeartbeatListener listener) + { + _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; + } } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 8fd6ff6d33..c4fbeb5607 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -20,9 +20,8 @@ */ package org.apache.qpid.jms; -import org.apache.qpid.framing.AMQShortString; - import java.util.List; +import org.apache.qpid.framing.AMQShortString; /** Connection URL format @@ -35,14 +34,22 @@ public interface ConnectionURL public static final String AMQ_PROTOCOL = "amqp"; public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence"; public static final String OPTIONS_MAXPREFETCH = "maxprefetch"; - public static final String OPTIONS_SYNC_ACK = "sync_ack"; + public static final String OPTIONS_SYNC_ACK = "sync_ack"; public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format"; + public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; /** + * This option is used to apply a connection level override of + * the {@value BrokerDetails#OPTIONS_SSL} option values in the + * {@value ConnectionURL#OPTIONS_BROKERLIST}; + */ + public static final String OPTIONS_SSL = "ssl"; + + /** * This option is only applicable for 0-8/0-9/0-9-1 protocols connection * <p> * It tells the client to delegate the requeue/DLQ decision to the @@ -54,9 +61,11 @@ public interface ConnectionURL public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; + public static final String OPTIONS_VERIFY_QUEUE_ON_SEND = "verifyQueueOnSend"; + public static final byte URL_0_8 = 1; public static final byte URL_0_10 = 2; - + String getURL(); String getFailoverMethod(); diff --git a/java/client/src/main/java/org/apache/qpid/jms/ListMessage.java b/java/client/src/main/java/org/apache/qpid/jms/ListMessage.java new file mode 100644 index 0000000000..21dd2a89ee --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/jms/ListMessage.java @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; + +import java.util.Iterator; +import java.util.List; + +public interface ListMessage extends javax.jms.StreamMessage +{ + boolean add(Object e) throws JMSException; + + void add(int index, Object e) throws JMSException; + + boolean contains(Object e) throws JMSException; + + Object get(int index) throws JMSException; + + int indexOf(Object e) throws JMSException; + + Iterator<Object> iterator() throws JMSException; + + Object remove(int index) throws JMSException; + + boolean remove(Object e)throws JMSException; + + Object set(int index, Object e) throws JMSException; + + int size() throws JMSException; + + Object[] toArray() throws JMSException; + + List<Object> asList() throws JMSException; + + void setList(List<Object> l) throws JMSException; +} diff --git a/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java index bec8b0917d..82c2b88c30 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java @@ -23,25 +23,11 @@ package org.apache.qpid.jms; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import java.io.UnsupportedEncodingException; /** */ public interface MessageProducer extends javax.jms.MessageProducer { - /** - * Set the default MIME type for messages produced by this producer. This reduces the overhead of each message. - * @param mimeType - */ - void setMimeType(String mimeType) throws JMSException; - - /** - * Set the default encoding for messages produced by this producer. This reduces the overhead of each message. - * @param encoding the encoding as understood by XXXX how do I specify this?? RG - * @throws UnsupportedEncodingException if the encoding is not understood - */ - void setEncoding(String encoding) throws UnsupportedEncodingException, JMSException; - void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean immediate) throws JMSException; diff --git a/java/client/src/main/java/org/apache/qpid/jms/Session.java b/java/client/src/main/java/org/apache/qpid/jms/Session.java index b4bf2d1d85..4801f87295 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/Session.java +++ b/java/client/src/main/java/org/apache/qpid/jms/Session.java @@ -21,6 +21,7 @@ package org.apache.qpid.jms; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ListMessage; import javax.jms.Destination; import javax.jms.JMSException; @@ -100,4 +101,6 @@ public interface Session extends TopicSession, QueueSession AMQShortString getDefaultTopicExchangeName(); AMQShortString getTemporaryQueueExchangeName(); + + ListMessage createListMessage() throws JMSException; } diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java index d186a440da..d309251b44 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java @@ -20,25 +20,60 @@ */ package org.apache.qpid.client; -import junit.framework.TestCase; - -import org.apache.qpid.AMQInvalidArgumentException; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.ExceptionListener; import javax.jms.JMSException; -import java.util.concurrent.atomic.AtomicReference; -public class AMQConnectionUnitTest extends TestCase +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AMQConnectionUnitTest extends QpidTestCase { + String _url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'"; + + public void testVerifyQueueOnSendDefault() throws Exception + { + MockAMQConnection connection = new MockAMQConnection(_url); + assertFalse(connection.validateQueueOnSend()); + } + + public void testVerifyQueueOnSendViaSystemProperty() throws Exception + { + setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true"); + MockAMQConnection connection = new MockAMQConnection(_url); + assertTrue(connection.validateQueueOnSend()); + + setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"); + connection = new MockAMQConnection(_url); + assertFalse(connection.validateQueueOnSend()); + } + + public void testVerifyQueueOnSendViaURL() throws Exception + { + MockAMQConnection connection = new MockAMQConnection(_url + "&" + ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='true'"); + assertTrue(connection.validateQueueOnSend()); + + connection = new MockAMQConnection(_url + "&" + ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='false'"); + assertFalse(connection.validateQueueOnSend()); + } + + public void testVerifyQueueOnSendViaURLoverridesSystemProperty() throws Exception + { + setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"); + MockAMQConnection connection = new MockAMQConnection(_url + "&" + ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND + "='true'"); + assertTrue(connection.validateQueueOnSend()); + } public void testExceptionReceived() { - String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'"; AMQInvalidArgumentException expectedException = new AMQInvalidArgumentException("Test", null); final AtomicReference<JMSException> receivedException = new AtomicReference<JMSException>(); try { - MockAMQConnection connection = new MockAMQConnection(url); + MockAMQConnection connection = new MockAMQConnection(_url); connection.setExceptionListener(new ExceptionListener() { @@ -62,4 +97,22 @@ public class AMQConnectionUnitTest extends TestCase assertEquals("JMSException linked exception is incorrect", expectedException, exception.getLinkedException()); } + /** + * This should expand to test all the defaults. + */ + public void testDefaultStreamMessageEncoding() throws Exception + { + MockAMQConnection connection = new MockAMQConnection(_url); + assertTrue("Legacy Stream message encoding should be the default",connection.isUseLegacyStreamMessageFormat()); + } + + /** + * This should expand to test all the connection properties. + */ + public void testStreamMessageEncodingProperty() throws Exception + { + MockAMQConnection connection = new MockAMQConnection(_url + "&use_legacy_stream_msg_format='false'"); + assertFalse("Stream message encoding should be amqp/list",connection.isUseLegacyStreamMessageFormat()); + } + } diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 028e2d5cc3..40ed9319f1 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -18,6 +18,7 @@ */ package org.apache.qpid.client; +import org.apache.qpid.client.message.AMQPEncodedListMessage; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.*; @@ -28,6 +29,8 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.StreamMessage; + import java.util.ArrayList; import java.util.List; @@ -276,7 +279,7 @@ public class AMQSession_0_10Test extends QpidTestCase { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, null, null, false, true); - session.sendConsume(consumer, new AMQShortString("test"), null, true, 1); + session.sendConsume(consumer, new AMQShortString("test"), true, 1); } catch (Exception e) { @@ -459,6 +462,13 @@ public class AMQSession_0_10Test extends QpidTestCase assertNotNull("ExchangeDeclare event was not sent", event); } + public void testCreateStreamMessage() throws Exception + { + AMQSession_0_10 session = createAMQSession_0_10(); + StreamMessage m = session.createStreamMessage(); + assertTrue("Legacy Stream message encoding should be the default" + m.getClass(),!(m instanceof AMQPEncodedListMessage)); + } + public void testGetQueueDepthWithSync() { // slow down a flush thread @@ -587,7 +597,7 @@ public class AMQSession_0_10Test extends QpidTestCase connection.setSessionFactory(new SessionFactory() { - public Session newSession(Connection conn, Binary name, long expiry) + public Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay) { return new MockSession(conn, new SessionDelegate(), name, expiry, throwException); } @@ -660,7 +670,6 @@ public class AMQSession_0_10Test extends QpidTestCase if (m instanceof ExchangeBound) { ExchangeBoundResult struc = new ExchangeBoundResult(); - struc.setQueueNotFound(true); result.setValue(struc); } else if (m instanceof ExchangeQuery) diff --git a/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java index 722cbd0752..066ece7ed1 100644 --- a/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java +++ b/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java @@ -48,7 +48,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase TestAMQSession testSession = new TestAMQSession(conn); BasicMessageConsumer_0_8 consumer = - new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour()); } @@ -68,7 +68,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase final TestAMQSession testSession = new TestAMQSession(conn); final BasicMessageConsumer_0_8 consumer = - new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); } @@ -94,7 +94,7 @@ public class BasicMessageConsumer_0_8_Test extends TestCase TestAMQSession testSession = new TestAMQSession(conn); BasicMessageConsumer_0_8 consumer = - new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); } diff --git a/java/client/src/test/java/org/apache/qpid/client/message/AMQPEncodedListMessageUnitTest.java b/java/client/src/test/java/org/apache/qpid/client/message/AMQPEncodedListMessageUnitTest.java new file mode 100644 index 0000000000..e131ab3dd2 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/message/AMQPEncodedListMessageUnitTest.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.jms.MessageFormatException; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.codec.BBEncoder; + +public class AMQPEncodedListMessageUnitTest extends QpidTestCase +{ + + Map<String,String> _map = new HashMap<String,String>(); + List<Object> _list = new ArrayList<Object>(); + UUID _uuid = UUID.randomUUID(); + + @Override + public void setUp() throws Exception + { + super.setUp(); + _map.put("Key1","String1"); + _map.put("Key2","String2"); + _map.put("Key3","String3"); + + _list.add(1); + _list.add(2); + _list.add(3); + } + + /** + * Test whether we accept the correct types while rejecting invalid types. + */ + public void testAddObject() throws Exception + { + AMQPEncodedListMessage m = new AMQPEncodedListMessage(AMQMessageDelegateFactory.FACTORY_0_10); + m.add(true); + m.add((byte)256); + m.add(Short.MAX_VALUE); + m.add(Integer.MAX_VALUE); + m.add(Long.MAX_VALUE); + m.add(10.22); + m.add("Msg"); + m.add("Msg".getBytes()); + m.add(_list); + m.add(_map); + m.add(_uuid); + + try + { + m.add(new Object()); + fail("Validation for element type failed"); + } + catch (MessageFormatException e) + { + } + } + + public void testListBehaviorForIncommingMsg() throws Exception + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList(_list); + AMQPEncodedListMessage m = new AMQPEncodedListMessage(new AMQMessageDelegate_0_10(),encoder.segment()); + + assertTrue("contains(Object) method did not return true as expected",m.contains(1)); + assertFalse("contains(Object) method did not return false as expected",m.contains(5)); + assertEquals("get(index) method returned incorrect value",((Integer)m.get(1)).intValue(),2); + assertEquals("indexOf(Object) method returned incorrect index",m.indexOf(2),1); + try + { + m.get(10); + } + catch (MessageFormatException e) + { + assertTrue("Incorrect exception type. Expected IndexOutOfBoundsException", e.getCause() instanceof IndexOutOfBoundsException); + } + } + + public void testStreamMessageInterfaceForIncommingMsg() throws Exception + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList(getList()); + AMQPEncodedListMessage m = new AMQPEncodedListMessage(new AMQMessageDelegate_0_10(),encoder.segment()); + + assertEquals(true,m.readBoolean()); + assertEquals((byte)256,m.readByte()); + assertEquals(Short.MAX_VALUE,m.readShort()); + assertEquals(Integer.MAX_VALUE,m.readInt()); + assertEquals(Long.MAX_VALUE,m.readLong()); + assertEquals(10.22,m.readDouble()); + assertEquals("Msg",m.readString()); + assertEquals(_list,(List)m.readObject()); + assertEquals(_map,(Map)m.readObject()); + assertEquals(_uuid,(UUID)m.readObject()); + } + + public void testMapMessageInterfaceForIncommingMsg() throws Exception + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList(getList()); + AMQPEncodedListMessage m = new AMQPEncodedListMessage(new AMQMessageDelegate_0_10(),encoder.segment()); + + assertEquals(true,m.getBoolean("0")); + assertEquals((byte)256,m.getByte("1")); + assertEquals(Short.MAX_VALUE,m.getShort("2")); + assertEquals(Integer.MAX_VALUE,m.getInt("3")); + assertEquals(Long.MAX_VALUE,m.getLong("4")); + assertEquals(10.22,m.getDouble("5")); + assertEquals("Msg",m.getString("6")); + assertEquals(_list,(List)m.getObject("7")); + assertEquals(_map,(Map)m.getObject("8")); + assertEquals(_uuid,(UUID)m.getObject("9")); + } + + public List<Object> getList() + { + List<Object> myList = new ArrayList<Object>(); + myList.add(true); + myList.add((byte)256); + myList.add(Short.MAX_VALUE); + myList.add(Integer.MAX_VALUE); + myList.add(Long.MAX_VALUE); + myList.add(10.22); + myList.add("Msg"); + myList.add(_list); + myList.add(_map); + myList.add(_uuid); + return myList; + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java new file mode 100644 index 0000000000..7401168978 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.messaging.address; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AddressHelperTest extends QpidTestCase +{ + public void testAddressOptions() throws Exception + { + Address addr = Address.parse("queue/test;{create:sender, assert:always, delete:receiver, mode:browse}"); + AddressHelper helper = new AddressHelper(addr); + assertEquals(AddressOption.SENDER,AddressOption.getOption(helper.getCreate())); + assertEquals(AddressOption.ALWAYS,AddressOption.getOption(helper.getAssert())); + assertEquals(AddressOption.RECEIVER,AddressOption.getOption(helper.getDelete())); + assertTrue("'mode' option wasn't read properly",helper.isBrowseOnly()); + } + + public void testNodeProperties() throws Exception + { + Address addr = Address.parse("my-queue;{" + + "node: " + + "{" + + "type: queue ," + + "durable: true ," + + "x-declare: " + + "{" + + "exclusive: true," + + "auto-delete: true," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {" + + "'qpid.max_size': 1000," + + "'qpid.max_count': 100" + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " + + "{exchange : 'amq.fanout', queue:my-queue}," + + "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," + + "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" + + "]" + + + "}" + + "}"); + AddressHelper helper = new AddressHelper(addr); + Node node = helper.getNode(); + assertEquals("'type' property wasn't read properly",AMQDestination.QUEUE_TYPE,helper.getNodeType()); + assertTrue("'durable' property wasn't read properly",node.isDurable()); + assertTrue("'auto-delete' property wasn't read properly",node.isAutoDelete()); + assertTrue("'exclusive' property wasn't read properly",node.isExclusive()); + assertEquals("'alternate-exchange' property wasn't read properly","amq.fanout",node.getAlternateExchange()); + assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,node.getDeclareArgs().size()); + assertEquals("'bindings' property wasn't read properly",4,node.getBindings().size()); + for (Binding binding: node.getBindings()) + { + assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq.")); + assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue()); + if (binding.getExchange().equals("amq.direct")) + { + assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey()); + } + if (binding.getExchange().equals("amq.match")) + { + assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size()); + } + } + } + + public void testLinkProperties() throws Exception + { + Address addr = Address.parse("my-queue;{" + + "link: " + + "{" + + "name: my-queue ," + + "durable: true ," + + "reliability: at-least-once," + + "capacity: {source:10, target:15}," + + "x-declare: " + + "{" + + "exclusive: true," + + "auto-delete: true," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {" + + "'qpid.max_size': 1000," + + "'qpid.max_count': 100" + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " + + "{exchange : 'amq.fanout', queue:my-queue}," + + "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," + + "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" + + "]," + + "x-subscribes:{exclusive: true, arguments: {a:b,x:y}}" + + "}" + + "}"); + + AddressHelper helper = new AddressHelper(addr); + Link link = helper.getLink(); + assertEquals("'name' property wasn't read properly","my-queue",link.getName()); + assertTrue("'durable' property wasn't read properly",link.isDurable()); + assertEquals("'reliability' property wasn't read properly",Reliability.AT_LEAST_ONCE,link.getReliability()); + assertTrue("'auto-delete' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isAutoDelete()); + assertTrue("'exclusive' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isExclusive()); + assertEquals("'alternate-exchange' property in 'x-declare' wasn't read properly","amq.fanout",link.getSubscriptionQueue().getAlternateExchange()); + assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,link.getSubscriptionQueue().getDeclareArgs().size()); + assertEquals("'bindings' property wasn't read properly",4,link.getBindings().size()); + for (Binding binding: link.getBindings()) + { + assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq.")); + assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue()); + if (binding.getExchange().equals("amq.direct")) + { + assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey()); + } + if (binding.getExchange().equals("amq.match")) + { + assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size()); + } + } + assertTrue("'exclusive' property in 'x-subscribe' wasn't read properly",link.getSubscription().isExclusive()); + assertEquals("'arguments' in 'x-subscribe' property wasn't read properly",2,link.getSubscription().getArgs().size()); + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/client/security/DynamicSaslRegistrarTest.java b/java/client/src/test/java/org/apache/qpid/client/security/DynamicSaslRegistrarTest.java new file mode 100644 index 0000000000..4281984212 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/security/DynamicSaslRegistrarTest.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.security; + +import java.io.File; +import java.security.Provider; +import java.security.Security; + +import org.apache.qpid.client.security.DynamicSaslRegistrar.ProviderRegistrationResult; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; + +public class DynamicSaslRegistrarTest extends QpidTestCase +{ + private Provider _registeredProvider; + + public void setUp() throws Exception + { + super.setUp(); + + //If the client provider is already registered, remove it for the duration of the test + _registeredProvider = DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME); + if (_registeredProvider != null) + { + Security.removeProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME); + } + } + + public void tearDown() throws Exception + { + //Remove any provider left behind by the test. + Security.removeProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME); + try + { + //If the client provider was already registered before the test, restore it. + if (_registeredProvider != null) + { + Security.insertProviderAt(_registeredProvider, 1); + } + } + finally + { + super.tearDown(); + } + } + + public void testRegisterDefaultProvider() + { + assertNull("Provider should not yet be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + + ProviderRegistrationResult firstRegistrationResult = DynamicSaslRegistrar.registerSaslProviders(); + assertEquals("Unexpected registration result", ProviderRegistrationResult.SUCCEEDED, firstRegistrationResult); + assertNotNull("Providers should now be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + } + + public void testRegisterDefaultProviderTwice() + { + assertNull("Provider should not yet be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + + DynamicSaslRegistrar.registerSaslProviders(); + assertNotNull("Providers should now be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + + ProviderRegistrationResult result = DynamicSaslRegistrar.registerSaslProviders(); + assertEquals("Unexpected registration result when trying to re-register", ProviderRegistrationResult.EQUAL_ALREADY_REGISTERED, result); + assertNotNull("Providers should still be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + } + + @SuppressWarnings("serial") + public void testRegisterDefaultProviderWhenAnotherIsAlreadyPresentWithDifferentFactories() + { + assertNull("Provider should not be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + + //Add a test provider with the same name, version, info as the default client provider, but with different factory properties (none). + Provider testProvider = new Provider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME, + JCAProvider.QPID_CLIENT_SASL_PROVIDER_VERSION, + JCAProvider.QPID_CLIENT_SASL_PROVIDER_INFO){}; + Security.addProvider(testProvider); + assertSame("Test provider should be registered", testProvider, DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + + //Try to register the default provider now that another with the same name etc (but different factories) + //is already registered, expect it not to be registered as a result. + ProviderRegistrationResult result = DynamicSaslRegistrar.registerSaslProviders(); + assertEquals("Unexpected registration result", ProviderRegistrationResult.DIFFERENT_ALREADY_REGISTERED, result); + + //Verify the test provider is still registered + assertSame("Test provider should still be registered", testProvider, DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + } + + public void testRegisterWithNoFactories() + { + File emptyTempFile = TestFileUtils.createTempFile(this); + + assertNull("Provider should not be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + + //Adjust the location of the properties file to point at an empty file, so no factories are found to register. + setTestSystemProperty("amq.dynamicsaslregistrar.properties", emptyTempFile.getPath()); + + //Try to register the default provider, expect it it not to be registered because there were no factories. + ProviderRegistrationResult result = DynamicSaslRegistrar.registerSaslProviders(); + assertEquals("Unexpected registration result", ProviderRegistrationResult.NO_SASL_FACTORIES, result); + + assertNull("Provider should not be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + } + + public void testRegisterWithMissingFileGetsDefault() + { + //Create a temp file and then delete it, such that we get a path which doesn't exist + File tempFile = TestFileUtils.createTempFile(this); + assertTrue("Failed to delete file", tempFile.delete()); + + assertNull("Provider should not be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + + //Adjust the location of the properties file to point at non-existent file. + setTestSystemProperty("amq.dynamicsaslregistrar.properties", tempFile.getPath()); + + //Try to register the default provider, expect it to fall back to the default in the jar and succeed. + ProviderRegistrationResult result = DynamicSaslRegistrar.registerSaslProviders(); + assertEquals("Unexpected registration result", ProviderRegistrationResult.SUCCEEDED, result); + + assertNotNull("Provider should be registered", DynamicSaslRegistrar.findProvider(JCAProvider.QPID_CLIENT_SASL_PROVIDER_NAME)); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java index 412c458247..1e9e5b00a5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java @@ -120,6 +120,48 @@ public class BrokerDetailsTest extends TestCase { assertTrue(urise.getReason().equals("Illegal character in port number")); } + } + + public void testToStringMasksKeyStorePassword() throws Exception + { + String url = "tcp://localhost:5672?key_store_password='password'"; + BrokerDetails details = new AMQBrokerDetails(url); + + String expectedToString = "tcp://localhost:5672?key_store_password='********'"; + String actualToString = details.toString(); + + assertEquals("Unexpected toString", expectedToString, actualToString); + } + + public void testToStringMasksTrustStorePassword() throws Exception + { + String url = "tcp://localhost:5672?trust_store_password='password'"; + BrokerDetails details = new AMQBrokerDetails(url); + + String expectedToString = "tcp://localhost:5672?trust_store_password='********'"; + String actualToString = details.toString(); + + assertEquals("Unexpected toString", expectedToString, actualToString); + } + + public void testDefaultSsl() throws URLSyntaxException + { + String brokerURL = "tcp://localhost:5672"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + + assertNull("default value should be null", broker.getProperty(BrokerDetails.OPTIONS_SSL)); + } + + public void testOverridingSsl() throws URLSyntaxException + { + String brokerURL = "tcp://localhost:5672?ssl='true'"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + + assertTrue("value should be true", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL))); + + brokerURL = "tcp://localhost:5672?ssl='false''&maxprefetch='1'"; + broker = new AMQBrokerDetails(brokerURL); + assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL))); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 392ef1f29b..8c193622e3 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -30,7 +30,6 @@ import org.apache.qpid.url.URLSyntaxException; public class ConnectionURLTest extends TestCase { - public void testFailoverURL() throws URLSyntaxException { String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''"; @@ -252,55 +251,47 @@ public class ConnectionURLTest extends TestCase assertTrue(service.getPort() == 5672); } - public void testSingleTransportDefaultedBrokerWithIPandPort() throws URLSyntaxException + public void testConnectionURLOptionToStringMasksPassword() throws URLSyntaxException { - String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1:1234'"; + String url = "amqp://guest:guest@client/localhost?brokerlist='tcp://localhost:1234'"; + ConnectionURL connectionurl = new AMQConnectionURL(url); + + String expectedToString = "amqp://guest:********@client/localhost?brokerlist='tcp://localhost:1234'"; + String actualToString = connectionurl.toString(); + assertEquals("Unexpected toString form", expectedToString, actualToString); + } + + public void testConnectionURLOptionToStringMasksSslTrustStorePassword() throws URLSyntaxException + { + String url = "amqp://guest:guest@client/vhost?brokerlist='tcp://host:1234?trust_store_password='truststorepassword''"; + ConnectionURL connectionurl = new AMQConnectionURL(url); -// ConnectionURL connectionurl = new AMQConnectionURL(url); -// -// assertTrue(connectionurl.getFailoverMethod() == null); -// assertTrue(connectionurl.getUsername().equals("guest")); -// assertTrue(connectionurl.getPassword().equals("guest")); -// assertTrue(connectionurl.getVirtualHost().equals("/temp")); -// -// -// assertTrue(connectionurl.getBrokerCount() == 1); -// -// BrokerDetails service = connectionurl.getBrokerDetails(0); -// -// assertTrue(service.getTransport().equals("tcp")); -// -// assertTrue(service.getHost().equals("127.0.0.1")); -// assertTrue(service.getPort() == 1234); + String expectedToString = "amqp://guest:********@client/vhost?brokerlist='tcp://host:1234?trust_store_password='********''"; + String actualToString = connectionurl.toString(); + assertEquals("Unexpected toString form", expectedToString, actualToString); + } + + public void testConnectionURLOptionToStringMasksSslKeyStorePassword() throws URLSyntaxException + { + String url = "amqp://guest:guest@client/vhost?brokerlist='tcp://host:1234?key_store_password='keystorepassword1';tcp://host:1235?key_store_password='keystorepassword2''"; + ConnectionURL connectionurl = new AMQConnectionURL(url); + + String expectedToString = "amqp://guest:********@client/vhost?brokerlist='tcp://host:1234?key_store_password='********';tcp://host:1235?key_store_password='********''"; + String actualToString = connectionurl.toString(); + assertEquals("Unexpected toString form", expectedToString, actualToString); } /** * Test for QPID-3662 to ensure the {@code toString()} representation is correct. */ - public void testConnectionURLOptionToString() throws URLSyntaxException + public void testConnectionURLOptionToStringWithMaxPreftech() throws URLSyntaxException { String url = "amqp://guest:guest@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''"; ConnectionURL connectionurl = new AMQConnectionURL(url); - assertNull(connectionurl.getFailoverMethod()); - assertEquals("guest", connectionurl.getUsername()); - assertEquals("guest", connectionurl.getPassword()); - assertEquals("client", connectionurl.getClientName()); - assertEquals("/localhost", connectionurl.getVirtualHost()); - assertEquals("1", connectionurl.getOption("maxprefetch")); - assertTrue(connectionurl.getBrokerCount() == 1); - - BrokerDetails service = connectionurl.getBrokerDetails(0); - assertTrue(service.getTransport().equals("tcp")); - assertTrue(service.getHost().equals("localhost")); - assertTrue(service.getPort() == 1234); - assertTrue(service.getProperties().containsKey("tcp_nodelay")); - assertEquals("true", service.getProperties().get("tcp_nodelay")); - - String nopasswd = "amqp://guest:********@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''"; - String tostring = connectionurl.toString(); - assertEquals(tostring.indexOf("maxprefetch"), tostring.lastIndexOf("maxprefetch")); - assertEquals(nopasswd, tostring); + String expectedToString = "amqp://guest:********@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''"; + String actualToString = connectionurl.toString(); + assertEquals("Unexpected toString form", expectedToString, actualToString); } public void testSingleTransportMultiOptionURL() throws URLSyntaxException @@ -572,9 +563,64 @@ public class ConnectionURLTest extends TestCase connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR)); } - public static junit.framework.Test suite() + /** + * Verify that when the ssl option is not specified, asking for the option returns null, + * such that this can later be used to verify it wasnt specified. + */ + public void testDefaultSsl() throws URLSyntaxException { - return new junit.framework.TestSuite(ConnectionURLTest.class); + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'"; + ConnectionURL connectionURL = new AMQConnectionURL(url); + + assertNull("default ssl value should be null", connectionURL.getOption(ConnectionURL.OPTIONS_SSL)); + } + + /** + * Verify that when the ssl option is specified, asking for the option returns the value, + * such that this can later be used to verify what value it was specified as. + */ + public void testOverridingSsl() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='true'"; + ConnectionURL connectionURL = new AMQConnectionURL(url); + + assertTrue("value should be true", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL))); + + url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='false'"; + connectionURL = new AMQConnectionURL(url); + + assertFalse("value should be false", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL))); + } + + /** + * Verify that when the {@value ConnectionURL#OPTIONS_VERIFY_QUEUE_ON_SEND} option is not + * specified, asking for the option returns null, such that this can later be used to + * verify it wasn't specified. + */ + public void testDefaultVerifyQueueOnSend() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'"; + ConnectionURL connectionURL = new AMQConnectionURL(url); + + assertNull("default ssl value should be null", connectionURL.getOption(ConnectionURL.OPTIONS_SSL)); + } + + /** + * Verify that when the {@value ConnectionURL#OPTIONS_VERIFY_QUEUE_ON_SEND} option is + * specified, asking for the option returns the value, such that this can later be used + * to verify what value it was specified as. + */ + public void testOverridingVerifyQueueOnSend() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&verifyQueueOnSend='true'"; + ConnectionURL connectionURL = new AMQConnectionURL(url); + + assertTrue("value should be true", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND))); + + url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&verifyQueueOnSend='false'"; + connectionURL = new AMQConnectionURL(url); + + assertFalse("value should be false", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND))); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java index 9addb0ee71..8f578e6a2f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java @@ -193,6 +193,126 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("test:testQueueD")); } + public void testExchangeOptionsNotPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertFalse(dest.isExchangeAutoDelete()); + assertFalse(dest.isExchangeDurable()); + assertFalse(dest.isExchangeInternal()); + } + + public void testExchangeAutoDeleteOptionPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_AUTODELETE + "='true'"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertTrue(dest.isExchangeAutoDelete()); + assertFalse(dest.isExchangeDurable()); + assertFalse(dest.isExchangeInternal()); + } + + public void testExchangeDurableOptionPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_DURABLE + "='true'"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertTrue(dest.isExchangeDurable()); + assertFalse(dest.isExchangeAutoDelete()); + assertFalse(dest.isExchangeInternal()); + } + + public void testExchangeInternalOptionPresent() throws URISyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?" + BindingURL.OPTION_EXCHANGE_INTERNAL + "='true'"; + + AMQBindingURL burl = new AMQBindingURL(url); + + assertTrue(url.equals(burl.toString())); + + assertEquals("true", burl.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE)); + assertNull(burl.getOption(BindingURL.OPTION_EXCHANGE_DURABLE)); + + class MyTestAMQDestination extends AMQDestination + { + public MyTestAMQDestination(BindingURL url) + { + super(url); + } + public boolean isNameRequired() + { + return false; + } + }; + + AMQDestination dest = new MyTestAMQDestination(burl); + assertTrue(dest.isExchangeInternal()); + assertFalse(dest.isExchangeDurable()); + assertFalse(dest.isExchangeAutoDelete()); + } + public void testRejectBehaviourPresent() throws URISyntaxException { String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'"; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index f199961b6f..4ad9069ba0 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -124,7 +124,7 @@ public class TestAMQSession extends AMQSession_0_8 return false; } - public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException + public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException { } @@ -139,13 +139,13 @@ public class TestAMQSession extends AMQSession_0_8 return null; } - public void sendExchangeDeclare(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException + public void sendExchangeDeclare(AMQShortString name, AMQShortString type, boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { } public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler, - boolean nowait, boolean passive) throws AMQException, FailoverException + boolean passive) throws AMQException, FailoverException { } @@ -189,14 +189,6 @@ public class TestAMQSession extends AMQSession_0_8 { } - public void handleAddressBasedDestination(AMQDestination dest, - boolean isConsumer, - boolean noWait) throws AMQException - { - throw new UnsupportedOperationException("The new addressing based sytanx is " - + "not supported for AMQP 0-8/0-9 versions"); - } - @Override protected void flushAcknowledgments() { |