diff options
Diffstat (limited to 'qpid/java/jca/src/main')
8 files changed, 556 insertions, 557 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java index 1ded1a8db2..69320575b0 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java @@ -31,10 +31,8 @@ import org.slf4j.LoggerFactory; */ public class QpidRAProperties extends ConnectionFactoryProperties implements Serializable { - /** Serial version UID */ private static final long serialVersionUID = -4823893873707374791L; - /** The logger */ private static final Logger _log = LoggerFactory.getLogger(QpidRAProperties.class); private static final int DEFAULT_SETUP_ATTEMPTS = 10; @@ -45,16 +43,14 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser private long _setupInterval = DEFAULT_SETUP_INTERVAL; - /** Use Local TX instead of XA */ - private Boolean _localTx = false; - /** Class used to locate the Transaction Manager. */ private String _transactionManagerLocatorClass ; /** Method used to locate the TM */ private String _transactionManagerLocatorMethod ; - + private boolean _useConnectionPerHandler = true; + /** * Constructor */ @@ -146,10 +142,20 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser this._setupInterval = setupInterval; } + public boolean isUseConnectionPerHandler() + { + return _useConnectionPerHandler; + } + + public void setUseConnectionPerHandler(boolean connectionPerHandler) + { + this._useConnectionPerHandler = connectionPerHandler; + } + @Override public String toString() { - return "QpidRAProperties[localTx=" + _localTx + + return "QpidRAProperties[" + ", transactionManagerLocatorClass=" + _transactionManagerLocatorClass + ", transactionManagerLocatorMethod=" + _transactionManagerLocatorMethod + ", setupAttempts=" + _setupAttempts + diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java index 14b5354062..96fa83ceef 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java @@ -38,8 +38,6 @@ import javax.resource.spi.work.WorkManager; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; @@ -47,6 +45,8 @@ import org.apache.qpid.client.XAConnectionImpl; import org.apache.qpid.ra.inflow.QpidActivation; import org.apache.qpid.ra.inflow.QpidActivationSpec; import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The resource adapter for Qpid @@ -54,43 +54,22 @@ import org.apache.qpid.url.URLSyntaxException; */ public class QpidResourceAdapter implements ResourceAdapter, Serializable { - /** - * - */ private static final long serialVersionUID = -2446231446818098726L; - /** - * The logger - */ - private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class); + private static final transient Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class); - /** - * The bootstrap context - */ private BootstrapContext _ctx; - /** - * The resource adapter properties - */ private final QpidRAProperties _raProperties; - /** - * Have the factory been configured - */ private final AtomicBoolean _configured; - /** - * The activations by activation spec - */ private final Map<ActivationSpec, QpidActivation> _activations; private AMQConnectionFactory _defaultAMQConnectionFactory; private TransactionManager _tm; - /** - * Constructor - */ public QpidResourceAdapter() { if (_log.isTraceEnabled()) @@ -514,7 +493,27 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable } _raProperties.setSetupInterval(interval); } + + public Boolean isUseConnectionPerHandler() + { + if (_log.isTraceEnabled()) + { + _log.trace("isConnectionPerHandler()"); + } + + return _raProperties.isUseConnectionPerHandler(); + } + public void setUseConnectionPerHandler(Boolean connectionPerHandler) + { + if (_log.isTraceEnabled()) + { + _log.trace("setConnectionPerHandler(" + connectionPerHandler + ")"); + } + + _raProperties.setUseConnectionPerHandler(connectionPerHandler); + } + /** * Indicates whether some other object is "equal to" this one. * @@ -683,7 +682,8 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable private void locateTM() throws ResourceAdapterInternalException { - if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null) + if(_raProperties.getTransactionManagerLocatorClass() != null + && _raProperties.getTransactionManagerLocatorMethod() != null) { String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";"); @@ -703,7 +703,7 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable if (_tm == null) { - _log.error("It wasn't possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod"); + _log.error("It was not possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod"); throw new ResourceAdapterInternalException("Could not locate javax.transaction.TransactionManager"); } else @@ -763,6 +763,7 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable final String client = (clientID != null ? clientID : "") ; final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ; + if (_log.isDebugEnabled()) { _log.debug("Initialising connectionURL to " + newurl) ; diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java index 417849fc5c..162099d1ee 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java @@ -21,25 +21,15 @@ package org.apache.qpid.ra.admin; import java.io.Externalizable; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectInput; -import java.io.ObjectInputStream; import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.util.Hashtable; -import javax.naming.Context; -import javax.naming.Name; import javax.naming.NamingException; -import javax.naming.RefAddr; import javax.naming.Reference; import javax.naming.StringRefAddr; -import javax.naming.spi.ObjectFactory; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.ra.admin.AdminObjectFactory; public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable { @@ -101,19 +91,4 @@ public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable out.writeObject(this._url); } - //TODO move to tests - public static void main(String[] args) throws Exception - { - QpidQueueImpl q = new QpidQueueImpl(); - q.setDestinationAddress("hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}"); - ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("queue.out")); - os.writeObject(q); - os.close(); - - - ObjectInputStream is = new ObjectInputStream(new FileInputStream("queue.out")); - q = (QpidQueueImpl)is.readObject(); - System.out.println(q); - - } } diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java index 57edec8eee..2327512a62 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java @@ -20,114 +20,28 @@ */ package org.apache.qpid.ra.inflow; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; import javax.resource.ResourceException; import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.resource.spi.work.Work; -import javax.resource.spi.work.WorkManager; +import org.apache.qpid.ra.QpidResourceAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.XAConnectionImpl; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.ra.QpidResourceAdapter; -import org.apache.qpid.ra.Util; - /** * The activation. * */ -public class QpidActivation implements ExceptionListener +public class QpidActivation extends QpidExceptionHandler { - /** - * The logger - */ private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class); - /** - * The onMessage method - */ - public static final Method ONMESSAGE; - - /** - * The resource adapter - */ - private final QpidResourceAdapter _ra; - - /** - * The activation spec - */ - private final QpidActivationSpec _spec; - - /** - * The message endpoint factory - */ - private final MessageEndpointFactory _endpointFactory; - - /** - * Whether delivery is active - */ - private final AtomicBoolean _deliveryActive = new AtomicBoolean(false); - - /** - * The destination type - */ - private boolean _isTopic = false; - - /** - * Is the delivery transacted - */ - private boolean _isDeliveryTransacted; - - private Destination _destination; - - /** - * The connection - */ - private Connection _connection; - private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>(); - private AMQConnectionFactory _factory; - - // Whether we are in the failure recovery loop - private AtomicBoolean _inFailure = new AtomicBoolean(false); - - //Whether or not we have completed activating - private AtomicBoolean _activated = new AtomicBoolean(false); - - static - { - try - { - ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class }); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - + /** * Constructor * @@ -140,97 +54,8 @@ public class QpidActivation implements ExceptionListener final MessageEndpointFactory endpointFactory, final QpidActivationSpec spec) throws ResourceException { - if (_log.isTraceEnabled()) - { - _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")"); - } - - this._ra = ra; - this._endpointFactory = endpointFactory; - this._spec = spec; - try - { - _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE); - } - catch (Exception e) - { - throw new ResourceException(e); - } - } - - /** - * Get the activation spec - * - * @return The value - */ - public QpidActivationSpec getActivationSpec() - { - if (_log.isTraceEnabled()) - { - _log.trace("getActivationSpec()"); - } - - return _spec; - } - - /** - * Get the message endpoint factory - * - * @return The value - */ - public MessageEndpointFactory getMessageEndpointFactory() - { - if (_log.isTraceEnabled()) - { - _log.trace("getMessageEndpointFactory()"); - } - - return _endpointFactory; - } - - /** - * Get whether delivery is transacted - * - * @return The value - */ - public boolean isDeliveryTransacted() - { - if (_log.isTraceEnabled()) - { - _log.trace("isDeliveryTransacted()"); - } - - return _isDeliveryTransacted; - } - - /** - * Get the work manager - * - * @return The value - */ - public WorkManager getWorkManager() - { - if (_log.isTraceEnabled()) - { - _log.trace("getWorkManager()"); - } - - return _ra.getWorkManager(); - } - - /** - * Is the destination a topic - * - * @return The value - */ - public boolean isTopic() - { - if (_log.isTraceEnabled()) - { - _log.trace("isTopic()"); - } - - return _isTopic; + super(ra, spec, endpointFactory); + } /** @@ -267,70 +92,62 @@ public class QpidActivation implements ExceptionListener * * @throws Exception Thrown if an error occurs */ - protected synchronized void setup() throws Exception + public synchronized void setup() throws Exception { _log.debug("Setting up " + _spec); - setupCF(); - + setupCF(); setupDestination(); - final AMQConnection amqConnection ; - final boolean useLocalTx = _spec.isUseLocalTx() ; - final boolean isXA = _isDeliveryTransacted && !useLocalTx ; - - if (isXA) + + if(!_spec.isUseConnectionPerHandler()) { - amqConnection = (XAConnectionImpl)_factory.createXAConnection() ; + setupConnection(); + _connection.setExceptionListener(this); } - else - { - amqConnection = (AMQConnection)_factory.createConnection() ; - } - - amqConnection.setExceptionListener(this) ; - + for (int i = 0; i < _spec.getMaxSession(); i++) { - Session session = null; - - try - { - if (isXA) - { - session = _ra.createXASession((XAConnectionImpl)amqConnection) ; - } - else - { - session = _ra.createSession((AMQConnection)amqConnection, - _spec.getAcknowledgeModeInt(), - useLocalTx, - _spec.getPrefetchLow(), - _spec.getPrefetchHigh()); - } - - _log.debug("Using session " + Util.asString(session)); - QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session); - handler.setup(); - _handlers.add(handler); - } - catch (Exception e) - { - try - { - amqConnection.close() ; - } - catch (Exception e2) - { - _log.trace("Ignored error closing connection", e2); - } - - throw e; - } + try + { + QpidMessageHandler handler = null; + + if(_spec.isUseConnectionPerHandler()) + { + handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM()); + } + else + { + handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM(), _connection); + } + + handler.start(); + _handlers.add(handler); + } + catch(Exception e) + { + try + { + if(_connection != null) + { + this._connection.close(); + } + } + catch (Exception e2) + { + _log.trace("Ignored error closing connection", e2); + } + + throw e; + + } + + } + + if(!_spec.isUseConnectionPerHandler()) + { + this._connection.start(); + _activated.set(true); } - amqConnection.start() ; - this._connection = amqConnection ; - _activated.set(true); - _log.debug("Setup complete " + this); } @@ -340,136 +157,17 @@ public class QpidActivation implements ExceptionListener protected synchronized void teardown() { _log.debug("Tearing down " + _spec); - - try - { - if (_connection != null) - { - _connection.stop(); - } - } - catch (Throwable t) - { - _log.debug("Error stopping connection " + Util.asString(_connection), t); - } + + super.teardown(); for (QpidMessageHandler handler : _handlers) { - handler.teardown(); + handler.stop(); } - try - { - if (_connection != null) - { - _connection.close(); - } - } - catch (Throwable t) - { - _log.debug("Error closing connection " + Util.asString(_connection), t); - } - if (_spec.isHasBeenUpdated()) - { - _factory = null; - } _log.debug("Tearing down complete " + this); } - protected void setupCF() throws Exception - { - if (_spec.isHasBeenUpdated()) - { - _factory = _ra.createAMQConnectionFactory(_spec); - } - else - { - _factory = _ra.getDefaultAMQConnectionFactory(); - } - } - - public Destination getDestination() - { - return _destination; - } - - protected void setupDestination() throws Exception - { - - String destinationName = _spec.getDestination(); - String destinationTypeString = _spec.getDestinationType(); - - if (_spec.isUseJNDI()) - { - Context ctx = new InitialContext(); - _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec); - if (_log.isTraceEnabled()) - { - _log.trace("setupDestination(" + ctx + ")"); - } - - if (destinationTypeString != null && !destinationTypeString.trim().equals("")) - { - _log.debug("Destination type defined as " + destinationTypeString); - - Class<? extends Destination> destinationType; - if (Topic.class.getName().equals(destinationTypeString)) - { - destinationType = Topic.class; - _isTopic = true; - } - else - { - destinationType = Queue.class; - } - - _log.debug("Retrieving destination " + destinationName + - " of type " + - destinationType.getName()); - _destination = Util.lookup(ctx, destinationName, destinationType); - - } - else - { - _log.debug("Destination type not defined"); - _log.debug("Retrieving destination " + destinationName + - " of type " + - Destination.class.getName()); - - _destination = Util.lookup(ctx, destinationName, AMQDestination.class); - _isTopic = !(_destination instanceof Queue) ; - } - } - else - { - _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination()); - if (destinationTypeString != null && !destinationTypeString.trim().equals("")) - { - _log.debug("Destination type defined as " + destinationTypeString); - final boolean match ; - if (Topic.class.getName().equals(destinationTypeString)) - { - match = (_destination instanceof Topic) ; - _isTopic = true; - } - else - { - match = (_destination instanceof Queue) ; - } - if (!match) - { - throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ; - } - } - else - { - _isTopic = !(_destination instanceof Queue) ; - } - } - - _log.debug("Got destination " + _destination + " from " + destinationName); - } - /** * Get a string representation * @@ -492,94 +190,7 @@ public class QpidActivation implements ExceptionListener return buffer.toString(); } - public void onException(final JMSException jmse) - { - if(_activated.get()) - { - handleFailure(jmse) ; - } - else - { - _log.warn("Received JMSException: " + jmse + " while endpoint was not activated."); - } - } - - /** - * Handles any failure by trying to reconnect - * - * @param failure the reason for the failure - */ - public void handleFailure(Throwable failure) - { - if(doesNotExist(failure)) - { - _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination()); - } - else - { - _log.warn("Failure in Qpid activation " + _spec, failure); - } - int reconnectCount = 0; - int setupAttempts = _spec.getSetupAttempts(); - long setupInterval = _spec.getSetupInterval(); - - // Only enter the failure loop once - if (_inFailure.getAndSet(true)) - return; - try - { - while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) - { - teardown(); - - try - { - Thread.sleep(setupInterval); - } - catch (InterruptedException e) - { - _log.debug("Interrupted trying to reconnect " + _spec, e); - break; - } - - _log.info("Attempting to reconnect " + _spec); - try - { - setup(); - _log.info("Reconnected with Qpid"); - break; - } - catch (Throwable t) - { - if(doesNotExist(failure)) - { - _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination()); - } - else - { - _log.error("Unable to reconnect " + _spec, t); - } - } - ++reconnectCount; - } - } - finally - { - // Leaving failure recovery loop - _inFailure.set(false); - } - } - - /** - * Check to see if the failure represents a missing endpoint - * @param failure The failure. - * @return true if it represents a missing endpoint, false otherwise - */ - private boolean doesNotExist(final Throwable failure) - { - return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ; - } - + /** * Handles the setup */ diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java index 5f4e2dcf6b..3d9a88adb5 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java @@ -28,10 +28,10 @@ import javax.resource.spi.ActivationSpec; import javax.resource.spi.InvalidPropertyException; import javax.resource.spi.ResourceAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.qpid.ra.ConnectionFactoryProperties; import org.apache.qpid.ra.QpidResourceAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The activation spec @@ -88,6 +88,8 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A // undefined by default, default is specified at the RA level in QpidRAProperties private Long _setupInterval; + private Boolean _useConnectionPerHandler; + /** * Constructor */ @@ -544,6 +546,16 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A this._setupInterval = setupInterval; } + public Boolean isUseConnectionPerHandler() + { + return (_useConnectionPerHandler == null) ? _ra.isUseConnectionPerHandler() : _useConnectionPerHandler; + } + + public void setUseConnectionPerHandler(Boolean connectionPerHandler) + { + this._useConnectionPerHandler = connectionPerHandler; + } + /** * Validate * @exception InvalidPropertyException Thrown if a validation exception occurs @@ -561,6 +573,7 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A } } + /** * Get a string representation * @return The value @@ -573,23 +586,30 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A buffer.append("ra=").append(_ra); buffer.append(" destination=").append(_destination); buffer.append(" destinationType=").append(_destinationType); + if (_messageSelector != null) { buffer.append(" selector=").append(_messageSelector); } + buffer.append(" ack=").append(getAcknowledgeMode()); buffer.append(" durable=").append(_subscriptionDurability); buffer.append(" clientID=").append(getClientId()); + if (_subscriptionName != null) { buffer.append(" subscription=").append(_subscriptionName); } + buffer.append(" user=").append(getUserName()); + if (getPassword() != null) { - buffer.append(" password=").append("****"); + buffer.append(" password=").append("********"); } + buffer.append(" maxSession=").append(_maxSession); + if (_prefetchLow != null) { buffer.append(" prefetchLow=").append(_prefetchLow); @@ -598,7 +618,10 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A { buffer.append(" prefetchHigh=").append(_prefetchHigh); } + + buffer.append(" connectionPerHandler=").append(isUseConnectionPerHandler()); buffer.append(')'); + return buffer.toString(); } } diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java new file mode 100644 index 0000000000..8775362eb5 --- /dev/null +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java @@ -0,0 +1,339 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ra.inflow; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.jms.XAConnectionFactory; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.resource.ResourceException; +import javax.resource.spi.endpoint.MessageEndpointFactory; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.ra.QpidResourceAdapter; +import org.apache.qpid.ra.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class QpidExceptionHandler implements ExceptionListener +{ + private static final Logger _log = LoggerFactory.getLogger(QpidExceptionHandler.class); + + public static final Method ONMESSAGE; + + protected final MessageEndpointFactory _endpointFactory; + + protected Connection _connection; + + protected ConnectionFactory _factory; + + protected Destination _destination; + + protected final QpidResourceAdapter _ra; + + protected final QpidActivationSpec _spec; + + protected boolean _isDeliveryTransacted; + + protected final AtomicBoolean _deliveryActive = new AtomicBoolean(false); + + protected boolean _isTopic = false; + + // Whether we are in the failure recovery loop + protected AtomicBoolean _inFailure = new AtomicBoolean(false); + + //Whether or not we have completed activating + protected AtomicBoolean _activated = new AtomicBoolean(false); + + static + { + try + { + ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class }); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public abstract void setup() throws Exception; + public abstract void start() throws Exception; + public abstract void stop(); + + protected QpidExceptionHandler(QpidResourceAdapter ra, + QpidActivationSpec spec, + MessageEndpointFactory endpointFactory) throws ResourceException + { + this._ra = ra; + this._spec = spec; + this._endpointFactory = endpointFactory; + + try + { + _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE); + } + catch (Exception e) + { + throw new ResourceException(e); + } + + + } + + public void onException(JMSException e) + { + if(_activated.get()) + { + handleFailure(e) ; + } + else + { + _log.warn("Received JMSException: " + e + " while endpoint was not activated."); + } + } + + /** + * Handles any failure by trying to reconnect + * + * @param failure the reason for the failure + */ + public void handleFailure(Throwable failure) + { + if(doesNotExist(failure)) + { + _log.info("awaiting topic/queue creation " + _spec.getDestination()); + } + else + { + _log.warn("Failure in Qpid activation " + _spec, failure); + } + int reconnectCount = 0; + int setupAttempts = _spec.getSetupAttempts(); + long setupInterval = _spec.getSetupInterval(); + + // Only enter the failure loop once + if (_inFailure.getAndSet(true)) + return; + try + { + while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) + { + teardown(); + + try + { + Thread.sleep(setupInterval); + } + catch (InterruptedException e) + { + _log.debug("Interrupted trying to reconnect " + _spec, e); + break; + } + + _log.info("Attempting to reconnect " + _spec); + try + { + setup(); + _log.info("Reconnected with Qpid"); + break; + } + catch (Throwable t) + { + if(doesNotExist(failure)) + { + _log.info("awaiting topic/queue creation " + _spec.getDestination()); + } + else + { + _log.error("Unable to reconnect " + _spec, t); + } + } + ++reconnectCount; + } + } + finally + { + // Leaving failure recovery loop + _inFailure.set(false); + } + } + + /** + * Check to see if the failure represents a missing endpoint + * @param failure The failure. + * @return true if it represents a missing endpoint, false otherwise + */ + protected boolean doesNotExist(final Throwable failure) + { + return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ; + } + + protected boolean isXA() + { + return _isDeliveryTransacted && !_spec.isUseLocalTx(); + } + + protected void setupConnection() throws Exception + { + this._connection = (isXA()) ? ((XAConnectionFactory)_factory).createXAConnection() : _factory.createConnection(); + } + + protected synchronized void teardown() + { + _log.debug("Tearing down " + _spec); + + try + { + if (_connection != null) + { + _connection.stop(); + } + } + catch (Throwable t) + { + _log.debug("Error stopping connection " + Util.asString(_connection), t); + } + + try + { + if (_connection != null) + { + _connection.close(); + } + } + catch (Throwable t) + { + _log.debug("Error closing connection " + Util.asString(_connection), t); + } + if (_spec.isHasBeenUpdated()) + { + _factory = null; + } + _log.debug("Tearing down complete " + this); + } + + protected void setupCF() throws Exception + { + if (_spec.isHasBeenUpdated()) + { + _factory = _ra.createAMQConnectionFactory(_spec); + } + else + { + _factory = _ra.getDefaultAMQConnectionFactory(); + } + } + + protected void setupDestination() throws Exception + { + + String destinationName = _spec.getDestination(); + String destinationTypeString = _spec.getDestinationType(); + + if (_spec.isUseJNDI()) + { + Context ctx = new InitialContext(); + _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec); + if (_log.isTraceEnabled()) + { + _log.trace("setupDestination(" + ctx + ")"); + } + + if (destinationTypeString != null && !destinationTypeString.trim().equals("")) + { + _log.debug("Destination type defined as " + destinationTypeString); + + Class<? extends Destination> destinationType; + if (Topic.class.getName().equals(destinationTypeString)) + { + destinationType = Topic.class; + _isTopic = true; + } + else + { + destinationType = Queue.class; + } + + _log.debug("Retrieving destination " + destinationName + + " of type " + + destinationType.getName()); + _destination = Util.lookup(ctx, destinationName, destinationType); + + } + else + { + _log.debug("Destination type not defined"); + _log.debug("Retrieving destination " + destinationName + + " of type " + + Destination.class.getName()); + + _destination = Util.lookup(ctx, destinationName, AMQDestination.class); + _isTopic = !(_destination instanceof Queue) ; + } + } + else + { + _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination()); + + if (destinationTypeString != null && !destinationTypeString.trim().equals("")) + { + _log.debug("Destination type defined as " + destinationTypeString); + final boolean match ; + if (Topic.class.getName().equals(destinationTypeString)) + { + match = (_destination instanceof Topic) ; + _isTopic = true; + } + else + { + match = (_destination instanceof Queue) ; + } + if (!match) + { + throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ; + } + } + else + { + _isTopic = !(_destination instanceof Queue) ; + } + } + + _log.debug("Got destination " + _destination + " from " + destinationName); + } + + + +} diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java index 473efab31f..a02adf0dad 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.ra.inflow; +import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -35,6 +36,9 @@ import javax.transaction.Status; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.XAConnectionImpl; +import org.apache.qpid.ra.QpidResourceAdapter; import org.apache.qpid.ra.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,82 +47,100 @@ import org.slf4j.LoggerFactory; * The message handler * */ -public class QpidMessageHandler implements MessageListener +public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener { - /** - * The logger - */ private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class); - /** - * The session - */ - private final Session _session; - private MessageConsumer _consumer; - /** - * The endpoint - */ private MessageEndpoint _endpoint; - private final QpidActivation _activation; - - private boolean _useLocalTx; - - private boolean _transacted; + private Session _session; private final TransactionManager _tm; - public QpidMessageHandler(final QpidActivation activation, - final TransactionManager tm, - final Session session) + public QpidMessageHandler(final QpidResourceAdapter ra, + final QpidActivationSpec spec, + final MessageEndpointFactory endpointFactory, + final TransactionManager tm, + final Connection connection) throws ResourceException { - this._activation = activation; - this._session = session; + super(ra, spec, endpointFactory); this._tm = tm; + this._connection = connection; } - + + public QpidMessageHandler(final QpidResourceAdapter ra, + final QpidActivationSpec spec, + final MessageEndpointFactory endpointFactory, + final TransactionManager tm) throws ResourceException + { + super(ra, spec, endpointFactory); + this._tm = tm; + } + public void setup() throws Exception { if (_log.isTraceEnabled()) { _log.trace("setup()"); } - - QpidActivationSpec spec = _activation.getActivationSpec(); - String selector = spec.getMessageSelector(); - + + setupCF(); + setupDestination(); + String selector = _spec.getMessageSelector(); + + if(_spec.isUseConnectionPerHandler()) + { + setupConnection(); + _connection.setExceptionListener(this); + } + + if(isXA()) + { + _session = _ra.createXASession((XAConnectionImpl)_connection); + } + else + { + _session = _ra.createSession((AMQConnection)_connection, + _spec.getAcknowledgeModeInt(), + _spec.isUseLocalTx(), + _spec.getPrefetchLow(), + _spec.getPrefetchHigh()); + } // Create the message consumer - if (_activation.isTopic()) + if (_isTopic) { - final Topic topic = (Topic) _activation.getDestination(); - final String subscriptionName = spec.getSubscriptionName(); - if (spec.isSubscriptionDurable()) - _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false); + final Topic topic = (Topic) _destination; + final String subscriptionName = _spec.getSubscriptionName(); + + if (_spec.isSubscriptionDurable()) + { + _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false); + } else - _consumer = _session.createConsumer(topic, selector) ; + { + _consumer = _session.createConsumer(topic, selector) ; + } } else { - final Queue queue = (Queue) _activation.getDestination(); + final Queue queue = (Queue) _destination; _consumer = _session.createConsumer(queue, selector); } - // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX - MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory(); - _useLocalTx = _activation.getActivationSpec().isUseLocalTx(); - _transacted = _activation.isDeliveryTransacted() || _useLocalTx ; - if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx()) + if (isXA()) { final XAResource xaResource = ((XASession)_session).getXAResource() ; - _endpoint = endpointFactory.createEndpoint(xaResource); + _endpoint = _endpointFactory.createEndpoint(xaResource); } else { - _endpoint = endpointFactory.createEndpoint(null); + _endpoint = _endpointFactory.createEndpoint(null); } _consumer.setMessageListener(this); + _connection.start(); + _activated.set(true); } /** @@ -126,11 +148,13 @@ public class QpidMessageHandler implements MessageListener */ public void teardown() { - if (_log.isTraceEnabled()) - { - _log.trace("teardown()"); - } + if (_log.isTraceEnabled()) + { + _log.trace("teardown()"); + } + super.teardown(); + try { if (_endpoint != null) @@ -156,27 +180,28 @@ public class QpidMessageHandler implements MessageListener try { - if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null) + if (_spec.getTransactionTimeout() > 0 && _tm != null) { - _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout()); + _tm.setTransactionTimeout(_spec.getTransactionTimeout()); } _endpoint.beforeDelivery(QpidActivation.ONMESSAGE); beforeDelivery = true; - if(_transacted) + if(isXA()) { message.acknowledge(); } ((MessageListener)_endpoint).onMessage(message); - if (_transacted && (_tm.getTransaction() != null)) + if (isXA() && (_tm.getTransaction() != null)) { final int status = _tm.getStatus() ; final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK || status == Status.STATUS_ROLLING_BACK || status == Status.STATUS_ROLLEDBACK; + if (rollback) { _session.recover() ; @@ -196,7 +221,7 @@ public class QpidMessageHandler implements MessageListener _log.warn("Unable to call after delivery", e); return; } - if (_useLocalTx) + if (!isXA() && _spec.isUseLocalTx()) { _session.commit(); } @@ -216,7 +241,7 @@ public class QpidMessageHandler implements MessageListener _log.warn("Unable to call after delivery", e); } } - if (_useLocalTx || !_activation.isDeliveryTransacted()) + if (!isXA() && _spec.isUseLocalTx()) { try { @@ -241,5 +266,17 @@ public class QpidMessageHandler implements MessageListener } } + + public void start() throws Exception + { + _deliveryActive.set(true); + setup(); + } + + public void stop() + { + _deliveryActive.set(false); + teardown(); + } } diff --git a/qpid/java/jca/src/main/resources/META-INF/ra.xml b/qpid/java/jca/src/main/resources/META-INF/ra.xml index 01f5eceecf..a9374f52d7 100755 --- a/qpid/java/jca/src/main/resources/META-INF/ra.xml +++ b/qpid/java/jca/src/main/resources/META-INF/ra.xml @@ -109,6 +109,13 @@ <config-property-type>java.lang.String</config-property-type> <config-property-value>amqp://anonymous:passwd@client/test?brokerlist='tcp://localhost?sasl_mechs='PLAIN''</config-property-value> </config-property> + + <config-property> + <description>Use a JMS Connection per MessageHandler</description> + <config-property-name>UseConnectionPerHandler</config-property-name> + <config-property-type>java.lang.Boolean</config-property-type> + <config-property-value>true</config-property-value> + </config-property> <outbound-resourceadapter> <connection-definition> |
