summaryrefslogtreecommitdiff
path: root/qpid/java/jca/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/jca/src/main')
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java20
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java53
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java25
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java501
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java29
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java339
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java139
-rwxr-xr-xqpid/java/jca/src/main/resources/META-INF/ra.xml7
8 files changed, 556 insertions, 557 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
index 1ded1a8db2..69320575b0 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
@@ -31,10 +31,8 @@ import org.slf4j.LoggerFactory;
*/
public class QpidRAProperties extends ConnectionFactoryProperties implements Serializable
{
- /** Serial version UID */
private static final long serialVersionUID = -4823893873707374791L;
- /** The logger */
private static final Logger _log = LoggerFactory.getLogger(QpidRAProperties.class);
private static final int DEFAULT_SETUP_ATTEMPTS = 10;
@@ -45,16 +43,14 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser
private long _setupInterval = DEFAULT_SETUP_INTERVAL;
- /** Use Local TX instead of XA */
- private Boolean _localTx = false;
-
/** Class used to locate the Transaction Manager. */
private String _transactionManagerLocatorClass ;
/** Method used to locate the TM */
private String _transactionManagerLocatorMethod ;
-
+ private boolean _useConnectionPerHandler = true;
+
/**
* Constructor
*/
@@ -146,10 +142,20 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser
this._setupInterval = setupInterval;
}
+ public boolean isUseConnectionPerHandler()
+ {
+ return _useConnectionPerHandler;
+ }
+
+ public void setUseConnectionPerHandler(boolean connectionPerHandler)
+ {
+ this._useConnectionPerHandler = connectionPerHandler;
+ }
+
@Override
public String toString()
{
- return "QpidRAProperties[localTx=" + _localTx +
+ return "QpidRAProperties[" +
", transactionManagerLocatorClass=" + _transactionManagerLocatorClass +
", transactionManagerLocatorMethod=" + _transactionManagerLocatorMethod +
", setupAttempts=" + _setupAttempts +
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
index 14b5354062..96fa83ceef 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
@@ -38,8 +38,6 @@ import javax.resource.spi.work.WorkManager;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
@@ -47,6 +45,8 @@ import org.apache.qpid.client.XAConnectionImpl;
import org.apache.qpid.ra.inflow.QpidActivation;
import org.apache.qpid.ra.inflow.QpidActivationSpec;
import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The resource adapter for Qpid
@@ -54,43 +54,22 @@ import org.apache.qpid.url.URLSyntaxException;
*/
public class QpidResourceAdapter implements ResourceAdapter, Serializable
{
- /**
- *
- */
private static final long serialVersionUID = -2446231446818098726L;
- /**
- * The logger
- */
- private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
+ private static final transient Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
- /**
- * The bootstrap context
- */
private BootstrapContext _ctx;
- /**
- * The resource adapter properties
- */
private final QpidRAProperties _raProperties;
- /**
- * Have the factory been configured
- */
private final AtomicBoolean _configured;
- /**
- * The activations by activation spec
- */
private final Map<ActivationSpec, QpidActivation> _activations;
private AMQConnectionFactory _defaultAMQConnectionFactory;
private TransactionManager _tm;
- /**
- * Constructor
- */
public QpidResourceAdapter()
{
if (_log.isTraceEnabled())
@@ -514,7 +493,27 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
}
_raProperties.setSetupInterval(interval);
}
+
+ public Boolean isUseConnectionPerHandler()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isConnectionPerHandler()");
+ }
+
+ return _raProperties.isUseConnectionPerHandler();
+ }
+ public void setUseConnectionPerHandler(Boolean connectionPerHandler)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setConnectionPerHandler(" + connectionPerHandler + ")");
+ }
+
+ _raProperties.setUseConnectionPerHandler(connectionPerHandler);
+ }
+
/**
* Indicates whether some other object is "equal to" this one.
*
@@ -683,7 +682,8 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
private void locateTM() throws ResourceAdapterInternalException
{
- if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null)
+ if(_raProperties.getTransactionManagerLocatorClass() != null
+ && _raProperties.getTransactionManagerLocatorMethod() != null)
{
String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";");
@@ -703,7 +703,7 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
if (_tm == null)
{
- _log.error("It wasn't possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
+ _log.error("It was not possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
throw new ResourceAdapterInternalException("Could not locate javax.transaction.TransactionManager");
}
else
@@ -763,6 +763,7 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
final String client = (clientID != null ? clientID : "") ;
final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ;
+
if (_log.isDebugEnabled())
{
_log.debug("Initialising connectionURL to " + newurl) ;
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
index 417849fc5c..162099d1ee 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
@@ -21,25 +21,15 @@
package org.apache.qpid.ra.admin;
import java.io.Externalizable;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
-import java.io.ObjectInputStream;
import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Hashtable;
-import javax.naming.Context;
-import javax.naming.Name;
import javax.naming.NamingException;
-import javax.naming.RefAddr;
import javax.naming.Reference;
import javax.naming.StringRefAddr;
-import javax.naming.spi.ObjectFactory;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.ra.admin.AdminObjectFactory;
public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable
{
@@ -101,19 +91,4 @@ public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable
out.writeObject(this._url);
}
- //TODO move to tests
- public static void main(String[] args) throws Exception
- {
- QpidQueueImpl q = new QpidQueueImpl();
- q.setDestinationAddress("hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}");
- ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("queue.out"));
- os.writeObject(q);
- os.close();
-
-
- ObjectInputStream is = new ObjectInputStream(new FileInputStream("queue.out"));
- q = (QpidQueueImpl)is.readObject();
- System.out.println(q);
-
- }
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
index 57edec8eee..2327512a62 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
@@ -20,114 +20,28 @@
*/
package org.apache.qpid.ra.inflow;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
+import org.apache.qpid.ra.QpidResourceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.XAConnectionImpl;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.ra.QpidResourceAdapter;
-import org.apache.qpid.ra.Util;
-
/**
* The activation.
*
*/
-public class QpidActivation implements ExceptionListener
+public class QpidActivation extends QpidExceptionHandler
{
- /**
- * The logger
- */
private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class);
- /**
- * The onMessage method
- */
- public static final Method ONMESSAGE;
-
- /**
- * The resource adapter
- */
- private final QpidResourceAdapter _ra;
-
- /**
- * The activation spec
- */
- private final QpidActivationSpec _spec;
-
- /**
- * The message endpoint factory
- */
- private final MessageEndpointFactory _endpointFactory;
-
- /**
- * Whether delivery is active
- */
- private final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
-
- /**
- * The destination type
- */
- private boolean _isTopic = false;
-
- /**
- * Is the delivery transacted
- */
- private boolean _isDeliveryTransacted;
-
- private Destination _destination;
-
- /**
- * The connection
- */
- private Connection _connection;
-
private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>();
- private AMQConnectionFactory _factory;
-
- // Whether we are in the failure recovery loop
- private AtomicBoolean _inFailure = new AtomicBoolean(false);
-
- //Whether or not we have completed activating
- private AtomicBoolean _activated = new AtomicBoolean(false);
-
- static
- {
- try
- {
- ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
+
/**
* Constructor
*
@@ -140,97 +54,8 @@ public class QpidActivation implements ExceptionListener
final MessageEndpointFactory endpointFactory,
final QpidActivationSpec spec) throws ResourceException
{
- if (_log.isTraceEnabled())
- {
- _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
- }
-
- this._ra = ra;
- this._endpointFactory = endpointFactory;
- this._spec = spec;
- try
- {
- _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
- }
- catch (Exception e)
- {
- throw new ResourceException(e);
- }
- }
-
- /**
- * Get the activation spec
- *
- * @return The value
- */
- public QpidActivationSpec getActivationSpec()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getActivationSpec()");
- }
-
- return _spec;
- }
-
- /**
- * Get the message endpoint factory
- *
- * @return The value
- */
- public MessageEndpointFactory getMessageEndpointFactory()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getMessageEndpointFactory()");
- }
-
- return _endpointFactory;
- }
-
- /**
- * Get whether delivery is transacted
- *
- * @return The value
- */
- public boolean isDeliveryTransacted()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("isDeliveryTransacted()");
- }
-
- return _isDeliveryTransacted;
- }
-
- /**
- * Get the work manager
- *
- * @return The value
- */
- public WorkManager getWorkManager()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getWorkManager()");
- }
-
- return _ra.getWorkManager();
- }
-
- /**
- * Is the destination a topic
- *
- * @return The value
- */
- public boolean isTopic()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("isTopic()");
- }
-
- return _isTopic;
+ super(ra, spec, endpointFactory);
+
}
/**
@@ -267,70 +92,62 @@ public class QpidActivation implements ExceptionListener
*
* @throws Exception Thrown if an error occurs
*/
- protected synchronized void setup() throws Exception
+ public synchronized void setup() throws Exception
{
_log.debug("Setting up " + _spec);
- setupCF();
-
+ setupCF();
setupDestination();
- final AMQConnection amqConnection ;
- final boolean useLocalTx = _spec.isUseLocalTx() ;
- final boolean isXA = _isDeliveryTransacted && !useLocalTx ;
-
- if (isXA)
+
+ if(!_spec.isUseConnectionPerHandler())
{
- amqConnection = (XAConnectionImpl)_factory.createXAConnection() ;
+ setupConnection();
+ _connection.setExceptionListener(this);
}
- else
- {
- amqConnection = (AMQConnection)_factory.createConnection() ;
- }
-
- amqConnection.setExceptionListener(this) ;
-
+
for (int i = 0; i < _spec.getMaxSession(); i++)
{
- Session session = null;
-
- try
- {
- if (isXA)
- {
- session = _ra.createXASession((XAConnectionImpl)amqConnection) ;
- }
- else
- {
- session = _ra.createSession((AMQConnection)amqConnection,
- _spec.getAcknowledgeModeInt(),
- useLocalTx,
- _spec.getPrefetchLow(),
- _spec.getPrefetchHigh());
- }
-
- _log.debug("Using session " + Util.asString(session));
- QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session);
- handler.setup();
- _handlers.add(handler);
- }
- catch (Exception e)
- {
- try
- {
- amqConnection.close() ;
- }
- catch (Exception e2)
- {
- _log.trace("Ignored error closing connection", e2);
- }
-
- throw e;
- }
+ try
+ {
+ QpidMessageHandler handler = null;
+
+ if(_spec.isUseConnectionPerHandler())
+ {
+ handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM());
+ }
+ else
+ {
+ handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM(), _connection);
+ }
+
+ handler.start();
+ _handlers.add(handler);
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ if(_connection != null)
+ {
+ this._connection.close();
+ }
+ }
+ catch (Exception e2)
+ {
+ _log.trace("Ignored error closing connection", e2);
+ }
+
+ throw e;
+
+ }
+
+ }
+
+ if(!_spec.isUseConnectionPerHandler())
+ {
+ this._connection.start();
+ _activated.set(true);
}
- amqConnection.start() ;
- this._connection = amqConnection ;
- _activated.set(true);
-
_log.debug("Setup complete " + this);
}
@@ -340,136 +157,17 @@ public class QpidActivation implements ExceptionListener
protected synchronized void teardown()
{
_log.debug("Tearing down " + _spec);
-
- try
- {
- if (_connection != null)
- {
- _connection.stop();
- }
- }
- catch (Throwable t)
- {
- _log.debug("Error stopping connection " + Util.asString(_connection), t);
- }
+
+ super.teardown();
for (QpidMessageHandler handler : _handlers)
{
- handler.teardown();
+ handler.stop();
}
- try
- {
- if (_connection != null)
- {
- _connection.close();
- }
- }
- catch (Throwable t)
- {
- _log.debug("Error closing connection " + Util.asString(_connection), t);
- }
- if (_spec.isHasBeenUpdated())
- {
- _factory = null;
- }
_log.debug("Tearing down complete " + this);
}
- protected void setupCF() throws Exception
- {
- if (_spec.isHasBeenUpdated())
- {
- _factory = _ra.createAMQConnectionFactory(_spec);
- }
- else
- {
- _factory = _ra.getDefaultAMQConnectionFactory();
- }
- }
-
- public Destination getDestination()
- {
- return _destination;
- }
-
- protected void setupDestination() throws Exception
- {
-
- String destinationName = _spec.getDestination();
- String destinationTypeString = _spec.getDestinationType();
-
- if (_spec.isUseJNDI())
- {
- Context ctx = new InitialContext();
- _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
- if (_log.isTraceEnabled())
- {
- _log.trace("setupDestination(" + ctx + ")");
- }
-
- if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
- {
- _log.debug("Destination type defined as " + destinationTypeString);
-
- Class<? extends Destination> destinationType;
- if (Topic.class.getName().equals(destinationTypeString))
- {
- destinationType = Topic.class;
- _isTopic = true;
- }
- else
- {
- destinationType = Queue.class;
- }
-
- _log.debug("Retrieving destination " + destinationName +
- " of type " +
- destinationType.getName());
- _destination = Util.lookup(ctx, destinationName, destinationType);
-
- }
- else
- {
- _log.debug("Destination type not defined");
- _log.debug("Retrieving destination " + destinationName +
- " of type " +
- Destination.class.getName());
-
- _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
- _isTopic = !(_destination instanceof Queue) ;
- }
- }
- else
- {
- _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
- if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
- {
- _log.debug("Destination type defined as " + destinationTypeString);
- final boolean match ;
- if (Topic.class.getName().equals(destinationTypeString))
- {
- match = (_destination instanceof Topic) ;
- _isTopic = true;
- }
- else
- {
- match = (_destination instanceof Queue) ;
- }
- if (!match)
- {
- throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
- }
- }
- else
- {
- _isTopic = !(_destination instanceof Queue) ;
- }
- }
-
- _log.debug("Got destination " + _destination + " from " + destinationName);
- }
-
/**
* Get a string representation
*
@@ -492,94 +190,7 @@ public class QpidActivation implements ExceptionListener
return buffer.toString();
}
- public void onException(final JMSException jmse)
- {
- if(_activated.get())
- {
- handleFailure(jmse) ;
- }
- else
- {
- _log.warn("Received JMSException: " + jmse + " while endpoint was not activated.");
- }
- }
-
- /**
- * Handles any failure by trying to reconnect
- *
- * @param failure the reason for the failure
- */
- public void handleFailure(Throwable failure)
- {
- if(doesNotExist(failure))
- {
- _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
- }
- else
- {
- _log.warn("Failure in Qpid activation " + _spec, failure);
- }
- int reconnectCount = 0;
- int setupAttempts = _spec.getSetupAttempts();
- long setupInterval = _spec.getSetupInterval();
-
- // Only enter the failure loop once
- if (_inFailure.getAndSet(true))
- return;
- try
- {
- while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
- {
- teardown();
-
- try
- {
- Thread.sleep(setupInterval);
- }
- catch (InterruptedException e)
- {
- _log.debug("Interrupted trying to reconnect " + _spec, e);
- break;
- }
-
- _log.info("Attempting to reconnect " + _spec);
- try
- {
- setup();
- _log.info("Reconnected with Qpid");
- break;
- }
- catch (Throwable t)
- {
- if(doesNotExist(failure))
- {
- _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
- }
- else
- {
- _log.error("Unable to reconnect " + _spec, t);
- }
- }
- ++reconnectCount;
- }
- }
- finally
- {
- // Leaving failure recovery loop
- _inFailure.set(false);
- }
- }
-
- /**
- * Check to see if the failure represents a missing endpoint
- * @param failure The failure.
- * @return true if it represents a missing endpoint, false otherwise
- */
- private boolean doesNotExist(final Throwable failure)
- {
- return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
- }
-
+
/**
* Handles the setup
*/
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
index 5f4e2dcf6b..3d9a88adb5 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
@@ -28,10 +28,10 @@ import javax.resource.spi.ActivationSpec;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.qpid.ra.ConnectionFactoryProperties;
import org.apache.qpid.ra.QpidResourceAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The activation spec
@@ -88,6 +88,8 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
// undefined by default, default is specified at the RA level in QpidRAProperties
private Long _setupInterval;
+ private Boolean _useConnectionPerHandler;
+
/**
* Constructor
*/
@@ -544,6 +546,16 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
this._setupInterval = setupInterval;
}
+ public Boolean isUseConnectionPerHandler()
+ {
+ return (_useConnectionPerHandler == null) ? _ra.isUseConnectionPerHandler() : _useConnectionPerHandler;
+ }
+
+ public void setUseConnectionPerHandler(Boolean connectionPerHandler)
+ {
+ this._useConnectionPerHandler = connectionPerHandler;
+ }
+
/**
* Validate
* @exception InvalidPropertyException Thrown if a validation exception occurs
@@ -561,6 +573,7 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
}
}
+
/**
* Get a string representation
* @return The value
@@ -573,23 +586,30 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
buffer.append("ra=").append(_ra);
buffer.append(" destination=").append(_destination);
buffer.append(" destinationType=").append(_destinationType);
+
if (_messageSelector != null)
{
buffer.append(" selector=").append(_messageSelector);
}
+
buffer.append(" ack=").append(getAcknowledgeMode());
buffer.append(" durable=").append(_subscriptionDurability);
buffer.append(" clientID=").append(getClientId());
+
if (_subscriptionName != null)
{
buffer.append(" subscription=").append(_subscriptionName);
}
+
buffer.append(" user=").append(getUserName());
+
if (getPassword() != null)
{
- buffer.append(" password=").append("****");
+ buffer.append(" password=").append("********");
}
+
buffer.append(" maxSession=").append(_maxSession);
+
if (_prefetchLow != null)
{
buffer.append(" prefetchLow=").append(_prefetchLow);
@@ -598,7 +618,10 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
{
buffer.append(" prefetchHigh=").append(_prefetchHigh);
}
+
+ buffer.append(" connectionPerHandler=").append(isUseConnectionPerHandler());
buffer.append(')');
+
return buffer.toString();
}
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
new file mode 100644
index 0000000000..8775362eb5
--- /dev/null
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
@@ -0,0 +1,339 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.jms.XAConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ra.QpidResourceAdapter;
+import org.apache.qpid.ra.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class QpidExceptionHandler implements ExceptionListener
+{
+ private static final Logger _log = LoggerFactory.getLogger(QpidExceptionHandler.class);
+
+ public static final Method ONMESSAGE;
+
+ protected final MessageEndpointFactory _endpointFactory;
+
+ protected Connection _connection;
+
+ protected ConnectionFactory _factory;
+
+ protected Destination _destination;
+
+ protected final QpidResourceAdapter _ra;
+
+ protected final QpidActivationSpec _spec;
+
+ protected boolean _isDeliveryTransacted;
+
+ protected final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
+
+ protected boolean _isTopic = false;
+
+ // Whether we are in the failure recovery loop
+ protected AtomicBoolean _inFailure = new AtomicBoolean(false);
+
+ //Whether or not we have completed activating
+ protected AtomicBoolean _activated = new AtomicBoolean(false);
+
+ static
+ {
+ try
+ {
+ ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public abstract void setup() throws Exception;
+ public abstract void start() throws Exception;
+ public abstract void stop();
+
+ protected QpidExceptionHandler(QpidResourceAdapter ra,
+ QpidActivationSpec spec,
+ MessageEndpointFactory endpointFactory) throws ResourceException
+ {
+ this._ra = ra;
+ this._spec = spec;
+ this._endpointFactory = endpointFactory;
+
+ try
+ {
+ _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
+ }
+ catch (Exception e)
+ {
+ throw new ResourceException(e);
+ }
+
+
+ }
+
+ public void onException(JMSException e)
+ {
+ if(_activated.get())
+ {
+ handleFailure(e) ;
+ }
+ else
+ {
+ _log.warn("Received JMSException: " + e + " while endpoint was not activated.");
+ }
+ }
+
+ /**
+ * Handles any failure by trying to reconnect
+ *
+ * @param failure the reason for the failure
+ */
+ public void handleFailure(Throwable failure)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + _spec.getDestination());
+ }
+ else
+ {
+ _log.warn("Failure in Qpid activation " + _spec, failure);
+ }
+ int reconnectCount = 0;
+ int setupAttempts = _spec.getSetupAttempts();
+ long setupInterval = _spec.getSetupInterval();
+
+ // Only enter the failure loop once
+ if (_inFailure.getAndSet(true))
+ return;
+ try
+ {
+ while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
+ {
+ teardown();
+
+ try
+ {
+ Thread.sleep(setupInterval);
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted trying to reconnect " + _spec, e);
+ break;
+ }
+
+ _log.info("Attempting to reconnect " + _spec);
+ try
+ {
+ setup();
+ _log.info("Reconnected with Qpid");
+ break;
+ }
+ catch (Throwable t)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + _spec.getDestination());
+ }
+ else
+ {
+ _log.error("Unable to reconnect " + _spec, t);
+ }
+ }
+ ++reconnectCount;
+ }
+ }
+ finally
+ {
+ // Leaving failure recovery loop
+ _inFailure.set(false);
+ }
+ }
+
+ /**
+ * Check to see if the failure represents a missing endpoint
+ * @param failure The failure.
+ * @return true if it represents a missing endpoint, false otherwise
+ */
+ protected boolean doesNotExist(final Throwable failure)
+ {
+ return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
+ }
+
+ protected boolean isXA()
+ {
+ return _isDeliveryTransacted && !_spec.isUseLocalTx();
+ }
+
+ protected void setupConnection() throws Exception
+ {
+ this._connection = (isXA()) ? ((XAConnectionFactory)_factory).createXAConnection() : _factory.createConnection();
+ }
+
+ protected synchronized void teardown()
+ {
+ _log.debug("Tearing down " + _spec);
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.stop();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error stopping connection " + Util.asString(_connection), t);
+ }
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error closing connection " + Util.asString(_connection), t);
+ }
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = null;
+ }
+ _log.debug("Tearing down complete " + this);
+ }
+
+ protected void setupCF() throws Exception
+ {
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = _ra.createAMQConnectionFactory(_spec);
+ }
+ else
+ {
+ _factory = _ra.getDefaultAMQConnectionFactory();
+ }
+ }
+
+ protected void setupDestination() throws Exception
+ {
+
+ String destinationName = _spec.getDestination();
+ String destinationTypeString = _spec.getDestinationType();
+
+ if (_spec.isUseJNDI())
+ {
+ Context ctx = new InitialContext();
+ _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setupDestination(" + ctx + ")");
+ }
+
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+
+ Class<? extends Destination> destinationType;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ destinationType = Topic.class;
+ _isTopic = true;
+ }
+ else
+ {
+ destinationType = Queue.class;
+ }
+
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ destinationType.getName());
+ _destination = Util.lookup(ctx, destinationName, destinationType);
+
+ }
+ else
+ {
+ _log.debug("Destination type not defined");
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ Destination.class.getName());
+
+ _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+ else
+ {
+ _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
+
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+ final boolean match ;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ match = (_destination instanceof Topic) ;
+ _isTopic = true;
+ }
+ else
+ {
+ match = (_destination instanceof Queue) ;
+ }
+ if (!match)
+ {
+ throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
+ }
+ }
+ else
+ {
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+
+ _log.debug("Got destination " + _destination + " from " + destinationName);
+ }
+
+
+
+}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
index 473efab31f..a02adf0dad 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.ra.inflow;
+import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -35,6 +36,9 @@ import javax.transaction.Status;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.ra.QpidResourceAdapter;
import org.apache.qpid.ra.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,82 +47,100 @@ import org.slf4j.LoggerFactory;
* The message handler
*
*/
-public class QpidMessageHandler implements MessageListener
+public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener
{
- /**
- * The logger
- */
private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class);
- /**
- * The session
- */
- private final Session _session;
-
private MessageConsumer _consumer;
- /**
- * The endpoint
- */
private MessageEndpoint _endpoint;
- private final QpidActivation _activation;
-
- private boolean _useLocalTx;
-
- private boolean _transacted;
+ private Session _session;
private final TransactionManager _tm;
- public QpidMessageHandler(final QpidActivation activation,
- final TransactionManager tm,
- final Session session)
+ public QpidMessageHandler(final QpidResourceAdapter ra,
+ final QpidActivationSpec spec,
+ final MessageEndpointFactory endpointFactory,
+ final TransactionManager tm,
+ final Connection connection) throws ResourceException
{
- this._activation = activation;
- this._session = session;
+ super(ra, spec, endpointFactory);
this._tm = tm;
+ this._connection = connection;
}
-
+
+ public QpidMessageHandler(final QpidResourceAdapter ra,
+ final QpidActivationSpec spec,
+ final MessageEndpointFactory endpointFactory,
+ final TransactionManager tm) throws ResourceException
+ {
+ super(ra, spec, endpointFactory);
+ this._tm = tm;
+ }
+
public void setup() throws Exception
{
if (_log.isTraceEnabled())
{
_log.trace("setup()");
}
-
- QpidActivationSpec spec = _activation.getActivationSpec();
- String selector = spec.getMessageSelector();
-
+
+ setupCF();
+ setupDestination();
+ String selector = _spec.getMessageSelector();
+
+ if(_spec.isUseConnectionPerHandler())
+ {
+ setupConnection();
+ _connection.setExceptionListener(this);
+ }
+
+ if(isXA())
+ {
+ _session = _ra.createXASession((XAConnectionImpl)_connection);
+ }
+ else
+ {
+ _session = _ra.createSession((AMQConnection)_connection,
+ _spec.getAcknowledgeModeInt(),
+ _spec.isUseLocalTx(),
+ _spec.getPrefetchLow(),
+ _spec.getPrefetchHigh());
+ }
// Create the message consumer
- if (_activation.isTopic())
+ if (_isTopic)
{
- final Topic topic = (Topic) _activation.getDestination();
- final String subscriptionName = spec.getSubscriptionName();
- if (spec.isSubscriptionDurable())
- _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ final Topic topic = (Topic) _destination;
+ final String subscriptionName = _spec.getSubscriptionName();
+
+ if (_spec.isSubscriptionDurable())
+ {
+ _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ }
else
- _consumer = _session.createConsumer(topic, selector) ;
+ {
+ _consumer = _session.createConsumer(topic, selector) ;
+ }
}
else
{
- final Queue queue = (Queue) _activation.getDestination();
+ final Queue queue = (Queue) _destination;
_consumer = _session.createConsumer(queue, selector);
}
- // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
- MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory();
- _useLocalTx = _activation.getActivationSpec().isUseLocalTx();
- _transacted = _activation.isDeliveryTransacted() || _useLocalTx ;
- if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx())
+ if (isXA())
{
final XAResource xaResource = ((XASession)_session).getXAResource() ;
- _endpoint = endpointFactory.createEndpoint(xaResource);
+ _endpoint = _endpointFactory.createEndpoint(xaResource);
}
else
{
- _endpoint = endpointFactory.createEndpoint(null);
+ _endpoint = _endpointFactory.createEndpoint(null);
}
_consumer.setMessageListener(this);
+ _connection.start();
+ _activated.set(true);
}
/**
@@ -126,11 +148,13 @@ public class QpidMessageHandler implements MessageListener
*/
public void teardown()
{
- if (_log.isTraceEnabled())
- {
- _log.trace("teardown()");
- }
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("teardown()");
+ }
+ super.teardown();
+
try
{
if (_endpoint != null)
@@ -156,27 +180,28 @@ public class QpidMessageHandler implements MessageListener
try
{
- if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null)
+ if (_spec.getTransactionTimeout() > 0 && _tm != null)
{
- _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout());
+ _tm.setTransactionTimeout(_spec.getTransactionTimeout());
}
_endpoint.beforeDelivery(QpidActivation.ONMESSAGE);
beforeDelivery = true;
- if(_transacted)
+ if(isXA())
{
message.acknowledge();
}
((MessageListener)_endpoint).onMessage(message);
- if (_transacted && (_tm.getTransaction() != null))
+ if (isXA() && (_tm.getTransaction() != null))
{
final int status = _tm.getStatus() ;
final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK
|| status == Status.STATUS_ROLLING_BACK
|| status == Status.STATUS_ROLLEDBACK;
+
if (rollback)
{
_session.recover() ;
@@ -196,7 +221,7 @@ public class QpidMessageHandler implements MessageListener
_log.warn("Unable to call after delivery", e);
return;
}
- if (_useLocalTx)
+ if (!isXA() && _spec.isUseLocalTx())
{
_session.commit();
}
@@ -216,7 +241,7 @@ public class QpidMessageHandler implements MessageListener
_log.warn("Unable to call after delivery", e);
}
}
- if (_useLocalTx || !_activation.isDeliveryTransacted())
+ if (!isXA() && _spec.isUseLocalTx())
{
try
{
@@ -241,5 +266,17 @@ public class QpidMessageHandler implements MessageListener
}
}
+
+ public void start() throws Exception
+ {
+ _deliveryActive.set(true);
+ setup();
+ }
+
+ public void stop()
+ {
+ _deliveryActive.set(false);
+ teardown();
+ }
}
diff --git a/qpid/java/jca/src/main/resources/META-INF/ra.xml b/qpid/java/jca/src/main/resources/META-INF/ra.xml
index 01f5eceecf..a9374f52d7 100755
--- a/qpid/java/jca/src/main/resources/META-INF/ra.xml
+++ b/qpid/java/jca/src/main/resources/META-INF/ra.xml
@@ -109,6 +109,13 @@
<config-property-type>java.lang.String</config-property-type>
<config-property-value>amqp://anonymous:passwd@client/test?brokerlist='tcp://localhost?sasl_mechs='PLAIN''</config-property-value>
</config-property>
+
+ <config-property>
+ <description>Use a JMS Connection per MessageHandler</description>
+ <config-property-name>UseConnectionPerHandler</config-property-name>
+ <config-property-type>java.lang.Boolean</config-property-type>
+ <config-property-value>true</config-property-value>
+ </config-property>
<outbound-resourceadapter>
<connection-definition>