summaryrefslogtreecommitdiff
path: root/java/jca/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /java/jca/src
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/jca/src')
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java6
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java36
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java109
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java4
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java48
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java2
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java2
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java12
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java2
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java140
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java27
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java25
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java501
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java29
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java339
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java139
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/tm/GlassfishTransactionManagerLocator.java63
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java20
-rw-r--r--java/jca/src/main/java/org/apache/qpid/ra/tm/WLSTransactionManagerLocator.java64
-rwxr-xr-xjava/jca/src/main/resources/META-INF/ra.xml42
-rw-r--r--java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java45
-rw-r--r--java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java78
22 files changed, 943 insertions, 790 deletions
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java b/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java
index a7b36bc98c..3bddfd80a4 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java
@@ -34,7 +34,7 @@ public class ConnectionFactoryProperties
private boolean _hasBeenUpdated = false;
- private String _clientID;
+ private String _clientId;
private String _connectionURL;
@@ -56,7 +56,7 @@ public class ConnectionFactoryProperties
{
_log.trace("getClientID()");
}
- return _clientID;
+ return _clientId;
}
public void setClientId(final String clientID)
@@ -66,7 +66,7 @@ public class ConnectionFactoryProperties
_log.trace("setClientID(" + clientID + ")");
}
_hasBeenUpdated = true;
- this._clientID = clientID;
+ this._clientId = clientID;
}
public boolean isHasBeenUpdated()
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java
index c37a264ebc..779709839a 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java
@@ -45,7 +45,7 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo
private String _password;
/** The client id */
- private String _clientID;
+ private String _clientId;
/** The type */
private final int _type;
@@ -76,13 +76,13 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo
final ConnectionURL connectionURL = ra.getDefaultAMQConnectionFactory().getConnectionURL() ;
_userName = connectionURL.getUsername();
_password = connectionURL.getPassword();
- _clientID = connectionURL.getClientName();
+ _clientId = connectionURL.getClientName();
}
else
{
- _userName = ra.getDefaultUserName();
- _password = ra.getDefaultPassword();
- _clientID = ra.getClientId();
+ _userName = ra.getUserName();
+ _password = ra.getPassword();
+ _clientId = ra.getClientId();
}
this._type = type;
_transacted = true;
@@ -142,9 +142,9 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo
{
_password = connectionURL.getPassword();
}
- if (_clientID == null)
+ if (_clientId == null)
{
- _clientID = connectionURL.getClientName();
+ _clientId = connectionURL.getClientName();
}
}
@@ -170,15 +170,15 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo
{
if (_userName == null)
{
- _userName = ra.getDefaultUserName();
+ _userName = ra.getUserName();
}
if (_password == null)
{
- _password = ra.getDefaultPassword();
+ _password = ra.getPassword();
}
- if (_clientID == null)
+ if (_clientId == null)
{
- _clientID = ra.getClientId();
+ _clientId = ra.getClientId();
}
}
}
@@ -243,28 +243,28 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo
* Get the client id
* @return The value
*/
- public String getClientID()
+ public String getClientId()
{
if (_log.isTraceEnabled())
{
_log.trace("getClientID()");
}
- return _clientID;
+ return _clientId;
}
/**
* Set the client id
* @param clientID The value
*/
- public void setClientID(final String clientID)
+ public void setClientId(final String clientID)
{
if (_log.isTraceEnabled())
{
_log.trace("setClientID(" + clientID + ")");
}
- this._clientID = clientID;
+ this._clientId = clientID;
}
/**
@@ -321,7 +321,7 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo
{
QpidRAConnectionRequestInfo you = (QpidRAConnectionRequestInfo)obj;
return Util.compare(_userName, you.getUserName()) && Util.compare(_password, you.getPassword()) &&
- Util.compare(_clientID, you.getClientID()) &&
+ Util.compare(_clientId, you.getClientId()) &&
_type == you.getType() &&
_transacted == you.isTransacted() &&
_acknowledgeMode == you.getAcknowledgeMode();
@@ -343,7 +343,7 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo
hash += 31 * hash + (_userName != null ? _userName.hashCode() : 0);
hash += 31 * hash + (_password != null ? _password.hashCode() : 0);
- hash += 31 * hash + (_clientID != null ? _clientID.hashCode() : 0);
+ hash += 31 * hash + (_clientId != null ? _clientId.hashCode() : 0);
hash += 31 * hash + _type;
hash += 31 * hash + (_transacted ? 1 : 0);
hash += 31 * hash + _acknowledgeMode;
@@ -356,6 +356,6 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo
{
return "QpidRAConnectionRequestInfo[type=" + _type +
", transacted=" + _transacted + ", acknowledgeMode=" + _acknowledgeMode +
- ", clientID=" + _clientID + ", userName=" + _userName + ((_password != null) ? ", password=********]" :"]");
+ ", clientID=" + _clientId + ", userName=" + _userName + ((_password != null) ? ", password=********]" :"]");
}
}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java
index 53896d8872..eccf77aff2 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java
@@ -34,10 +34,11 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import javax.jms.QueueConnection;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
-import javax.jms.QueueConnection;
import javax.jms.TopicConnection;
+import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
@@ -260,7 +261,20 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList
}
catch (JMSException e)
{
- _log.debug("Error closing session " + this, e);
+ _log.debug("Error closing XASession " + this, e);
+ }
+
+ try
+ {
+ if(_session != null)
+ {
+ _session.close();
+ }
+
+ }
+ catch(JMSException e)
+ {
+ _log.error("Error closing Session " + this, e);
}
if (_connection != null)
@@ -585,7 +599,7 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList
*/
protected Session getSession() throws JMSException
{
- if(_xaSession != null && !_mcf.getUseLocalTx())
+ if(_xaSession != null && !_mcf.getUseLocalTx() && _inManagedTx)
{
if (_log.isTraceEnabled())
{
@@ -761,107 +775,44 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList
{
if (_userName != null && _password != null)
{
- if(!transacted)
- {
- _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password);
- }
- else
- {
- _connection = _mcf.getCleanAMQConnectionFactory().createTopicConnection(_userName, _password);
- }
+ _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password);
}
else
{
- if(!transacted)
- {
- _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection();
- }
- else
- {
- _connection = _mcf.getDefaultAMQConnectionFactory().createTopicConnection();
- }
+ _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection();
}
- if(!transacted)
- {
- _xaSession = ((XATopicConnection)_connection).createXATopicSession();
- }
- else
- {
- _session = ((TopicConnection)_connection).createTopicSession(transacted, acknowledgeMode);
- }
+ _xaSession = ((XATopicConnection)_connection).createXATopicSession();
+ _session = ((TopicConnection)_connection).createTopicSession(transacted, acknowledgeMode);
+
}
else if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION)
{
if (_userName != null && _password != null)
{
- if(!transacted)
- {
- _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password);
- }
- else
- {
- _connection = _mcf.getCleanAMQConnectionFactory().createQueueConnection(_userName, _password);
- }
+ _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password);
}
else
{
- if(!transacted)
- {
- _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection();
- }
- else
- {
- _connection = _mcf.getDefaultAMQConnectionFactory().createQueueConnection();
- }
+ _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection();
}
- if(!transacted)
- {
- _xaSession = ((XAQueueConnection)_connection).createXAQueueSession();
+ _xaSession = ((XAQueueConnection)_connection).createXAQueueSession();
+ _session = ((QueueConnection)_connection).createQueueSession(transacted, acknowledgeMode);
- }
- else
- {
- _session = ((QueueConnection)_connection).createQueueSession(transacted, acknowledgeMode);
-
- }
}
else
{
if (_userName != null && _password != null)
{
- if(!transacted)
- {
- _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password);
- }
- else
- {
- _connection = _mcf.getCleanAMQConnectionFactory().createConnection(_userName, _password);
- }
- }
- else
- {
- if(!transacted)
- {
- _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection();
- }
- else
- {
- _connection = _mcf.getDefaultAMQConnectionFactory().createConnection();
- }
- }
-
- if(!transacted)
- {
- _xaSession = ((XAQueueConnection)_connection).createXASession();
-
+ _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password);
}
else
{
- _session = ((QueueConnection)_connection).createSession(transacted, acknowledgeMode);
-
+ _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection();
}
+ _xaSession = ((XAConnection)_connection).createXASession();
+ _session = _connection.createSession(transacted, acknowledgeMode);
}
_connection.setExceptionListener(this);
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java
index 318485a7f2..8744a9deec 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java
@@ -365,12 +365,12 @@ public class QpidRAManagedConnectionFactory implements ManagedConnectionFactory,
_mcfProperties.setSessionDefaultType(type);
}
- public String getClientID()
+ public String getClientId()
{
return _mcfProperties.getClientId();
}
- public void setClientID(final String clientID)
+ public void setClientId(final String clientID)
{
_mcfProperties.setClientId(clientID);
}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
index 21f7d2574f..69320575b0 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
@@ -31,10 +31,8 @@ import org.slf4j.LoggerFactory;
*/
public class QpidRAProperties extends ConnectionFactoryProperties implements Serializable
{
- /** Serial version UID */
private static final long serialVersionUID = -4823893873707374791L;
- /** The logger */
private static final Logger _log = LoggerFactory.getLogger(QpidRAProperties.class);
private static final int DEFAULT_SETUP_ATTEMPTS = 10;
@@ -45,16 +43,14 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser
private long _setupInterval = DEFAULT_SETUP_INTERVAL;
- /** Use Local TX instead of XA */
- private Boolean _localTx = false;
-
/** Class used to locate the Transaction Manager. */
private String _transactionManagerLocatorClass ;
/** Method used to locate the TM */
private String _transactionManagerLocatorMethod ;
-
+ private boolean _useConnectionPerHandler = true;
+
/**
* Constructor
*/
@@ -66,34 +62,6 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser
}
}
- /**
- * Get the use XA flag
- * @return The value
- */
- public Boolean getUseLocalTx()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getUseLocalTx()");
- }
-
- return _localTx;
- }
-
- /**
- * Set the use XA flag
- * @param localTx The value
- */
- public void setUseLocalTx(final Boolean localTx)
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("setUseLocalTx(" + localTx + ")");
- }
-
- this._localTx = localTx;
- }
-
public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
{
if (_log.isTraceEnabled())
@@ -174,10 +142,20 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser
this._setupInterval = setupInterval;
}
+ public boolean isUseConnectionPerHandler()
+ {
+ return _useConnectionPerHandler;
+ }
+
+ public void setUseConnectionPerHandler(boolean connectionPerHandler)
+ {
+ this._useConnectionPerHandler = connectionPerHandler;
+ }
+
@Override
public String toString()
{
- return "QpidRAProperties[localTx=" + _localTx +
+ return "QpidRAProperties[" +
", transactionManagerLocatorClass=" + _transactionManagerLocatorClass +
", transactionManagerLocatorMethod=" + _transactionManagerLocatorMethod +
", setupAttempts=" + _setupAttempts +
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java
index 081677ca4b..a72f51da51 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java
@@ -30,4 +30,6 @@ public interface QpidRASession
public void start() throws JMSException;
public void close() throws JMSException;
+
+ public void closeSession() throws JMSException;
}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java
index cf28d5bba1..2747282a3c 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java
@@ -58,5 +58,5 @@ public interface QpidRASessionFactory extends Connection, TopicConnection, Queue
* @param session The session
* @throws JMSException for any error
*/
- void closeSession(QpidRASessionImpl session) throws JMSException;
+ void closeSession(QpidRASession session) throws JMSException;
}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java
index e2bc2d2008..f3253e1400 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java
@@ -567,7 +567,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference
_started = true;
for (Iterator<QpidRASession> i = _sessions.iterator(); i.hasNext();)
{
- QpidRASessionImpl session = (QpidRASessionImpl)i.next();
+ QpidRASession session = (QpidRASession)i.next();
session.start();
}
}
@@ -609,7 +609,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference
{
for (Iterator<QpidRASession> i = _sessions.iterator(); i.hasNext();)
{
- QpidRASessionImpl session = (QpidRASessionImpl)i.next();
+ QpidRASession session = (QpidRASession)i.next();
try
{
session.closeSession();
@@ -670,7 +670,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference
* @param session The session
* @exception JMSException Thrown if an error occurs
*/
- public void closeSession(final QpidRASessionImpl session) throws JMSException
+ public void closeSession(final QpidRASession session) throws JMSException
{
if (_log.isTraceEnabled())
{
@@ -679,7 +679,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference
synchronized (_sessions)
{
- _sessions.remove(session);
+ _sessions.clear();
}
}
@@ -742,7 +742,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference
QpidRAConnectionRequestInfo info = new QpidRAConnectionRequestInfo(sessionType);
info.setUserName(_userName);
info.setPassword(_password);
- info.setClientID(_clientID);
+ info.setClientId(_clientID);
info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL());
if (_log.isTraceEnabled())
@@ -839,7 +839,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference
sessionType);
info.setUserName(_userName);
info.setPassword(_password);
- info.setClientID(_clientID);
+ info.setClientId(_clientID);
info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL());
if (_log.isTraceEnabled())
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java
index fdd4888a3d..c4cfeaba48 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java
@@ -1446,7 +1446,7 @@ public class QpidRASessionImpl implements Session, QueueSession, TopicSession, X
* Close session
* @exception JMSException Thrown if an error occurs
*/
- void closeSession() throws JMSException
+ public void closeSession() throws JMSException
{
final QpidRAManagedConnection mc = this._mc;
if (mc != null)
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
index 363af1bbcd..96fa83ceef 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
@@ -38,8 +38,6 @@ import javax.resource.spi.work.WorkManager;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
@@ -47,6 +45,8 @@ import org.apache.qpid.client.XAConnectionImpl;
import org.apache.qpid.ra.inflow.QpidActivation;
import org.apache.qpid.ra.inflow.QpidActivationSpec;
import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The resource adapter for Qpid
@@ -54,43 +54,22 @@ import org.apache.qpid.url.URLSyntaxException;
*/
public class QpidResourceAdapter implements ResourceAdapter, Serializable
{
- /**
- *
- */
private static final long serialVersionUID = -2446231446818098726L;
- /**
- * The logger
- */
- private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
+ private static final transient Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
- /**
- * The bootstrap context
- */
private BootstrapContext _ctx;
- /**
- * The resource adapter properties
- */
private final QpidRAProperties _raProperties;
- /**
- * Have the factory been configured
- */
private final AtomicBoolean _configured;
- /**
- * The activations by activation spec
- */
private final Map<ActivationSpec, QpidActivation> _activations;
private AMQConnectionFactory _defaultAMQConnectionFactory;
private TransactionManager _tm;
- /**
- * Constructor
- */
public QpidResourceAdapter()
{
if (_log.isTraceEnabled())
@@ -223,65 +202,6 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
_log.info("Qpid resource adapter stopped");
}
- /**
- * Get the user name
- *
- * @return The value
- */
- public String getDefaultUserName()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getUserName()");
- }
-
- return _raProperties.getUserName();
- }
-
- /**
- * Set the user name
- *
- * @param userName The value
- */
- public void setDefaultUserName(final String userName)
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("setUserName(" + userName + ")");
- }
-
- _raProperties.setUserName(userName);
- }
-
- /**
- * Get the password
- *
- * @return The value
- */
- public String getDefaultPassword()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getPassword()");
- }
-
- return _raProperties.getPassword();
- }
-
- /**
- * Set the password
- *
- * @param password The value
- */
- public void setDefaultPassword(final String password)
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("setPassword(****)");
- }
-
- _raProperties.setPassword(password);
- }
/**
* Get the client ID
@@ -403,6 +323,26 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
_raProperties.setPath(path);
}
+ public String getUserName()
+ {
+ return _raProperties.getUserName();
+ }
+
+ public void setUserName(String userName)
+ {
+ _raProperties.setUserName(userName);
+ }
+
+ public String getPassword()
+ {
+ return _raProperties.getPassword();
+ }
+
+ public void setPassword(String password)
+ {
+ _raProperties.setPassword(password);
+ }
+
/**
* Get the connection url
*
@@ -493,14 +433,14 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
*
* @return The value
*/
- public Boolean getUseLocalTx()
+ public Boolean isUseLocalTx()
{
if (_log.isTraceEnabled())
{
_log.trace("getUseLocalTx()");
}
- return _raProperties.getUseLocalTx();
+ return _raProperties.isUseLocalTx();
}
/**
@@ -553,7 +493,27 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
}
_raProperties.setSetupInterval(interval);
}
+
+ public Boolean isUseConnectionPerHandler()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isConnectionPerHandler()");
+ }
+
+ return _raProperties.isUseConnectionPerHandler();
+ }
+ public void setUseConnectionPerHandler(Boolean connectionPerHandler)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setConnectionPerHandler(" + connectionPerHandler + ")");
+ }
+
+ _raProperties.setUseConnectionPerHandler(connectionPerHandler);
+ }
+
/**
* Indicates whether some other object is "equal to" this one.
*
@@ -720,9 +680,10 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
return map;
}
- private void locateTM()
+ private void locateTM() throws ResourceAdapterInternalException
{
- if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null)
+ if(_raProperties.getTransactionManagerLocatorClass() != null
+ && _raProperties.getTransactionManagerLocatorMethod() != null)
{
String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";");
@@ -742,8 +703,8 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
if (_tm == null)
{
- _log.warn("It wasn't possible to lookup a Transaction Manager through the configured properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
- _log.warn("Qpid Resource Adapter won't be able to set and verify transaction timeouts in certain cases.");
+ _log.error("It was not possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
+ throw new ResourceAdapterInternalException("Could not locate javax.transaction.TransactionManager");
}
else
{
@@ -802,6 +763,7 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
final String client = (clientID != null ? clientID : "") ;
final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ;
+
if (_log.isDebugEnabled())
{
_log.debug("Initialising connectionURL to " + newurl) ;
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java b/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java
index 41242fefae..a948948d6a 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java
@@ -77,17 +77,17 @@ public class QpidConnectionFactoryProxy implements Externalizable, Referenceable
public void writeExternal(ObjectOutput out) throws IOException
{
- if (_delegate == null)
- {
- _log.error("Null Destination ");
- throw new IOException("Null ConnectionFactory!");
- }
try
{
+ if(_delegate == null)
+ {
+ getReference();
+ }
+
out.writeObject(((Referenceable) _delegate).getReference());
}
- catch (NamingException e)
+ catch (Exception e)
{
_log.error("Failed to dereference ConnectionFactory " + e.getMessage(), e);
throw new IOException("Failed to dereference ConnectionFactory: " + e.getMessage());
@@ -137,7 +137,20 @@ public class QpidConnectionFactoryProxy implements Externalizable, Referenceable
*/
public Connection createConnection() throws JMSException
{
- return _delegate.createConnection();
+ try
+ {
+ if(_delegate == null)
+ {
+ getReference();
+ }
+
+ return _delegate.createConnection();
+ }
+ catch(Exception e)
+ {
+ throw new JMSException(e.getMessage());
+ }
+
}
/**
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java b/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
index 417849fc5c..162099d1ee 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
@@ -21,25 +21,15 @@
package org.apache.qpid.ra.admin;
import java.io.Externalizable;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
-import java.io.ObjectInputStream;
import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Hashtable;
-import javax.naming.Context;
-import javax.naming.Name;
import javax.naming.NamingException;
-import javax.naming.RefAddr;
import javax.naming.Reference;
import javax.naming.StringRefAddr;
-import javax.naming.spi.ObjectFactory;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.ra.admin.AdminObjectFactory;
public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable
{
@@ -101,19 +91,4 @@ public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable
out.writeObject(this._url);
}
- //TODO move to tests
- public static void main(String[] args) throws Exception
- {
- QpidQueueImpl q = new QpidQueueImpl();
- q.setDestinationAddress("hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}");
- ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("queue.out"));
- os.writeObject(q);
- os.close();
-
-
- ObjectInputStream is = new ObjectInputStream(new FileInputStream("queue.out"));
- q = (QpidQueueImpl)is.readObject();
- System.out.println(q);
-
- }
}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
index 57edec8eee..2327512a62 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
@@ -20,114 +20,28 @@
*/
package org.apache.qpid.ra.inflow;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
+import org.apache.qpid.ra.QpidResourceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.XAConnectionImpl;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.ra.QpidResourceAdapter;
-import org.apache.qpid.ra.Util;
-
/**
* The activation.
*
*/
-public class QpidActivation implements ExceptionListener
+public class QpidActivation extends QpidExceptionHandler
{
- /**
- * The logger
- */
private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class);
- /**
- * The onMessage method
- */
- public static final Method ONMESSAGE;
-
- /**
- * The resource adapter
- */
- private final QpidResourceAdapter _ra;
-
- /**
- * The activation spec
- */
- private final QpidActivationSpec _spec;
-
- /**
- * The message endpoint factory
- */
- private final MessageEndpointFactory _endpointFactory;
-
- /**
- * Whether delivery is active
- */
- private final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
-
- /**
- * The destination type
- */
- private boolean _isTopic = false;
-
- /**
- * Is the delivery transacted
- */
- private boolean _isDeliveryTransacted;
-
- private Destination _destination;
-
- /**
- * The connection
- */
- private Connection _connection;
-
private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>();
- private AMQConnectionFactory _factory;
-
- // Whether we are in the failure recovery loop
- private AtomicBoolean _inFailure = new AtomicBoolean(false);
-
- //Whether or not we have completed activating
- private AtomicBoolean _activated = new AtomicBoolean(false);
-
- static
- {
- try
- {
- ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
+
/**
* Constructor
*
@@ -140,97 +54,8 @@ public class QpidActivation implements ExceptionListener
final MessageEndpointFactory endpointFactory,
final QpidActivationSpec spec) throws ResourceException
{
- if (_log.isTraceEnabled())
- {
- _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
- }
-
- this._ra = ra;
- this._endpointFactory = endpointFactory;
- this._spec = spec;
- try
- {
- _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
- }
- catch (Exception e)
- {
- throw new ResourceException(e);
- }
- }
-
- /**
- * Get the activation spec
- *
- * @return The value
- */
- public QpidActivationSpec getActivationSpec()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getActivationSpec()");
- }
-
- return _spec;
- }
-
- /**
- * Get the message endpoint factory
- *
- * @return The value
- */
- public MessageEndpointFactory getMessageEndpointFactory()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getMessageEndpointFactory()");
- }
-
- return _endpointFactory;
- }
-
- /**
- * Get whether delivery is transacted
- *
- * @return The value
- */
- public boolean isDeliveryTransacted()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("isDeliveryTransacted()");
- }
-
- return _isDeliveryTransacted;
- }
-
- /**
- * Get the work manager
- *
- * @return The value
- */
- public WorkManager getWorkManager()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getWorkManager()");
- }
-
- return _ra.getWorkManager();
- }
-
- /**
- * Is the destination a topic
- *
- * @return The value
- */
- public boolean isTopic()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("isTopic()");
- }
-
- return _isTopic;
+ super(ra, spec, endpointFactory);
+
}
/**
@@ -267,70 +92,62 @@ public class QpidActivation implements ExceptionListener
*
* @throws Exception Thrown if an error occurs
*/
- protected synchronized void setup() throws Exception
+ public synchronized void setup() throws Exception
{
_log.debug("Setting up " + _spec);
- setupCF();
-
+ setupCF();
setupDestination();
- final AMQConnection amqConnection ;
- final boolean useLocalTx = _spec.isUseLocalTx() ;
- final boolean isXA = _isDeliveryTransacted && !useLocalTx ;
-
- if (isXA)
+
+ if(!_spec.isUseConnectionPerHandler())
{
- amqConnection = (XAConnectionImpl)_factory.createXAConnection() ;
+ setupConnection();
+ _connection.setExceptionListener(this);
}
- else
- {
- amqConnection = (AMQConnection)_factory.createConnection() ;
- }
-
- amqConnection.setExceptionListener(this) ;
-
+
for (int i = 0; i < _spec.getMaxSession(); i++)
{
- Session session = null;
-
- try
- {
- if (isXA)
- {
- session = _ra.createXASession((XAConnectionImpl)amqConnection) ;
- }
- else
- {
- session = _ra.createSession((AMQConnection)amqConnection,
- _spec.getAcknowledgeModeInt(),
- useLocalTx,
- _spec.getPrefetchLow(),
- _spec.getPrefetchHigh());
- }
-
- _log.debug("Using session " + Util.asString(session));
- QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session);
- handler.setup();
- _handlers.add(handler);
- }
- catch (Exception e)
- {
- try
- {
- amqConnection.close() ;
- }
- catch (Exception e2)
- {
- _log.trace("Ignored error closing connection", e2);
- }
-
- throw e;
- }
+ try
+ {
+ QpidMessageHandler handler = null;
+
+ if(_spec.isUseConnectionPerHandler())
+ {
+ handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM());
+ }
+ else
+ {
+ handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM(), _connection);
+ }
+
+ handler.start();
+ _handlers.add(handler);
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ if(_connection != null)
+ {
+ this._connection.close();
+ }
+ }
+ catch (Exception e2)
+ {
+ _log.trace("Ignored error closing connection", e2);
+ }
+
+ throw e;
+
+ }
+
+ }
+
+ if(!_spec.isUseConnectionPerHandler())
+ {
+ this._connection.start();
+ _activated.set(true);
}
- amqConnection.start() ;
- this._connection = amqConnection ;
- _activated.set(true);
-
_log.debug("Setup complete " + this);
}
@@ -340,136 +157,17 @@ public class QpidActivation implements ExceptionListener
protected synchronized void teardown()
{
_log.debug("Tearing down " + _spec);
-
- try
- {
- if (_connection != null)
- {
- _connection.stop();
- }
- }
- catch (Throwable t)
- {
- _log.debug("Error stopping connection " + Util.asString(_connection), t);
- }
+
+ super.teardown();
for (QpidMessageHandler handler : _handlers)
{
- handler.teardown();
+ handler.stop();
}
- try
- {
- if (_connection != null)
- {
- _connection.close();
- }
- }
- catch (Throwable t)
- {
- _log.debug("Error closing connection " + Util.asString(_connection), t);
- }
- if (_spec.isHasBeenUpdated())
- {
- _factory = null;
- }
_log.debug("Tearing down complete " + this);
}
- protected void setupCF() throws Exception
- {
- if (_spec.isHasBeenUpdated())
- {
- _factory = _ra.createAMQConnectionFactory(_spec);
- }
- else
- {
- _factory = _ra.getDefaultAMQConnectionFactory();
- }
- }
-
- public Destination getDestination()
- {
- return _destination;
- }
-
- protected void setupDestination() throws Exception
- {
-
- String destinationName = _spec.getDestination();
- String destinationTypeString = _spec.getDestinationType();
-
- if (_spec.isUseJNDI())
- {
- Context ctx = new InitialContext();
- _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
- if (_log.isTraceEnabled())
- {
- _log.trace("setupDestination(" + ctx + ")");
- }
-
- if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
- {
- _log.debug("Destination type defined as " + destinationTypeString);
-
- Class<? extends Destination> destinationType;
- if (Topic.class.getName().equals(destinationTypeString))
- {
- destinationType = Topic.class;
- _isTopic = true;
- }
- else
- {
- destinationType = Queue.class;
- }
-
- _log.debug("Retrieving destination " + destinationName +
- " of type " +
- destinationType.getName());
- _destination = Util.lookup(ctx, destinationName, destinationType);
-
- }
- else
- {
- _log.debug("Destination type not defined");
- _log.debug("Retrieving destination " + destinationName +
- " of type " +
- Destination.class.getName());
-
- _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
- _isTopic = !(_destination instanceof Queue) ;
- }
- }
- else
- {
- _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
- if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
- {
- _log.debug("Destination type defined as " + destinationTypeString);
- final boolean match ;
- if (Topic.class.getName().equals(destinationTypeString))
- {
- match = (_destination instanceof Topic) ;
- _isTopic = true;
- }
- else
- {
- match = (_destination instanceof Queue) ;
- }
- if (!match)
- {
- throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
- }
- }
- else
- {
- _isTopic = !(_destination instanceof Queue) ;
- }
- }
-
- _log.debug("Got destination " + _destination + " from " + destinationName);
- }
-
/**
* Get a string representation
*
@@ -492,94 +190,7 @@ public class QpidActivation implements ExceptionListener
return buffer.toString();
}
- public void onException(final JMSException jmse)
- {
- if(_activated.get())
- {
- handleFailure(jmse) ;
- }
- else
- {
- _log.warn("Received JMSException: " + jmse + " while endpoint was not activated.");
- }
- }
-
- /**
- * Handles any failure by trying to reconnect
- *
- * @param failure the reason for the failure
- */
- public void handleFailure(Throwable failure)
- {
- if(doesNotExist(failure))
- {
- _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
- }
- else
- {
- _log.warn("Failure in Qpid activation " + _spec, failure);
- }
- int reconnectCount = 0;
- int setupAttempts = _spec.getSetupAttempts();
- long setupInterval = _spec.getSetupInterval();
-
- // Only enter the failure loop once
- if (_inFailure.getAndSet(true))
- return;
- try
- {
- while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
- {
- teardown();
-
- try
- {
- Thread.sleep(setupInterval);
- }
- catch (InterruptedException e)
- {
- _log.debug("Interrupted trying to reconnect " + _spec, e);
- break;
- }
-
- _log.info("Attempting to reconnect " + _spec);
- try
- {
- setup();
- _log.info("Reconnected with Qpid");
- break;
- }
- catch (Throwable t)
- {
- if(doesNotExist(failure))
- {
- _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
- }
- else
- {
- _log.error("Unable to reconnect " + _spec, t);
- }
- }
- ++reconnectCount;
- }
- }
- finally
- {
- // Leaving failure recovery loop
- _inFailure.set(false);
- }
- }
-
- /**
- * Check to see if the failure represents a missing endpoint
- * @param failure The failure.
- * @return true if it represents a missing endpoint, false otherwise
- */
- private boolean doesNotExist(final Throwable failure)
- {
- return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
- }
-
+
/**
* Handles the setup
*/
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
index 5f4e2dcf6b..3d9a88adb5 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
@@ -28,10 +28,10 @@ import javax.resource.spi.ActivationSpec;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.qpid.ra.ConnectionFactoryProperties;
import org.apache.qpid.ra.QpidResourceAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The activation spec
@@ -88,6 +88,8 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
// undefined by default, default is specified at the RA level in QpidRAProperties
private Long _setupInterval;
+ private Boolean _useConnectionPerHandler;
+
/**
* Constructor
*/
@@ -544,6 +546,16 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
this._setupInterval = setupInterval;
}
+ public Boolean isUseConnectionPerHandler()
+ {
+ return (_useConnectionPerHandler == null) ? _ra.isUseConnectionPerHandler() : _useConnectionPerHandler;
+ }
+
+ public void setUseConnectionPerHandler(Boolean connectionPerHandler)
+ {
+ this._useConnectionPerHandler = connectionPerHandler;
+ }
+
/**
* Validate
* @exception InvalidPropertyException Thrown if a validation exception occurs
@@ -561,6 +573,7 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
}
}
+
/**
* Get a string representation
* @return The value
@@ -573,23 +586,30 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
buffer.append("ra=").append(_ra);
buffer.append(" destination=").append(_destination);
buffer.append(" destinationType=").append(_destinationType);
+
if (_messageSelector != null)
{
buffer.append(" selector=").append(_messageSelector);
}
+
buffer.append(" ack=").append(getAcknowledgeMode());
buffer.append(" durable=").append(_subscriptionDurability);
buffer.append(" clientID=").append(getClientId());
+
if (_subscriptionName != null)
{
buffer.append(" subscription=").append(_subscriptionName);
}
+
buffer.append(" user=").append(getUserName());
+
if (getPassword() != null)
{
- buffer.append(" password=").append("****");
+ buffer.append(" password=").append("********");
}
+
buffer.append(" maxSession=").append(_maxSession);
+
if (_prefetchLow != null)
{
buffer.append(" prefetchLow=").append(_prefetchLow);
@@ -598,7 +618,10 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
{
buffer.append(" prefetchHigh=").append(_prefetchHigh);
}
+
+ buffer.append(" connectionPerHandler=").append(isUseConnectionPerHandler());
buffer.append(')');
+
return buffer.toString();
}
}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
new file mode 100644
index 0000000000..8775362eb5
--- /dev/null
+++ b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
@@ -0,0 +1,339 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.jms.XAConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ra.QpidResourceAdapter;
+import org.apache.qpid.ra.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class QpidExceptionHandler implements ExceptionListener
+{
+ private static final Logger _log = LoggerFactory.getLogger(QpidExceptionHandler.class);
+
+ public static final Method ONMESSAGE;
+
+ protected final MessageEndpointFactory _endpointFactory;
+
+ protected Connection _connection;
+
+ protected ConnectionFactory _factory;
+
+ protected Destination _destination;
+
+ protected final QpidResourceAdapter _ra;
+
+ protected final QpidActivationSpec _spec;
+
+ protected boolean _isDeliveryTransacted;
+
+ protected final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
+
+ protected boolean _isTopic = false;
+
+ // Whether we are in the failure recovery loop
+ protected AtomicBoolean _inFailure = new AtomicBoolean(false);
+
+ //Whether or not we have completed activating
+ protected AtomicBoolean _activated = new AtomicBoolean(false);
+
+ static
+ {
+ try
+ {
+ ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public abstract void setup() throws Exception;
+ public abstract void start() throws Exception;
+ public abstract void stop();
+
+ protected QpidExceptionHandler(QpidResourceAdapter ra,
+ QpidActivationSpec spec,
+ MessageEndpointFactory endpointFactory) throws ResourceException
+ {
+ this._ra = ra;
+ this._spec = spec;
+ this._endpointFactory = endpointFactory;
+
+ try
+ {
+ _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
+ }
+ catch (Exception e)
+ {
+ throw new ResourceException(e);
+ }
+
+
+ }
+
+ public void onException(JMSException e)
+ {
+ if(_activated.get())
+ {
+ handleFailure(e) ;
+ }
+ else
+ {
+ _log.warn("Received JMSException: " + e + " while endpoint was not activated.");
+ }
+ }
+
+ /**
+ * Handles any failure by trying to reconnect
+ *
+ * @param failure the reason for the failure
+ */
+ public void handleFailure(Throwable failure)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + _spec.getDestination());
+ }
+ else
+ {
+ _log.warn("Failure in Qpid activation " + _spec, failure);
+ }
+ int reconnectCount = 0;
+ int setupAttempts = _spec.getSetupAttempts();
+ long setupInterval = _spec.getSetupInterval();
+
+ // Only enter the failure loop once
+ if (_inFailure.getAndSet(true))
+ return;
+ try
+ {
+ while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
+ {
+ teardown();
+
+ try
+ {
+ Thread.sleep(setupInterval);
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted trying to reconnect " + _spec, e);
+ break;
+ }
+
+ _log.info("Attempting to reconnect " + _spec);
+ try
+ {
+ setup();
+ _log.info("Reconnected with Qpid");
+ break;
+ }
+ catch (Throwable t)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + _spec.getDestination());
+ }
+ else
+ {
+ _log.error("Unable to reconnect " + _spec, t);
+ }
+ }
+ ++reconnectCount;
+ }
+ }
+ finally
+ {
+ // Leaving failure recovery loop
+ _inFailure.set(false);
+ }
+ }
+
+ /**
+ * Check to see if the failure represents a missing endpoint
+ * @param failure The failure.
+ * @return true if it represents a missing endpoint, false otherwise
+ */
+ protected boolean doesNotExist(final Throwable failure)
+ {
+ return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
+ }
+
+ protected boolean isXA()
+ {
+ return _isDeliveryTransacted && !_spec.isUseLocalTx();
+ }
+
+ protected void setupConnection() throws Exception
+ {
+ this._connection = (isXA()) ? ((XAConnectionFactory)_factory).createXAConnection() : _factory.createConnection();
+ }
+
+ protected synchronized void teardown()
+ {
+ _log.debug("Tearing down " + _spec);
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.stop();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error stopping connection " + Util.asString(_connection), t);
+ }
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error closing connection " + Util.asString(_connection), t);
+ }
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = null;
+ }
+ _log.debug("Tearing down complete " + this);
+ }
+
+ protected void setupCF() throws Exception
+ {
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = _ra.createAMQConnectionFactory(_spec);
+ }
+ else
+ {
+ _factory = _ra.getDefaultAMQConnectionFactory();
+ }
+ }
+
+ protected void setupDestination() throws Exception
+ {
+
+ String destinationName = _spec.getDestination();
+ String destinationTypeString = _spec.getDestinationType();
+
+ if (_spec.isUseJNDI())
+ {
+ Context ctx = new InitialContext();
+ _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setupDestination(" + ctx + ")");
+ }
+
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+
+ Class<? extends Destination> destinationType;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ destinationType = Topic.class;
+ _isTopic = true;
+ }
+ else
+ {
+ destinationType = Queue.class;
+ }
+
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ destinationType.getName());
+ _destination = Util.lookup(ctx, destinationName, destinationType);
+
+ }
+ else
+ {
+ _log.debug("Destination type not defined");
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ Destination.class.getName());
+
+ _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+ else
+ {
+ _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
+
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+ final boolean match ;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ match = (_destination instanceof Topic) ;
+ _isTopic = true;
+ }
+ else
+ {
+ match = (_destination instanceof Queue) ;
+ }
+ if (!match)
+ {
+ throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
+ }
+ }
+ else
+ {
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+
+ _log.debug("Got destination " + _destination + " from " + destinationName);
+ }
+
+
+
+}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
index 473efab31f..a02adf0dad 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.ra.inflow;
+import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -35,6 +36,9 @@ import javax.transaction.Status;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.ra.QpidResourceAdapter;
import org.apache.qpid.ra.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,82 +47,100 @@ import org.slf4j.LoggerFactory;
* The message handler
*
*/
-public class QpidMessageHandler implements MessageListener
+public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener
{
- /**
- * The logger
- */
private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class);
- /**
- * The session
- */
- private final Session _session;
-
private MessageConsumer _consumer;
- /**
- * The endpoint
- */
private MessageEndpoint _endpoint;
- private final QpidActivation _activation;
-
- private boolean _useLocalTx;
-
- private boolean _transacted;
+ private Session _session;
private final TransactionManager _tm;
- public QpidMessageHandler(final QpidActivation activation,
- final TransactionManager tm,
- final Session session)
+ public QpidMessageHandler(final QpidResourceAdapter ra,
+ final QpidActivationSpec spec,
+ final MessageEndpointFactory endpointFactory,
+ final TransactionManager tm,
+ final Connection connection) throws ResourceException
{
- this._activation = activation;
- this._session = session;
+ super(ra, spec, endpointFactory);
this._tm = tm;
+ this._connection = connection;
}
-
+
+ public QpidMessageHandler(final QpidResourceAdapter ra,
+ final QpidActivationSpec spec,
+ final MessageEndpointFactory endpointFactory,
+ final TransactionManager tm) throws ResourceException
+ {
+ super(ra, spec, endpointFactory);
+ this._tm = tm;
+ }
+
public void setup() throws Exception
{
if (_log.isTraceEnabled())
{
_log.trace("setup()");
}
-
- QpidActivationSpec spec = _activation.getActivationSpec();
- String selector = spec.getMessageSelector();
-
+
+ setupCF();
+ setupDestination();
+ String selector = _spec.getMessageSelector();
+
+ if(_spec.isUseConnectionPerHandler())
+ {
+ setupConnection();
+ _connection.setExceptionListener(this);
+ }
+
+ if(isXA())
+ {
+ _session = _ra.createXASession((XAConnectionImpl)_connection);
+ }
+ else
+ {
+ _session = _ra.createSession((AMQConnection)_connection,
+ _spec.getAcknowledgeModeInt(),
+ _spec.isUseLocalTx(),
+ _spec.getPrefetchLow(),
+ _spec.getPrefetchHigh());
+ }
// Create the message consumer
- if (_activation.isTopic())
+ if (_isTopic)
{
- final Topic topic = (Topic) _activation.getDestination();
- final String subscriptionName = spec.getSubscriptionName();
- if (spec.isSubscriptionDurable())
- _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ final Topic topic = (Topic) _destination;
+ final String subscriptionName = _spec.getSubscriptionName();
+
+ if (_spec.isSubscriptionDurable())
+ {
+ _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ }
else
- _consumer = _session.createConsumer(topic, selector) ;
+ {
+ _consumer = _session.createConsumer(topic, selector) ;
+ }
}
else
{
- final Queue queue = (Queue) _activation.getDestination();
+ final Queue queue = (Queue) _destination;
_consumer = _session.createConsumer(queue, selector);
}
- // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
- MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory();
- _useLocalTx = _activation.getActivationSpec().isUseLocalTx();
- _transacted = _activation.isDeliveryTransacted() || _useLocalTx ;
- if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx())
+ if (isXA())
{
final XAResource xaResource = ((XASession)_session).getXAResource() ;
- _endpoint = endpointFactory.createEndpoint(xaResource);
+ _endpoint = _endpointFactory.createEndpoint(xaResource);
}
else
{
- _endpoint = endpointFactory.createEndpoint(null);
+ _endpoint = _endpointFactory.createEndpoint(null);
}
_consumer.setMessageListener(this);
+ _connection.start();
+ _activated.set(true);
}
/**
@@ -126,11 +148,13 @@ public class QpidMessageHandler implements MessageListener
*/
public void teardown()
{
- if (_log.isTraceEnabled())
- {
- _log.trace("teardown()");
- }
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("teardown()");
+ }
+ super.teardown();
+
try
{
if (_endpoint != null)
@@ -156,27 +180,28 @@ public class QpidMessageHandler implements MessageListener
try
{
- if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null)
+ if (_spec.getTransactionTimeout() > 0 && _tm != null)
{
- _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout());
+ _tm.setTransactionTimeout(_spec.getTransactionTimeout());
}
_endpoint.beforeDelivery(QpidActivation.ONMESSAGE);
beforeDelivery = true;
- if(_transacted)
+ if(isXA())
{
message.acknowledge();
}
((MessageListener)_endpoint).onMessage(message);
- if (_transacted && (_tm.getTransaction() != null))
+ if (isXA() && (_tm.getTransaction() != null))
{
final int status = _tm.getStatus() ;
final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK
|| status == Status.STATUS_ROLLING_BACK
|| status == Status.STATUS_ROLLEDBACK;
+
if (rollback)
{
_session.recover() ;
@@ -196,7 +221,7 @@ public class QpidMessageHandler implements MessageListener
_log.warn("Unable to call after delivery", e);
return;
}
- if (_useLocalTx)
+ if (!isXA() && _spec.isUseLocalTx())
{
_session.commit();
}
@@ -216,7 +241,7 @@ public class QpidMessageHandler implements MessageListener
_log.warn("Unable to call after delivery", e);
}
}
- if (_useLocalTx || !_activation.isDeliveryTransacted())
+ if (!isXA() && _spec.isUseLocalTx())
{
try
{
@@ -241,5 +266,17 @@ public class QpidMessageHandler implements MessageListener
}
}
+
+ public void start() throws Exception
+ {
+ _deliveryActive.set(true);
+ setup();
+ }
+
+ public void stop()
+ {
+ _deliveryActive.set(false);
+ teardown();
+ }
}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/tm/GlassfishTransactionManagerLocator.java b/java/jca/src/main/java/org/apache/qpid/ra/tm/GlassfishTransactionManagerLocator.java
new file mode 100644
index 0000000000..cff53d2710
--- /dev/null
+++ b/java/jca/src/main/java/org/apache/qpid/ra/tm/GlassfishTransactionManagerLocator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.ra.tm;
+
+
+import javax.naming.InitialContext;
+import javax.transaction.TransactionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GlassfishTransactionManagerLocator
+{
+ private static final Logger _log = LoggerFactory.getLogger(GlassfishTransactionManagerLocator.class);
+
+ private static final String TM_JNDI_NAME = "java:appserver/TransactionManager";
+
+ public TransactionManager getTm() throws Exception
+ {
+ InitialContext ctx = null;
+ TransactionManager tm = null;
+
+ try
+ {
+ ctx = new InitialContext();
+ tm = (TransactionManager)ctx.lookup(TM_JNDI_NAME);
+ }
+ catch(Exception e)
+ {
+ _log.error("Error attempting to location TM " + e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ if(ctx != null)
+ {
+ ctx.close();
+ }
+ }
+ catch(Exception ignore){}
+ }
+
+ return tm;
+ }
+}
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java b/java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java
index 266c56bd63..6103433cbf 100644
--- a/java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java
+++ b/java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.ra.tm;
import javax.naming.InitialContext;
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/tm/WLSTransactionManagerLocator.java b/java/jca/src/main/java/org/apache/qpid/ra/tm/WLSTransactionManagerLocator.java
new file mode 100644
index 0000000000..29e673d28e
--- /dev/null
+++ b/java/jca/src/main/java/org/apache/qpid/ra/tm/WLSTransactionManagerLocator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.ra.tm;
+
+
+import javax.naming.InitialContext;
+import javax.transaction.TransactionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WLSTransactionManagerLocator
+{
+ private static final Logger _log = LoggerFactory.getLogger(WLSTransactionManagerLocator.class);
+
+ private static final String TM_JNDI_NAME = "javax.transaction.TransactionManager";
+
+ public TransactionManager getTm() throws Exception
+ {
+ InitialContext ctx = null;
+ TransactionManager tm = null;
+
+ try
+ {
+ ctx = new InitialContext();
+ tm = (TransactionManager)ctx.lookup(TM_JNDI_NAME);
+ }
+ catch(Exception e)
+ {
+ _log.error("Unable to locate javax.transaction.TransactionManager " + e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ if(ctx != null)
+ {
+ ctx.close();
+ }
+ }
+ catch(Exception ignore){}
+ }
+
+ return tm;
+ }
+}
+
diff --git a/java/jca/src/main/resources/META-INF/ra.xml b/java/jca/src/main/resources/META-INF/ra.xml
index 2c8344c8f0..a9374f52d7 100755
--- a/java/jca/src/main/resources/META-INF/ra.xml
+++ b/java/jca/src/main/resources/META-INF/ra.xml
@@ -69,7 +69,7 @@
</config-property>
<config-property>
- <description>Interval between setup attempts</description>
+ <description>Interval between setup attempts in milliseconds</description>
<config-property-name>SetupInterval</config-property-name>
<config-property-type>java.lang.Long</config-property-type>
<config-property-value>5000</config-property-value>
@@ -104,10 +104,17 @@
</config-property>
<config-property>
- <description>connection URL</description>
+ <description>Connection URL</description>
<config-property-name>ConnectionURL</config-property-name>
<config-property-type>java.lang.String</config-property-type>
- <config-property-value>amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'</config-property-value>
+ <config-property-value>amqp://anonymous:passwd@client/test?brokerlist='tcp://localhost?sasl_mechs='PLAIN''</config-property-value>
+ </config-property>
+
+ <config-property>
+ <description>Use a JMS Connection per MessageHandler</description>
+ <config-property-name>UseConnectionPerHandler</config-property-name>
+ <config-property-type>java.lang.Boolean</config-property-type>
+ <config-property-value>true</config-property-value>
</config-property>
<outbound-resourceadapter>
@@ -116,14 +123,14 @@
<config-property>
<description>Default session type</description>
- <config-property-name>sessionDefaultType</config-property-name>
+ <config-property-name>SessionDefaultType</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value>javax.jms.Queue</config-property-value>
</config-property>
<config-property>
<description>Specify lock timeout in seconds</description>
- <config-property-name>useTryLock</config-property-name>
+ <config-property-name>UseTryLock</config-property-name>
<config-property-type>java.lang.Integer</config-property-type>
<config-property-value>0</config-property-value>
</config-property>
@@ -137,7 +144,7 @@
<config-property>
<description>Client ID for the connection</description>
- <config-property-name>ClientID</config-property-name>
+ <config-property-name>ClientId</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value>client_id</config-property-value>
</config-property>
@@ -195,12 +202,11 @@
</messagelistener>
</messageadapter>
</inbound-resourceadapter>
-
<adminobject>
<adminobject-interface>org.apache.qpid.ra.admin.QpidQueue</adminobject-interface>
<adminobject-class> org.apache.qpid.ra.admin.QpidQueueImpl</adminobject-class>
<config-property>
- <config-property-name>destinationAddress </config-property-name>
+ <config-property-name>DestinationAddress </config-property-name>
<config-property-type>java.lang.String </config-property-type>
</config-property>
</adminobject>
@@ -208,30 +214,16 @@
<adminobject-interface>org.apache.qpid.ra.admin.QpidTopic</adminobject-interface>
<adminobject-class> org.apache.qpid.ra.admin.QpidTopicImpl</adminobject-class>
<config-property>
- <config-property-name>destinationAddress </config-property-name>
+ <config-property-name>DestinationAddress </config-property-name>
<config-property-type>java.lang.String </config-property-type>
</config-property>
</adminobject>
- <!--
- <adminobject>
- <adminobject-interface>javax.jms.Destination</adminobject-interface>
- <adminobject-class> org.apache.qpid.ra.admin.QpidDestinationProxy</adminobject-class>
- <config-property>
- <config-property-name>destinationAddress </config-property-name>
- <config-property-type>java.lang.String </config-property-type>
- </config-property>
- <config-property>
- <config-property-name>destinationType</config-property-name>
- <config-property-type>java.lang.String </config-property-type>
- </config-property>
- </adminobject>
- -->
<adminobject>
<adminobject-interface>javax.jms.ConnectionFactory</adminobject-interface>
<adminobject-class> org.apache.qpid.ra.admin.QpidConnectionFactoryProxy</adminobject-class>
<config-property>
- <config-property-name>connectionURL</config-property-name>
- <config-property-type>java.lang.String </config-property-type>
+ <config-property-name>ConnectionURL</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
</config-property>
</adminobject>
</resourceadapter>
diff --git a/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java b/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java
new file mode 100644
index 0000000000..e811427223
--- /dev/null
+++ b/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import javax.resource.spi.ResourceAdapterInternalException;
+
+import org.apache.qpid.ra.inflow.QpidActivationSpec;
+
+import junit.framework.TestCase;
+
+public class QpidActivationSpecTest extends TestCase
+{
+
+ public void testActivationSpecBasicSerialization() throws Exception
+ {
+ QpidActivationSpec spec = new QpidActivationSpec();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(spec);
+ oos.close();
+ assertTrue(out.toByteArray().length > 0);
+ }
+
+}
diff --git a/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java b/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java
new file mode 100644
index 0000000000..a6788a72c5
--- /dev/null
+++ b/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import javax.resource.spi.ResourceAdapterInternalException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import junit.framework.TestCase;
+
+
+public class QpidResourceAdapterTest extends TestCase
+{
+ public void testGetXAResources() throws Exception
+ {
+ QpidResourceAdapter ra = new QpidResourceAdapter();
+ assertNull(ra.getXAResources(null));
+ }
+
+ public void testTransactionManagerLocatorException() throws Exception
+ {
+
+ QpidResourceAdapter ra = new QpidResourceAdapter();
+ assertNull(ra.getTransactionManagerLocatorClass());
+ assertNull(ra.getTransactionManagerLocatorMethod());
+
+ try
+ {
+ ra.start(null);
+ }
+ catch(ResourceAdapterInternalException e)
+ {
+
+ }
+
+ ra.setTransactionManagerLocatorClass("DummyLocator");
+
+ try
+ {
+ ra.start(null);
+ }
+ catch(ResourceAdapterInternalException e)
+ {
+
+ }
+
+ }
+
+ public void testResourceAdapterBasicSerialization() throws Exception
+ {
+
+ QpidResourceAdapter ra = new QpidResourceAdapter();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(ra);
+ oos.close();
+ assertTrue(out.toByteArray().length > 0);
+ }
+}