From 633c33f224f3196f3f9bd80bd2e418d8143fea06 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 4 May 2012 15:39:19 +0000 Subject: QPID-3858: Updated branch - merged from trunk r.1333987 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/java/client.bnd | 2 +- .../org/apache/qpid/client/AMQBrokerDetails.java | 8 +- .../java/org/apache/qpid/client/AMQConnection.java | 29 ++- .../qpid/client/AMQConnectionDelegate_0_10.java | 2 + .../org/apache/qpid/client/AMQQueueBrowser.java | 68 ++++++- .../java/org/apache/qpid/client/AMQSession.java | 141 +++++++------ .../org/apache/qpid/client/AMQSession_0_10.java | 64 +++--- .../org/apache/qpid/client/AMQSession_0_8.java | 19 +- .../apache/qpid/client/BasicMessageConsumer.java | 18 +- .../apache/qpid/client/BasicMessageProducer.java | 222 ++++++++++++--------- .../qpid/client/BasicMessageProducer_0_10.java | 4 +- .../qpid/client/BasicMessageProducer_0_8.java | 8 +- .../java/org/apache/qpid/client/Closeable.java | 7 +- .../org/apache/qpid/client/XAResourceImpl.java | 25 ++- .../java/org/apache/qpid/client/XASessionImpl.java | 16 +- .../client/handler/ClientMethodDispatcherImpl.java | 4 +- .../qpid/client/message/FieldTableSupport.java | 7 +- .../qpid/client/protocol/AMQProtocolHandler.java | 3 +- .../qpid/client/protocol/AMQProtocolSession.java | 15 +- .../apache/qpid/client/util/BlockingWaiter.java | 39 ++-- .../jndi/PropertiesFileInitialContextFactory.java | 62 ++++-- 21 files changed, 475 insertions(+), 288 deletions(-) (limited to 'java/client/src/main') diff --git a/java/client/src/main/java/client.bnd b/java/client/src/main/java/client.bnd index d92d582ec8..495ea6793f 100755 --- a/java/client/src/main/java/client.bnd +++ b/java/client/src/main/java/client.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.15.0 +ver: 0.17.0 Bundle-SymbolicName: qpid-client Bundle-Version: ${ver} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 771e80c3bc..987404cb80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -319,18 +319,18 @@ public class AMQBrokerDetails implements BrokerDetails BrokerDetails bd = (BrokerDetails) o; - return _host.equalsIgnoreCase(bd.getHost()) && + return _host.toLowerCase().equals(bd.getHost() == null ? null : bd.getHost().toLowerCase()) && (_port == bd.getPort()) && - _transport.equalsIgnoreCase(bd.getTransport()); + _transport.toLowerCase().equals(bd.getTransport() == null ? null : bd.getTransport().toLowerCase()); //TODO do we need to compare all the options as well? } @Override public int hashCode() { - int result = _host != null ? _host.hashCode() : 0; + int result = _host != null ? _host.toLowerCase().hashCode() : 0; result = 31 * result + _port; - result = 31 * result + (_transport != null ? _transport.hashCode() : 0); + result = 31 * result + (_transport != null ? _transport.toLowerCase().hashCode() : 0); return result; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 39ad282422..23b47c8d67 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -308,9 +308,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _delegate = new AMQConnectionDelegate_0_10(this); } - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Connection:" + connectionURL); + _logger.debug("Connection:" + connectionURL); } _connectionURL = connectionURL; @@ -343,7 +343,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler = new AMQProtocolHandler(this); - _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } // We are not currently connected setConnected(false); @@ -435,7 +438,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } - _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } _sessions.setMaxChannelID(_delegate.getMaxChannelID()); _sessions.setMinChannelID(_delegate.getMinChannelID()); @@ -462,7 +468,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect String delegateClassName = String.format ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", pe.getMajorVersion(), pe.getMinorVersion()); - _logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); + if (_logger.isDebugEnabled()) + { + _logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); + } Class c = Class.forName(delegateClassName); Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; @@ -569,6 +578,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { + resetClosedFlag(); return _delegate.makeBrokerConnection(brokerDetail); } @@ -968,7 +978,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); + } public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, @@ -976,7 +987,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, @@ -984,7 +995,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, @@ -993,7 +1004,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO Auto-generated method stub checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public long getMaximumChannelCount() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 56ee56d178..a1a06c5547 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -327,6 +327,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + _conn.setClosed(); + ExceptionListener listener = _conn.getExceptionListenerNoCheck(); if (listener == null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 2313bce474..0c6031ea91 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.filter.JMSSelectorFilter; +import org.apache.qpid.protocol.AMQConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,12 +54,50 @@ public class AMQQueueBrowser implements QueueBrowser _session = session; _queue = queue; _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector; - // Create Consumer to verify message selector. - BasicMessageConsumer consumer = - (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); - // Close this consumer as we are not looking to consume only to establish that, at least for now, - // the QB can be created - consumer.close(); + + + validateQueue((AMQDestination) queue); + + if(_messageSelector != null) + { + validateSelector(_messageSelector); + } + } + + private void validateSelector(String messageSelector) throws InvalidSelectorException + { + try + { + new JMSSelectorFilter(messageSelector); + } + catch (AMQInternalException e) + { + throw new InvalidSelectorException(e.getMessage()); + } + } + + private void validateQueue(AMQDestination queue) throws JMSException + { + try + { + // Essentially just test the connection/session is still active + _session.sync(); + // TODO - should really validate queue exists, but we often rely on creating the consumer to create the queue :( + // _session.declareQueuePassive( queue ); + } + catch (AMQException e) + { + if(e.getErrorCode() == AMQConstant.NOT_FOUND) + { + throw new InvalidDestinationException(e.getMessage()); + } + else + { + final JMSException jmsException = new JMSException(e.getMessage(), String.valueOf(e.getErrorCode().getCode())); + jmsException.setLinkedException(e); + throw jmsException; + } + } } public Queue getQueue() throws JMSException @@ -88,6 +132,10 @@ public class AMQQueueBrowser implements QueueBrowser public Enumeration getEnumeration() throws JMSException { checkState(); + if(!_session.getAMQConnection().started()) + { + throw new IllegalStateException("Cannot enumerate message on the queue while the Connection is stopped"); + } final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); @@ -118,12 +166,12 @@ public class AMQQueueBrowser implements QueueBrowser _consumer = consumer; prefetchMessage(); } - _logger.info("QB:created with first element:" + _nextMessage); + _logger.debug("QB:created with first element:" + _nextMessage); } public boolean hasMoreElements() { - _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + _logger.debug("QB:hasMoreElements:" + (_nextMessage != null)); return (_nextMessage != null); } @@ -136,9 +184,9 @@ public class AMQQueueBrowser implements QueueBrowser } try { - _logger.info("QB:nextElement about to receive"); + _logger.debug("QB:nextElement about to receive"); prefetchMessage(); - _logger.info("QB:nextElement received:" + _nextMessage); + _logger.debug("QB:nextElement received:" + _nextMessage); } catch (JMSException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e7e937b689..55d3ccb6e7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -119,18 +119,6 @@ public abstract class AMQSession( @@ -2642,8 +2634,8 @@ public abstract class AMQSessionNote that this operation automatically retries in the event of fail-over. * - * @param amqd The destination to declare as a queue. - * @param protocolHandler The protocol handler to communicate through. * + * @param amqd The destination to declare as a queue. * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of * the client. * + * + * * @throws AMQException If the queue cannot be declared for any reason. * @todo Verify the destiation is valid or throw an exception. * @todo Be aware of possible changes to parameter order as versions change. */ - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal) throws AMQException { - return declareQueue(amqd, protocolHandler, noLocal, false); + return declareQueue(amqd, noLocal, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait) + throws AMQException + { + return declareQueue(amqd, noLocal, nowait, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - /*return new FailoverRetrySupport(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new FailoverNoopSupport( new FailoverProtectedOperation() { @@ -2767,7 +2773,7 @@ public abstract class AMQSession(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new FailoverNoopSupport( new FailoverProtectedOperation() { @@ -952,7 +950,7 @@ public class AMQSession_0_10 extends AMQSession extends Closeable implements Messa // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); + if(noLocal) + { + ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal); + } _arguments = ft; @@ -275,7 +279,10 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); } - _logger.debug("Message listener set for destination " + _destination); + if (_logger.isDebugEnabled()) + { + _logger.debug("Message listener set for destination " + _destination); + } if (messageListener != null) { @@ -553,9 +560,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa public void close(boolean sendClose) throws JMSException { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Closing consumer:" + debugIdentity()); + _logger.debug("Closing consumer:" + debugIdentity()); } if (!setClosed()) @@ -586,7 +593,10 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa // no point otherwise as the connection will be gone if (!_session.isClosed() || _session.isClosing()) { - sendCancel(); + synchronized(_session.getMessageDeliveryLock()) + { + sendCancel(); + } cleanupQueue(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 75f198e1fa..9b3b2ce0e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,18 +20,8 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.util.UUIDGen; -import org.apache.qpid.util.UUIDs; - +import java.io.UnsupportedEncodingException; +import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -42,77 +32,22 @@ import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; -import java.io.UnsupportedEncodingException; -import java.util.UUID; +import javax.jms.Topic; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { - /** - * If true, messages will not get a timestamp. - */ - protected boolean isDisableTimestamps() - { - return _disableTimestamps; - } - - protected void setDisableTimestamps(boolean disableTimestamps) - { - _disableTimestamps = disableTimestamps; - } - - protected void setDestination(AMQDestination destination) - { - _destination = destination; - } - - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - - protected void setProtocolHandler(AMQProtocolHandler protocolHandler) - { - _protocolHandler = protocolHandler; - } - - protected int getChannelId() - { - return _channelId; - } - - protected void setChannelId(int channelId) - { - _channelId = channelId; - } - - protected void setSession(AMQSession session) - { - _session = session; - } - - protected String getUserID() - { - return _userID; - } - - protected void setUserID(String userID) - { - _userID = userID; - } - - protected PublishMode getPublishMode() - { - return publishMode; - } - - protected void setPublishMode(PublishMode publishMode) - { - this.publishMode = publishMode; - } - enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; - private final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger ; private AMQConnection _connection; @@ -166,7 +101,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private final boolean _immediate; - private final boolean _mandatory; + private final Boolean _mandatory; private boolean _disableMessageId; @@ -174,14 +109,37 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private String _userID; // ref user id used in the connection. - private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + + /** + * The default value for immediate flag used this producer is false. That is, a consumer does + * not need to be attached to a queue. + */ + private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + + /** + * The default value for mandatory flag used by this producer is true. That is, server will not + * silently drop messages where no queue is connected to the exchange for the message. + */ + private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + + /** + * The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server + * will silently drop messages where no queue is connected to the exchange for the message. + */ + private final boolean _defaultMandatoryTopicValue = + Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic", + System.getProperties().containsKey("qpid.default_mandatory") + ? System.getProperty("qpid.default_mandatory") + : "false")); private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; - protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException + protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, + Boolean immediate, Boolean mandatory) throws AMQException { - _connection = connection; + _logger = logger; + _connection = connection; _destination = destination; _transacted = transacted; _protocolHandler = protocolHandler; @@ -193,8 +151,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac declareDestination(destination); } - _immediate = immediate; - _mandatory = mandatory; + _immediate = immediate == null ? _defaultImmediateValue : immediate; + _mandatory = mandatory == null + ? destination == null ? null + : destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : mandatory; + _userID = connection.getUsername(); setPublishMode(); } @@ -215,7 +179,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac publishMode = PublishMode.SYNC_PUBLISH_ALL; } - _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); + if (_logger.isDebugEnabled()) + { + _logger.debug("MessageProducer " + toString() + " using publish mode : " + publishMode); + } } void resubscribe() throws AMQException @@ -381,7 +348,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, + sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, + _mandatory == null + ? destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : _mandatory, _immediate); } } @@ -394,7 +366,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, + _mandatory == null + ? destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : _mandatory, + _immediate); } } @@ -542,7 +520,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _logger.debug("Updating original message"); origMessage.setJMSPriority(message.getJMSPriority()); origMessage.setJMSTimestamp(message.getJMSTimestamp()); - _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); + } origMessage.setJMSExpiration(message.getJMSExpiration()); origMessage.setJMSMessageID(message.getJMSMessageID()); } @@ -646,6 +627,69 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } + /** + * If true, messages will not get a timestamp. + */ + protected boolean isDisableTimestamps() + { + return _disableTimestamps; + } + + protected void setDisableTimestamps(boolean disableTimestamps) + { + _disableTimestamps = disableTimestamps; + } + + protected void setDestination(AMQDestination destination) + { + _destination = destination; + } + + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + protected void setProtocolHandler(AMQProtocolHandler protocolHandler) + { + _protocolHandler = protocolHandler; + } + + protected int getChannelId() + { + return _channelId; + } + + protected void setChannelId(int channelId) + { + _channelId = channelId; + } + + protected void setSession(AMQSession session) + { + _session = session; + } + + protected String getUserID() + { + return _userID; + } + + protected void setUserID(String userID) + { + _userID = userID; + } + + protected PublishMode getPublishMode() + { + return publishMode; + } + + protected void setPublishMode(PublishMode publishMode) + { + this.publishMode = publishMode; + } + Logger getLogger() { return _logger; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 024219cfd6..a3a1e9c28b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -61,9 +61,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - boolean immediate, boolean mandatory) throws AMQException + Boolean immediate, Boolean mandatory) throws AMQException { - super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); + super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); userIDBytes = Strings.toUTF8(getUserID()); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 3b5e361f97..21ff6c877a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -33,6 +33,9 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; @@ -42,11 +45,12 @@ import java.util.UUID; public class BasicMessageProducer_0_8 extends BasicMessageProducer { + private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class); BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); + super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); } void declareDestination(AMQDestination destination) diff --git a/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/java/client/src/main/java/org/apache/qpid/client/Closeable.java index ba26bfc485..2f7fbad30c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/Closeable.java +++ b/java/client/src/main/java/org/apache/qpid/client/Closeable.java @@ -81,7 +81,7 @@ public abstract class Closeable } /** - * Checks if this is closis. + * Checks if this is closing. * * @return true if we are closing, false otherwise. */ @@ -90,6 +90,11 @@ public abstract class Closeable return _closing.get(); } + public void resetClosedFlag() + { + _closed.set(false); + } + protected boolean setClosed() { return _closed.getAndSet(true); diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 128aa18d30..af9048f1f5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -308,13 +308,16 @@ public class XAResourceImpl implements XAResource _xaSession.createSession(); convertExecutionErrorToXAErr( e.getException().getErrorCode()); } - Xid[] result = new Xid[res.getInDoubt().size()]; - int i = 0; - for (Object obj : res.getInDoubt()) + Xid[] result = new Xid[res.getInDoubt() != null ? res.getInDoubt().size() : 0]; + if(result.length != 0) { - org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj; - result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId()); - i++; + int i = 0; + for (Object obj : res.getInDoubt()) + { + org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj; + result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId()); + i++; + } } return result; } @@ -436,6 +439,16 @@ public class XAResourceImpl implements XAResource } } + /** + * Is this resource currently enlisted in a transaction? + * + * @return true if the resource is associated with a transaction, false otherwise. + */ + public boolean isEnlisted() + { + return (_xid != null) ; + } + //------------------------------------------------------------------------ // Private methods //------------------------------------------------------------------------ diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 85623df8c0..f2efb6e8a5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -6,7 +6,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE 2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -18,6 +18,7 @@ package org.apache.qpid.client; import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.transport.RangeSet; import javax.jms.JMSException; import javax.jms.QueueSession; @@ -178,4 +179,17 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic { return (TopicSession) getSession(); } + + @Override + protected void acknowledgeImpl() + { + if (_xaResource.isEnlisted()) + { + acknowledgeMessage(Long.MAX_VALUE, true) ; + } + else + { + super.acknowledgeImpl() ; + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 558d93538b..e1a0e18262 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -95,9 +95,9 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session) { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("New Method Dispatcher:" + session); + _logger.debug("New Method Dispatcher:" + session); } DispatcherFactory factory = _dispatcherFactories.get(version); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java index 31a0440b04..bd63cdb5c5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java @@ -48,9 +48,12 @@ public class FieldTableSupport public static Map convertToMap(FieldTable ft) { Map map = new HashMap(); - for (AMQShortString key: ft.keySet() ) + if(ft != null) { - map.put(key.asString(), ft.getObject(key)); + for (AMQShortString key: ft.keySet() ) + { + map.put(key.asString(), ft.getObject(key)); + } } return map; diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d380402da7..b314453e31 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -652,7 +652,8 @@ public class AMQProtocolHandler implements ProtocolEngine } writeFrame(frame); - return listener.blockForFrame(timeout); + long actualTimeout = timeout == -1 ? DEFAULT_SYNC_TIMEOUT : timeout; + return listener.blockForFrame(actualTimeout); // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index ced734f70f..af57fd98fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -108,7 +108,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); - _logger.info("Using ProtocolVersion for Session:" + _protocolVersion); + if (_logger.isDebugEnabled()) + { + _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion); + } _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); _connection = connection; @@ -302,7 +305,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void closeSession(AMQSession session) { - _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + if (_logger.isDebugEnabled()) + { + _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + } final int channelId = session.getChannelId(); if (channelId <= 0) { @@ -393,7 +399,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void setProtocolVersion(final ProtocolVersion pv) { - _logger.info("Setting ProtocolVersion to :" + pv); + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting ProtocolVersion to :" + pv); + } _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 80d171592f..22dc17e53c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -39,7 +39,7 @@ import java.util.concurrent.locks.ReentrantLock; * differs from a 'rendezvous' in that sense. * *

BlockingWaiters are used to coordinate when waiting for an an event that expect a response. - * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register + * They are always used in a 'one-shot' manner, that is, to receive just one response. Usually the caller has to register * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they * have been completed. * @@ -51,12 +51,12 @@ import java.util.concurrent.locks.ReentrantLock; *

*
CRC Card
Responsibilities Collaborations *
Accept generic objects as events for processing via {@link #process}. - *
Delegate handling and undserstanding of the object to a concrete implementation. + *
Delegate handling and understanding of the object to a concrete implementation. *
Block until {@link #process} determines that waiting is no longer required *
Propagate the most recent exception to the consumer. *
* - * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull + * @todo Interruption is caught but not handled. This could be allowed to fall through. This might actually be useful * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry * when this happens. At the very least, restore the interrupted status flag. * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to @@ -84,13 +84,13 @@ public abstract class BlockingWaiter /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; - /** Holds the incomming Object. */ + /** Holds the incoming Object. */ private Object _doneObject = null; private AtomicBoolean _waiting = new AtomicBoolean(false); private boolean _closed = false; /** - * Delegates processing of the incomming object to the handler. + * Delegates processing of the incoming object to the handler. * * @param object The object to process. * @@ -146,6 +146,11 @@ public abstract class BlockingWaiter */ public Object block(long timeout) throws AMQException, FailoverException { + if (timeout < 0) + { + throw new IllegalArgumentException("timeout must be zero or greater"); + } + long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); _lock.lock(); @@ -165,26 +170,18 @@ public abstract class BlockingWaiter { try { - if (timeout == -1) - { - _receivedCondition.await(); - } - else - { - nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); + nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); - if (nanoTimeout <= 0 && !_ready && _error == null) - { - _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); - _ready = true; - } + if (nanoTimeout <= 0 && !_ready && _error == null) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); + _ready = true; } } catch (InterruptedException e) { _logger.error(e.getMessage(), e); - // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess - + // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivalent to success } } } @@ -285,8 +282,8 @@ public abstract class BlockingWaiter /** * Close this Waiter so that no more errors are processed. * This is a preventative method to ensure that a second error thread does not get stuck in the error method after - * the await has returned. This has not happend but in practise but if two errors occur on the Connection at - * the same time then it is conceiveably possible for the second to get stuck if the first one is processed by a + * the await has returned. This has not happened but in practise but if two errors occur on the Connection at + * the same time then it is conceivably possible for the second to get stuck if the first one is processed by a * waiter. * * Once closed any attempt to wait will throw an exception. diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index bc3f89849e..9b202a13ee 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -20,8 +20,26 @@ */ package org.apache.qpid.jndi; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.ConfigurationException; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; @@ -33,23 +51,10 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Queue; -import javax.jms.Topic; -import javax.naming.ConfigurationException; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.spi.InitialContextFactory; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; public class PropertiesFileInitialContextFactory implements InitialContextFactory { @@ -60,6 +65,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor private String QUEUE_PREFIX = "queue."; private String TOPIC_PREFIX = "topic."; + @SuppressWarnings({ "rawtypes", "unchecked" }) public Context getInitialContext(Hashtable environment) throws NamingException { Map data = new ConcurrentHashMap(); @@ -68,6 +74,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { String file = null; + if (environment.containsKey(Context.PROVIDER_URL)) { file = (String) environment.get(Context.PROVIDER_URL); @@ -77,13 +84,23 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor file = System.getProperty(Context.PROVIDER_URL); } + // Load the properties specified if (file != null) { _logger.info("Loading Properties from:" + file); + BufferedInputStream inputStream = null; - // Load the properties specified - BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(file)); + if(file.contains("file:")) + { + inputStream = new BufferedInputStream(new FileInputStream(new File(new URI(file)))); + } + else + { + inputStream = new BufferedInputStream(new FileInputStream(file)); + } + Properties p = new Properties(); + try { p.load(inputStream); @@ -119,6 +136,11 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" + "Due to:"+ioe.getMessage()); } + catch(URISyntaxException uoe) + { + _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" + + "Due to:"+uoe.getMessage()); + } createConnectionFactories(data, environment); -- cgit v1.2.1