diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
| commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
| tree | 1391da89470593209466df68c0b40b89c14963b1 /java/jca/src | |
| parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
| download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/jca/src')
22 files changed, 943 insertions, 790 deletions
diff --git a/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java b/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java index a7b36bc98c..3bddfd80a4 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java @@ -34,7 +34,7 @@ public class ConnectionFactoryProperties private boolean _hasBeenUpdated = false; - private String _clientID; + private String _clientId; private String _connectionURL; @@ -56,7 +56,7 @@ public class ConnectionFactoryProperties { _log.trace("getClientID()"); } - return _clientID; + return _clientId; } public void setClientId(final String clientID) @@ -66,7 +66,7 @@ public class ConnectionFactoryProperties _log.trace("setClientID(" + clientID + ")"); } _hasBeenUpdated = true; - this._clientID = clientID; + this._clientId = clientID; } public boolean isHasBeenUpdated() diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java index c37a264ebc..779709839a 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAConnectionRequestInfo.java @@ -45,7 +45,7 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo private String _password; /** The client id */ - private String _clientID; + private String _clientId; /** The type */ private final int _type; @@ -76,13 +76,13 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo final ConnectionURL connectionURL = ra.getDefaultAMQConnectionFactory().getConnectionURL() ; _userName = connectionURL.getUsername(); _password = connectionURL.getPassword(); - _clientID = connectionURL.getClientName(); + _clientId = connectionURL.getClientName(); } else { - _userName = ra.getDefaultUserName(); - _password = ra.getDefaultPassword(); - _clientID = ra.getClientId(); + _userName = ra.getUserName(); + _password = ra.getPassword(); + _clientId = ra.getClientId(); } this._type = type; _transacted = true; @@ -142,9 +142,9 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo { _password = connectionURL.getPassword(); } - if (_clientID == null) + if (_clientId == null) { - _clientID = connectionURL.getClientName(); + _clientId = connectionURL.getClientName(); } } @@ -170,15 +170,15 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo { if (_userName == null) { - _userName = ra.getDefaultUserName(); + _userName = ra.getUserName(); } if (_password == null) { - _password = ra.getDefaultPassword(); + _password = ra.getPassword(); } - if (_clientID == null) + if (_clientId == null) { - _clientID = ra.getClientId(); + _clientId = ra.getClientId(); } } } @@ -243,28 +243,28 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo * Get the client id * @return The value */ - public String getClientID() + public String getClientId() { if (_log.isTraceEnabled()) { _log.trace("getClientID()"); } - return _clientID; + return _clientId; } /** * Set the client id * @param clientID The value */ - public void setClientID(final String clientID) + public void setClientId(final String clientID) { if (_log.isTraceEnabled()) { _log.trace("setClientID(" + clientID + ")"); } - this._clientID = clientID; + this._clientId = clientID; } /** @@ -321,7 +321,7 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo { QpidRAConnectionRequestInfo you = (QpidRAConnectionRequestInfo)obj; return Util.compare(_userName, you.getUserName()) && Util.compare(_password, you.getPassword()) && - Util.compare(_clientID, you.getClientID()) && + Util.compare(_clientId, you.getClientId()) && _type == you.getType() && _transacted == you.isTransacted() && _acknowledgeMode == you.getAcknowledgeMode(); @@ -343,7 +343,7 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo hash += 31 * hash + (_userName != null ? _userName.hashCode() : 0); hash += 31 * hash + (_password != null ? _password.hashCode() : 0); - hash += 31 * hash + (_clientID != null ? _clientID.hashCode() : 0); + hash += 31 * hash + (_clientId != null ? _clientId.hashCode() : 0); hash += 31 * hash + _type; hash += 31 * hash + (_transacted ? 1 : 0); hash += 31 * hash + _acknowledgeMode; @@ -356,6 +356,6 @@ public class QpidRAConnectionRequestInfo implements ConnectionRequestInfo { return "QpidRAConnectionRequestInfo[type=" + _type + ", transacted=" + _transacted + ", acknowledgeMode=" + _acknowledgeMode + - ", clientID=" + _clientID + ", userName=" + _userName + ((_password != null) ? ", password=********]" :"]"); + ", clientID=" + _clientId + ", userName=" + _userName + ((_password != null) ? ", password=********]" :"]"); } } diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java index 53896d8872..eccf77aff2 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java @@ -34,10 +34,11 @@ import java.util.concurrent.locks.ReentrantLock; import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import javax.jms.QueueConnection; import javax.jms.ResourceAllocationException; import javax.jms.Session; -import javax.jms.QueueConnection; import javax.jms.TopicConnection; +import javax.jms.XAConnection; import javax.jms.XAQueueConnection; import javax.jms.XASession; import javax.jms.XATopicConnection; @@ -260,7 +261,20 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList } catch (JMSException e) { - _log.debug("Error closing session " + this, e); + _log.debug("Error closing XASession " + this, e); + } + + try + { + if(_session != null) + { + _session.close(); + } + + } + catch(JMSException e) + { + _log.error("Error closing Session " + this, e); } if (_connection != null) @@ -585,7 +599,7 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList */ protected Session getSession() throws JMSException { - if(_xaSession != null && !_mcf.getUseLocalTx()) + if(_xaSession != null && !_mcf.getUseLocalTx() && _inManagedTx) { if (_log.isTraceEnabled()) { @@ -761,107 +775,44 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList { if (_userName != null && _password != null) { - if(!transacted) - { - _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password); - } - else - { - _connection = _mcf.getCleanAMQConnectionFactory().createTopicConnection(_userName, _password); - } + _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password); } else { - if(!transacted) - { - _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection(); - } - else - { - _connection = _mcf.getDefaultAMQConnectionFactory().createTopicConnection(); - } + _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection(); } - if(!transacted) - { - _xaSession = ((XATopicConnection)_connection).createXATopicSession(); - } - else - { - _session = ((TopicConnection)_connection).createTopicSession(transacted, acknowledgeMode); - } + _xaSession = ((XATopicConnection)_connection).createXATopicSession(); + _session = ((TopicConnection)_connection).createTopicSession(transacted, acknowledgeMode); + } else if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION) { if (_userName != null && _password != null) { - if(!transacted) - { - _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password); - } - else - { - _connection = _mcf.getCleanAMQConnectionFactory().createQueueConnection(_userName, _password); - } + _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password); } else { - if(!transacted) - { - _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection(); - } - else - { - _connection = _mcf.getDefaultAMQConnectionFactory().createQueueConnection(); - } + _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection(); } - if(!transacted) - { - _xaSession = ((XAQueueConnection)_connection).createXAQueueSession(); + _xaSession = ((XAQueueConnection)_connection).createXAQueueSession(); + _session = ((QueueConnection)_connection).createQueueSession(transacted, acknowledgeMode); - } - else - { - _session = ((QueueConnection)_connection).createQueueSession(transacted, acknowledgeMode); - - } } else { if (_userName != null && _password != null) { - if(!transacted) - { - _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password); - } - else - { - _connection = _mcf.getCleanAMQConnectionFactory().createConnection(_userName, _password); - } - } - else - { - if(!transacted) - { - _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection(); - } - else - { - _connection = _mcf.getDefaultAMQConnectionFactory().createConnection(); - } - } - - if(!transacted) - { - _xaSession = ((XAQueueConnection)_connection).createXASession(); - + _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password); } else { - _session = ((QueueConnection)_connection).createSession(transacted, acknowledgeMode); - + _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection(); } + _xaSession = ((XAConnection)_connection).createXASession(); + _session = _connection.createSession(transacted, acknowledgeMode); } _connection.setExceptionListener(this); diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java index 318485a7f2..8744a9deec 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java @@ -365,12 +365,12 @@ public class QpidRAManagedConnectionFactory implements ManagedConnectionFactory, _mcfProperties.setSessionDefaultType(type); } - public String getClientID() + public String getClientId() { return _mcfProperties.getClientId(); } - public void setClientID(final String clientID) + public void setClientId(final String clientID) { _mcfProperties.setClientId(clientID); } diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java index 21f7d2574f..69320575b0 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java @@ -31,10 +31,8 @@ import org.slf4j.LoggerFactory; */ public class QpidRAProperties extends ConnectionFactoryProperties implements Serializable { - /** Serial version UID */ private static final long serialVersionUID = -4823893873707374791L; - /** The logger */ private static final Logger _log = LoggerFactory.getLogger(QpidRAProperties.class); private static final int DEFAULT_SETUP_ATTEMPTS = 10; @@ -45,16 +43,14 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser private long _setupInterval = DEFAULT_SETUP_INTERVAL; - /** Use Local TX instead of XA */ - private Boolean _localTx = false; - /** Class used to locate the Transaction Manager. */ private String _transactionManagerLocatorClass ; /** Method used to locate the TM */ private String _transactionManagerLocatorMethod ; - + private boolean _useConnectionPerHandler = true; + /** * Constructor */ @@ -66,34 +62,6 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser } } - /** - * Get the use XA flag - * @return The value - */ - public Boolean getUseLocalTx() - { - if (_log.isTraceEnabled()) - { - _log.trace("getUseLocalTx()"); - } - - return _localTx; - } - - /** - * Set the use XA flag - * @param localTx The value - */ - public void setUseLocalTx(final Boolean localTx) - { - if (_log.isTraceEnabled()) - { - _log.trace("setUseLocalTx(" + localTx + ")"); - } - - this._localTx = localTx; - } - public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass) { if (_log.isTraceEnabled()) @@ -174,10 +142,20 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser this._setupInterval = setupInterval; } + public boolean isUseConnectionPerHandler() + { + return _useConnectionPerHandler; + } + + public void setUseConnectionPerHandler(boolean connectionPerHandler) + { + this._useConnectionPerHandler = connectionPerHandler; + } + @Override public String toString() { - return "QpidRAProperties[localTx=" + _localTx + + return "QpidRAProperties[" + ", transactionManagerLocatorClass=" + _transactionManagerLocatorClass + ", transactionManagerLocatorMethod=" + _transactionManagerLocatorMethod + ", setupAttempts=" + _setupAttempts + diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java index 081677ca4b..a72f51da51 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java @@ -30,4 +30,6 @@ public interface QpidRASession public void start() throws JMSException; public void close() throws JMSException; + + public void closeSession() throws JMSException; } diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java index cf28d5bba1..2747282a3c 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java @@ -58,5 +58,5 @@ public interface QpidRASessionFactory extends Connection, TopicConnection, Queue * @param session The session * @throws JMSException for any error */ - void closeSession(QpidRASessionImpl session) throws JMSException; + void closeSession(QpidRASession session) throws JMSException; } diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java index e2bc2d2008..f3253e1400 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java @@ -567,7 +567,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference _started = true; for (Iterator<QpidRASession> i = _sessions.iterator(); i.hasNext();) { - QpidRASessionImpl session = (QpidRASessionImpl)i.next(); + QpidRASession session = (QpidRASession)i.next(); session.start(); } } @@ -609,7 +609,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference { for (Iterator<QpidRASession> i = _sessions.iterator(); i.hasNext();) { - QpidRASessionImpl session = (QpidRASessionImpl)i.next(); + QpidRASession session = (QpidRASession)i.next(); try { session.closeSession(); @@ -670,7 +670,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference * @param session The session * @exception JMSException Thrown if an error occurs */ - public void closeSession(final QpidRASessionImpl session) throws JMSException + public void closeSession(final QpidRASession session) throws JMSException { if (_log.isTraceEnabled()) { @@ -679,7 +679,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference synchronized (_sessions) { - _sessions.remove(session); + _sessions.clear(); } } @@ -742,7 +742,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference QpidRAConnectionRequestInfo info = new QpidRAConnectionRequestInfo(sessionType); info.setUserName(_userName); info.setPassword(_password); - info.setClientID(_clientID); + info.setClientId(_clientID); info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL()); if (_log.isTraceEnabled()) @@ -839,7 +839,7 @@ public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Reference sessionType); info.setUserName(_userName); info.setPassword(_password); - info.setClientID(_clientID); + info.setClientId(_clientID); info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL()); if (_log.isTraceEnabled()) diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java index fdd4888a3d..c4cfeaba48 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java @@ -1446,7 +1446,7 @@ public class QpidRASessionImpl implements Session, QueueSession, TopicSession, X * Close session * @exception JMSException Thrown if an error occurs */ - void closeSession() throws JMSException + public void closeSession() throws JMSException { final QpidRAManagedConnection mc = this._mc; if (mc != null) diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java index 363af1bbcd..96fa83ceef 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java @@ -38,8 +38,6 @@ import javax.resource.spi.work.WorkManager; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; @@ -47,6 +45,8 @@ import org.apache.qpid.client.XAConnectionImpl; import org.apache.qpid.ra.inflow.QpidActivation; import org.apache.qpid.ra.inflow.QpidActivationSpec; import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The resource adapter for Qpid @@ -54,43 +54,22 @@ import org.apache.qpid.url.URLSyntaxException; */ public class QpidResourceAdapter implements ResourceAdapter, Serializable { - /** - * - */ private static final long serialVersionUID = -2446231446818098726L; - /** - * The logger - */ - private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class); + private static final transient Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class); - /** - * The bootstrap context - */ private BootstrapContext _ctx; - /** - * The resource adapter properties - */ private final QpidRAProperties _raProperties; - /** - * Have the factory been configured - */ private final AtomicBoolean _configured; - /** - * The activations by activation spec - */ private final Map<ActivationSpec, QpidActivation> _activations; private AMQConnectionFactory _defaultAMQConnectionFactory; private TransactionManager _tm; - /** - * Constructor - */ public QpidResourceAdapter() { if (_log.isTraceEnabled()) @@ -223,65 +202,6 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable _log.info("Qpid resource adapter stopped"); } - /** - * Get the user name - * - * @return The value - */ - public String getDefaultUserName() - { - if (_log.isTraceEnabled()) - { - _log.trace("getUserName()"); - } - - return _raProperties.getUserName(); - } - - /** - * Set the user name - * - * @param userName The value - */ - public void setDefaultUserName(final String userName) - { - if (_log.isTraceEnabled()) - { - _log.trace("setUserName(" + userName + ")"); - } - - _raProperties.setUserName(userName); - } - - /** - * Get the password - * - * @return The value - */ - public String getDefaultPassword() - { - if (_log.isTraceEnabled()) - { - _log.trace("getPassword()"); - } - - return _raProperties.getPassword(); - } - - /** - * Set the password - * - * @param password The value - */ - public void setDefaultPassword(final String password) - { - if (_log.isTraceEnabled()) - { - _log.trace("setPassword(****)"); - } - - _raProperties.setPassword(password); - } /** * Get the client ID @@ -403,6 +323,26 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable _raProperties.setPath(path); } + public String getUserName() + { + return _raProperties.getUserName(); + } + + public void setUserName(String userName) + { + _raProperties.setUserName(userName); + } + + public String getPassword() + { + return _raProperties.getPassword(); + } + + public void setPassword(String password) + { + _raProperties.setPassword(password); + } + /** * Get the connection url * @@ -493,14 +433,14 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable * * @return The value */ - public Boolean getUseLocalTx() + public Boolean isUseLocalTx() { if (_log.isTraceEnabled()) { _log.trace("getUseLocalTx()"); } - return _raProperties.getUseLocalTx(); + return _raProperties.isUseLocalTx(); } /** @@ -553,7 +493,27 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable } _raProperties.setSetupInterval(interval); } + + public Boolean isUseConnectionPerHandler() + { + if (_log.isTraceEnabled()) + { + _log.trace("isConnectionPerHandler()"); + } + + return _raProperties.isUseConnectionPerHandler(); + } + public void setUseConnectionPerHandler(Boolean connectionPerHandler) + { + if (_log.isTraceEnabled()) + { + _log.trace("setConnectionPerHandler(" + connectionPerHandler + ")"); + } + + _raProperties.setUseConnectionPerHandler(connectionPerHandler); + } + /** * Indicates whether some other object is "equal to" this one. * @@ -720,9 +680,10 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable return map; } - private void locateTM() + private void locateTM() throws ResourceAdapterInternalException { - if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null) + if(_raProperties.getTransactionManagerLocatorClass() != null + && _raProperties.getTransactionManagerLocatorMethod() != null) { String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";"); @@ -742,8 +703,8 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable if (_tm == null) { - _log.warn("It wasn't possible to lookup a Transaction Manager through the configured properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod"); - _log.warn("Qpid Resource Adapter won't be able to set and verify transaction timeouts in certain cases."); + _log.error("It was not possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod"); + throw new ResourceAdapterInternalException("Could not locate javax.transaction.TransactionManager"); } else { @@ -802,6 +763,7 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable final String client = (clientID != null ? clientID : "") ; final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ; + if (_log.isDebugEnabled()) { _log.debug("Initialising connectionURL to " + newurl) ; diff --git a/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java b/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java index 41242fefae..a948948d6a 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java @@ -77,17 +77,17 @@ public class QpidConnectionFactoryProxy implements Externalizable, Referenceable public void writeExternal(ObjectOutput out) throws IOException { - if (_delegate == null) - { - _log.error("Null Destination "); - throw new IOException("Null ConnectionFactory!"); - } try { + if(_delegate == null) + { + getReference(); + } + out.writeObject(((Referenceable) _delegate).getReference()); } - catch (NamingException e) + catch (Exception e) { _log.error("Failed to dereference ConnectionFactory " + e.getMessage(), e); throw new IOException("Failed to dereference ConnectionFactory: " + e.getMessage()); @@ -137,7 +137,20 @@ public class QpidConnectionFactoryProxy implements Externalizable, Referenceable */ public Connection createConnection() throws JMSException { - return _delegate.createConnection(); + try + { + if(_delegate == null) + { + getReference(); + } + + return _delegate.createConnection(); + } + catch(Exception e) + { + throw new JMSException(e.getMessage()); + } + } /** diff --git a/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java b/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java index 417849fc5c..162099d1ee 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java @@ -21,25 +21,15 @@ package org.apache.qpid.ra.admin; import java.io.Externalizable; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectInput; -import java.io.ObjectInputStream; import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.util.Hashtable; -import javax.naming.Context; -import javax.naming.Name; import javax.naming.NamingException; -import javax.naming.RefAddr; import javax.naming.Reference; import javax.naming.StringRefAddr; -import javax.naming.spi.ObjectFactory; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.ra.admin.AdminObjectFactory; public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable { @@ -101,19 +91,4 @@ public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable out.writeObject(this._url); } - //TODO move to tests - public static void main(String[] args) throws Exception - { - QpidQueueImpl q = new QpidQueueImpl(); - q.setDestinationAddress("hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}"); - ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("queue.out")); - os.writeObject(q); - os.close(); - - - ObjectInputStream is = new ObjectInputStream(new FileInputStream("queue.out")); - q = (QpidQueueImpl)is.readObject(); - System.out.println(q); - - } } diff --git a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java index 57edec8eee..2327512a62 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java @@ -20,114 +20,28 @@ */ package org.apache.qpid.ra.inflow; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; import javax.resource.ResourceException; import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.resource.spi.work.Work; -import javax.resource.spi.work.WorkManager; +import org.apache.qpid.ra.QpidResourceAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.XAConnectionImpl; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.ra.QpidResourceAdapter; -import org.apache.qpid.ra.Util; - /** * The activation. * */ -public class QpidActivation implements ExceptionListener +public class QpidActivation extends QpidExceptionHandler { - /** - * The logger - */ private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class); - /** - * The onMessage method - */ - public static final Method ONMESSAGE; - - /** - * The resource adapter - */ - private final QpidResourceAdapter _ra; - - /** - * The activation spec - */ - private final QpidActivationSpec _spec; - - /** - * The message endpoint factory - */ - private final MessageEndpointFactory _endpointFactory; - - /** - * Whether delivery is active - */ - private final AtomicBoolean _deliveryActive = new AtomicBoolean(false); - - /** - * The destination type - */ - private boolean _isTopic = false; - - /** - * Is the delivery transacted - */ - private boolean _isDeliveryTransacted; - - private Destination _destination; - - /** - * The connection - */ - private Connection _connection; - private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>(); - private AMQConnectionFactory _factory; - - // Whether we are in the failure recovery loop - private AtomicBoolean _inFailure = new AtomicBoolean(false); - - //Whether or not we have completed activating - private AtomicBoolean _activated = new AtomicBoolean(false); - - static - { - try - { - ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class }); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - + /** * Constructor * @@ -140,97 +54,8 @@ public class QpidActivation implements ExceptionListener final MessageEndpointFactory endpointFactory, final QpidActivationSpec spec) throws ResourceException { - if (_log.isTraceEnabled()) - { - _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")"); - } - - this._ra = ra; - this._endpointFactory = endpointFactory; - this._spec = spec; - try - { - _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE); - } - catch (Exception e) - { - throw new ResourceException(e); - } - } - - /** - * Get the activation spec - * - * @return The value - */ - public QpidActivationSpec getActivationSpec() - { - if (_log.isTraceEnabled()) - { - _log.trace("getActivationSpec()"); - } - - return _spec; - } - - /** - * Get the message endpoint factory - * - * @return The value - */ - public MessageEndpointFactory getMessageEndpointFactory() - { - if (_log.isTraceEnabled()) - { - _log.trace("getMessageEndpointFactory()"); - } - - return _endpointFactory; - } - - /** - * Get whether delivery is transacted - * - * @return The value - */ - public boolean isDeliveryTransacted() - { - if (_log.isTraceEnabled()) - { - _log.trace("isDeliveryTransacted()"); - } - - return _isDeliveryTransacted; - } - - /** - * Get the work manager - * - * @return The value - */ - public WorkManager getWorkManager() - { - if (_log.isTraceEnabled()) - { - _log.trace("getWorkManager()"); - } - - return _ra.getWorkManager(); - } - - /** - * Is the destination a topic - * - * @return The value - */ - public boolean isTopic() - { - if (_log.isTraceEnabled()) - { - _log.trace("isTopic()"); - } - - return _isTopic; + super(ra, spec, endpointFactory); + } /** @@ -267,70 +92,62 @@ public class QpidActivation implements ExceptionListener * * @throws Exception Thrown if an error occurs */ - protected synchronized void setup() throws Exception + public synchronized void setup() throws Exception { _log.debug("Setting up " + _spec); - setupCF(); - + setupCF(); setupDestination(); - final AMQConnection amqConnection ; - final boolean useLocalTx = _spec.isUseLocalTx() ; - final boolean isXA = _isDeliveryTransacted && !useLocalTx ; - - if (isXA) + + if(!_spec.isUseConnectionPerHandler()) { - amqConnection = (XAConnectionImpl)_factory.createXAConnection() ; + setupConnection(); + _connection.setExceptionListener(this); } - else - { - amqConnection = (AMQConnection)_factory.createConnection() ; - } - - amqConnection.setExceptionListener(this) ; - + for (int i = 0; i < _spec.getMaxSession(); i++) { - Session session = null; - - try - { - if (isXA) - { - session = _ra.createXASession((XAConnectionImpl)amqConnection) ; - } - else - { - session = _ra.createSession((AMQConnection)amqConnection, - _spec.getAcknowledgeModeInt(), - useLocalTx, - _spec.getPrefetchLow(), - _spec.getPrefetchHigh()); - } - - _log.debug("Using session " + Util.asString(session)); - QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session); - handler.setup(); - _handlers.add(handler); - } - catch (Exception e) - { - try - { - amqConnection.close() ; - } - catch (Exception e2) - { - _log.trace("Ignored error closing connection", e2); - } - - throw e; - } + try + { + QpidMessageHandler handler = null; + + if(_spec.isUseConnectionPerHandler()) + { + handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM()); + } + else + { + handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM(), _connection); + } + + handler.start(); + _handlers.add(handler); + } + catch(Exception e) + { + try + { + if(_connection != null) + { + this._connection.close(); + } + } + catch (Exception e2) + { + _log.trace("Ignored error closing connection", e2); + } + + throw e; + + } + + } + + if(!_spec.isUseConnectionPerHandler()) + { + this._connection.start(); + _activated.set(true); } - amqConnection.start() ; - this._connection = amqConnection ; - _activated.set(true); - _log.debug("Setup complete " + this); } @@ -340,136 +157,17 @@ public class QpidActivation implements ExceptionListener protected synchronized void teardown() { _log.debug("Tearing down " + _spec); - - try - { - if (_connection != null) - { - _connection.stop(); - } - } - catch (Throwable t) - { - _log.debug("Error stopping connection " + Util.asString(_connection), t); - } + + super.teardown(); for (QpidMessageHandler handler : _handlers) { - handler.teardown(); + handler.stop(); } - try - { - if (_connection != null) - { - _connection.close(); - } - } - catch (Throwable t) - { - _log.debug("Error closing connection " + Util.asString(_connection), t); - } - if (_spec.isHasBeenUpdated()) - { - _factory = null; - } _log.debug("Tearing down complete " + this); } - protected void setupCF() throws Exception - { - if (_spec.isHasBeenUpdated()) - { - _factory = _ra.createAMQConnectionFactory(_spec); - } - else - { - _factory = _ra.getDefaultAMQConnectionFactory(); - } - } - - public Destination getDestination() - { - return _destination; - } - - protected void setupDestination() throws Exception - { - - String destinationName = _spec.getDestination(); - String destinationTypeString = _spec.getDestinationType(); - - if (_spec.isUseJNDI()) - { - Context ctx = new InitialContext(); - _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec); - if (_log.isTraceEnabled()) - { - _log.trace("setupDestination(" + ctx + ")"); - } - - if (destinationTypeString != null && !destinationTypeString.trim().equals("")) - { - _log.debug("Destination type defined as " + destinationTypeString); - - Class<? extends Destination> destinationType; - if (Topic.class.getName().equals(destinationTypeString)) - { - destinationType = Topic.class; - _isTopic = true; - } - else - { - destinationType = Queue.class; - } - - _log.debug("Retrieving destination " + destinationName + - " of type " + - destinationType.getName()); - _destination = Util.lookup(ctx, destinationName, destinationType); - - } - else - { - _log.debug("Destination type not defined"); - _log.debug("Retrieving destination " + destinationName + - " of type " + - Destination.class.getName()); - - _destination = Util.lookup(ctx, destinationName, AMQDestination.class); - _isTopic = !(_destination instanceof Queue) ; - } - } - else - { - _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination()); - if (destinationTypeString != null && !destinationTypeString.trim().equals("")) - { - _log.debug("Destination type defined as " + destinationTypeString); - final boolean match ; - if (Topic.class.getName().equals(destinationTypeString)) - { - match = (_destination instanceof Topic) ; - _isTopic = true; - } - else - { - match = (_destination instanceof Queue) ; - } - if (!match) - { - throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ; - } - } - else - { - _isTopic = !(_destination instanceof Queue) ; - } - } - - _log.debug("Got destination " + _destination + " from " + destinationName); - } - /** * Get a string representation * @@ -492,94 +190,7 @@ public class QpidActivation implements ExceptionListener return buffer.toString(); } - public void onException(final JMSException jmse) - { - if(_activated.get()) - { - handleFailure(jmse) ; - } - else - { - _log.warn("Received JMSException: " + jmse + " while endpoint was not activated."); - } - } - - /** - * Handles any failure by trying to reconnect - * - * @param failure the reason for the failure - */ - public void handleFailure(Throwable failure) - { - if(doesNotExist(failure)) - { - _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination()); - } - else - { - _log.warn("Failure in Qpid activation " + _spec, failure); - } - int reconnectCount = 0; - int setupAttempts = _spec.getSetupAttempts(); - long setupInterval = _spec.getSetupInterval(); - - // Only enter the failure loop once - if (_inFailure.getAndSet(true)) - return; - try - { - while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) - { - teardown(); - - try - { - Thread.sleep(setupInterval); - } - catch (InterruptedException e) - { - _log.debug("Interrupted trying to reconnect " + _spec, e); - break; - } - - _log.info("Attempting to reconnect " + _spec); - try - { - setup(); - _log.info("Reconnected with Qpid"); - break; - } - catch (Throwable t) - { - if(doesNotExist(failure)) - { - _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination()); - } - else - { - _log.error("Unable to reconnect " + _spec, t); - } - } - ++reconnectCount; - } - } - finally - { - // Leaving failure recovery loop - _inFailure.set(false); - } - } - - /** - * Check to see if the failure represents a missing endpoint - * @param failure The failure. - * @return true if it represents a missing endpoint, false otherwise - */ - private boolean doesNotExist(final Throwable failure) - { - return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ; - } - + /** * Handles the setup */ diff --git a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java index 5f4e2dcf6b..3d9a88adb5 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java @@ -28,10 +28,10 @@ import javax.resource.spi.ActivationSpec; import javax.resource.spi.InvalidPropertyException; import javax.resource.spi.ResourceAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.qpid.ra.ConnectionFactoryProperties; import org.apache.qpid.ra.QpidResourceAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The activation spec @@ -88,6 +88,8 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A // undefined by default, default is specified at the RA level in QpidRAProperties private Long _setupInterval; + private Boolean _useConnectionPerHandler; + /** * Constructor */ @@ -544,6 +546,16 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A this._setupInterval = setupInterval; } + public Boolean isUseConnectionPerHandler() + { + return (_useConnectionPerHandler == null) ? _ra.isUseConnectionPerHandler() : _useConnectionPerHandler; + } + + public void setUseConnectionPerHandler(Boolean connectionPerHandler) + { + this._useConnectionPerHandler = connectionPerHandler; + } + /** * Validate * @exception InvalidPropertyException Thrown if a validation exception occurs @@ -561,6 +573,7 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A } } + /** * Get a string representation * @return The value @@ -573,23 +586,30 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A buffer.append("ra=").append(_ra); buffer.append(" destination=").append(_destination); buffer.append(" destinationType=").append(_destinationType); + if (_messageSelector != null) { buffer.append(" selector=").append(_messageSelector); } + buffer.append(" ack=").append(getAcknowledgeMode()); buffer.append(" durable=").append(_subscriptionDurability); buffer.append(" clientID=").append(getClientId()); + if (_subscriptionName != null) { buffer.append(" subscription=").append(_subscriptionName); } + buffer.append(" user=").append(getUserName()); + if (getPassword() != null) { - buffer.append(" password=").append("****"); + buffer.append(" password=").append("********"); } + buffer.append(" maxSession=").append(_maxSession); + if (_prefetchLow != null) { buffer.append(" prefetchLow=").append(_prefetchLow); @@ -598,7 +618,10 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A { buffer.append(" prefetchHigh=").append(_prefetchHigh); } + + buffer.append(" connectionPerHandler=").append(isUseConnectionPerHandler()); buffer.append(')'); + return buffer.toString(); } } diff --git a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java new file mode 100644 index 0000000000..8775362eb5 --- /dev/null +++ b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java @@ -0,0 +1,339 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ra.inflow; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.jms.XAConnectionFactory; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.resource.ResourceException; +import javax.resource.spi.endpoint.MessageEndpointFactory; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.ra.QpidResourceAdapter; +import org.apache.qpid.ra.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class QpidExceptionHandler implements ExceptionListener +{ + private static final Logger _log = LoggerFactory.getLogger(QpidExceptionHandler.class); + + public static final Method ONMESSAGE; + + protected final MessageEndpointFactory _endpointFactory; + + protected Connection _connection; + + protected ConnectionFactory _factory; + + protected Destination _destination; + + protected final QpidResourceAdapter _ra; + + protected final QpidActivationSpec _spec; + + protected boolean _isDeliveryTransacted; + + protected final AtomicBoolean _deliveryActive = new AtomicBoolean(false); + + protected boolean _isTopic = false; + + // Whether we are in the failure recovery loop + protected AtomicBoolean _inFailure = new AtomicBoolean(false); + + //Whether or not we have completed activating + protected AtomicBoolean _activated = new AtomicBoolean(false); + + static + { + try + { + ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class }); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public abstract void setup() throws Exception; + public abstract void start() throws Exception; + public abstract void stop(); + + protected QpidExceptionHandler(QpidResourceAdapter ra, + QpidActivationSpec spec, + MessageEndpointFactory endpointFactory) throws ResourceException + { + this._ra = ra; + this._spec = spec; + this._endpointFactory = endpointFactory; + + try + { + _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE); + } + catch (Exception e) + { + throw new ResourceException(e); + } + + + } + + public void onException(JMSException e) + { + if(_activated.get()) + { + handleFailure(e) ; + } + else + { + _log.warn("Received JMSException: " + e + " while endpoint was not activated."); + } + } + + /** + * Handles any failure by trying to reconnect + * + * @param failure the reason for the failure + */ + public void handleFailure(Throwable failure) + { + if(doesNotExist(failure)) + { + _log.info("awaiting topic/queue creation " + _spec.getDestination()); + } + else + { + _log.warn("Failure in Qpid activation " + _spec, failure); + } + int reconnectCount = 0; + int setupAttempts = _spec.getSetupAttempts(); + long setupInterval = _spec.getSetupInterval(); + + // Only enter the failure loop once + if (_inFailure.getAndSet(true)) + return; + try + { + while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) + { + teardown(); + + try + { + Thread.sleep(setupInterval); + } + catch (InterruptedException e) + { + _log.debug("Interrupted trying to reconnect " + _spec, e); + break; + } + + _log.info("Attempting to reconnect " + _spec); + try + { + setup(); + _log.info("Reconnected with Qpid"); + break; + } + catch (Throwable t) + { + if(doesNotExist(failure)) + { + _log.info("awaiting topic/queue creation " + _spec.getDestination()); + } + else + { + _log.error("Unable to reconnect " + _spec, t); + } + } + ++reconnectCount; + } + } + finally + { + // Leaving failure recovery loop + _inFailure.set(false); + } + } + + /** + * Check to see if the failure represents a missing endpoint + * @param failure The failure. + * @return true if it represents a missing endpoint, false otherwise + */ + protected boolean doesNotExist(final Throwable failure) + { + return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ; + } + + protected boolean isXA() + { + return _isDeliveryTransacted && !_spec.isUseLocalTx(); + } + + protected void setupConnection() throws Exception + { + this._connection = (isXA()) ? ((XAConnectionFactory)_factory).createXAConnection() : _factory.createConnection(); + } + + protected synchronized void teardown() + { + _log.debug("Tearing down " + _spec); + + try + { + if (_connection != null) + { + _connection.stop(); + } + } + catch (Throwable t) + { + _log.debug("Error stopping connection " + Util.asString(_connection), t); + } + + try + { + if (_connection != null) + { + _connection.close(); + } + } + catch (Throwable t) + { + _log.debug("Error closing connection " + Util.asString(_connection), t); + } + if (_spec.isHasBeenUpdated()) + { + _factory = null; + } + _log.debug("Tearing down complete " + this); + } + + protected void setupCF() throws Exception + { + if (_spec.isHasBeenUpdated()) + { + _factory = _ra.createAMQConnectionFactory(_spec); + } + else + { + _factory = _ra.getDefaultAMQConnectionFactory(); + } + } + + protected void setupDestination() throws Exception + { + + String destinationName = _spec.getDestination(); + String destinationTypeString = _spec.getDestinationType(); + + if (_spec.isUseJNDI()) + { + Context ctx = new InitialContext(); + _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec); + if (_log.isTraceEnabled()) + { + _log.trace("setupDestination(" + ctx + ")"); + } + + if (destinationTypeString != null && !destinationTypeString.trim().equals("")) + { + _log.debug("Destination type defined as " + destinationTypeString); + + Class<? extends Destination> destinationType; + if (Topic.class.getName().equals(destinationTypeString)) + { + destinationType = Topic.class; + _isTopic = true; + } + else + { + destinationType = Queue.class; + } + + _log.debug("Retrieving destination " + destinationName + + " of type " + + destinationType.getName()); + _destination = Util.lookup(ctx, destinationName, destinationType); + + } + else + { + _log.debug("Destination type not defined"); + _log.debug("Retrieving destination " + destinationName + + " of type " + + Destination.class.getName()); + + _destination = Util.lookup(ctx, destinationName, AMQDestination.class); + _isTopic = !(_destination instanceof Queue) ; + } + } + else + { + _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination()); + + if (destinationTypeString != null && !destinationTypeString.trim().equals("")) + { + _log.debug("Destination type defined as " + destinationTypeString); + final boolean match ; + if (Topic.class.getName().equals(destinationTypeString)) + { + match = (_destination instanceof Topic) ; + _isTopic = true; + } + else + { + match = (_destination instanceof Queue) ; + } + if (!match) + { + throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ; + } + } + else + { + _isTopic = !(_destination instanceof Queue) ; + } + } + + _log.debug("Got destination " + _destination + " from " + destinationName); + } + + + +} diff --git a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java index 473efab31f..a02adf0dad 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.ra.inflow; +import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -35,6 +36,9 @@ import javax.transaction.Status; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.XAConnectionImpl; +import org.apache.qpid.ra.QpidResourceAdapter; import org.apache.qpid.ra.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,82 +47,100 @@ import org.slf4j.LoggerFactory; * The message handler * */ -public class QpidMessageHandler implements MessageListener +public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener { - /** - * The logger - */ private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class); - /** - * The session - */ - private final Session _session; - private MessageConsumer _consumer; - /** - * The endpoint - */ private MessageEndpoint _endpoint; - private final QpidActivation _activation; - - private boolean _useLocalTx; - - private boolean _transacted; + private Session _session; private final TransactionManager _tm; - public QpidMessageHandler(final QpidActivation activation, - final TransactionManager tm, - final Session session) + public QpidMessageHandler(final QpidResourceAdapter ra, + final QpidActivationSpec spec, + final MessageEndpointFactory endpointFactory, + final TransactionManager tm, + final Connection connection) throws ResourceException { - this._activation = activation; - this._session = session; + super(ra, spec, endpointFactory); this._tm = tm; + this._connection = connection; } - + + public QpidMessageHandler(final QpidResourceAdapter ra, + final QpidActivationSpec spec, + final MessageEndpointFactory endpointFactory, + final TransactionManager tm) throws ResourceException + { + super(ra, spec, endpointFactory); + this._tm = tm; + } + public void setup() throws Exception { if (_log.isTraceEnabled()) { _log.trace("setup()"); } - - QpidActivationSpec spec = _activation.getActivationSpec(); - String selector = spec.getMessageSelector(); - + + setupCF(); + setupDestination(); + String selector = _spec.getMessageSelector(); + + if(_spec.isUseConnectionPerHandler()) + { + setupConnection(); + _connection.setExceptionListener(this); + } + + if(isXA()) + { + _session = _ra.createXASession((XAConnectionImpl)_connection); + } + else + { + _session = _ra.createSession((AMQConnection)_connection, + _spec.getAcknowledgeModeInt(), + _spec.isUseLocalTx(), + _spec.getPrefetchLow(), + _spec.getPrefetchHigh()); + } // Create the message consumer - if (_activation.isTopic()) + if (_isTopic) { - final Topic topic = (Topic) _activation.getDestination(); - final String subscriptionName = spec.getSubscriptionName(); - if (spec.isSubscriptionDurable()) - _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false); + final Topic topic = (Topic) _destination; + final String subscriptionName = _spec.getSubscriptionName(); + + if (_spec.isSubscriptionDurable()) + { + _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false); + } else - _consumer = _session.createConsumer(topic, selector) ; + { + _consumer = _session.createConsumer(topic, selector) ; + } } else { - final Queue queue = (Queue) _activation.getDestination(); + final Queue queue = (Queue) _destination; _consumer = _session.createConsumer(queue, selector); } - // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX - MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory(); - _useLocalTx = _activation.getActivationSpec().isUseLocalTx(); - _transacted = _activation.isDeliveryTransacted() || _useLocalTx ; - if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx()) + if (isXA()) { final XAResource xaResource = ((XASession)_session).getXAResource() ; - _endpoint = endpointFactory.createEndpoint(xaResource); + _endpoint = _endpointFactory.createEndpoint(xaResource); } else { - _endpoint = endpointFactory.createEndpoint(null); + _endpoint = _endpointFactory.createEndpoint(null); } _consumer.setMessageListener(this); + _connection.start(); + _activated.set(true); } /** @@ -126,11 +148,13 @@ public class QpidMessageHandler implements MessageListener */ public void teardown() { - if (_log.isTraceEnabled()) - { - _log.trace("teardown()"); - } + if (_log.isTraceEnabled()) + { + _log.trace("teardown()"); + } + super.teardown(); + try { if (_endpoint != null) @@ -156,27 +180,28 @@ public class QpidMessageHandler implements MessageListener try { - if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null) + if (_spec.getTransactionTimeout() > 0 && _tm != null) { - _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout()); + _tm.setTransactionTimeout(_spec.getTransactionTimeout()); } _endpoint.beforeDelivery(QpidActivation.ONMESSAGE); beforeDelivery = true; - if(_transacted) + if(isXA()) { message.acknowledge(); } ((MessageListener)_endpoint).onMessage(message); - if (_transacted && (_tm.getTransaction() != null)) + if (isXA() && (_tm.getTransaction() != null)) { final int status = _tm.getStatus() ; final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK || status == Status.STATUS_ROLLING_BACK || status == Status.STATUS_ROLLEDBACK; + if (rollback) { _session.recover() ; @@ -196,7 +221,7 @@ public class QpidMessageHandler implements MessageListener _log.warn("Unable to call after delivery", e); return; } - if (_useLocalTx) + if (!isXA() && _spec.isUseLocalTx()) { _session.commit(); } @@ -216,7 +241,7 @@ public class QpidMessageHandler implements MessageListener _log.warn("Unable to call after delivery", e); } } - if (_useLocalTx || !_activation.isDeliveryTransacted()) + if (!isXA() && _spec.isUseLocalTx()) { try { @@ -241,5 +266,17 @@ public class QpidMessageHandler implements MessageListener } } + + public void start() throws Exception + { + _deliveryActive.set(true); + setup(); + } + + public void stop() + { + _deliveryActive.set(false); + teardown(); + } } diff --git a/java/jca/src/main/java/org/apache/qpid/ra/tm/GlassfishTransactionManagerLocator.java b/java/jca/src/main/java/org/apache/qpid/ra/tm/GlassfishTransactionManagerLocator.java new file mode 100644 index 0000000000..cff53d2710 --- /dev/null +++ b/java/jca/src/main/java/org/apache/qpid/ra/tm/GlassfishTransactionManagerLocator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.ra.tm; + + +import javax.naming.InitialContext; +import javax.transaction.TransactionManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GlassfishTransactionManagerLocator +{ + private static final Logger _log = LoggerFactory.getLogger(GlassfishTransactionManagerLocator.class); + + private static final String TM_JNDI_NAME = "java:appserver/TransactionManager"; + + public TransactionManager getTm() throws Exception + { + InitialContext ctx = null; + TransactionManager tm = null; + + try + { + ctx = new InitialContext(); + tm = (TransactionManager)ctx.lookup(TM_JNDI_NAME); + } + catch(Exception e) + { + _log.error("Error attempting to location TM " + e.getMessage()); + } + finally + { + try + { + if(ctx != null) + { + ctx.close(); + } + } + catch(Exception ignore){} + } + + return tm; + } +} diff --git a/java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java b/java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java index 266c56bd63..6103433cbf 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/tm/JBoss7TransactionManagerLocator.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.ra.tm; import javax.naming.InitialContext; diff --git a/java/jca/src/main/java/org/apache/qpid/ra/tm/WLSTransactionManagerLocator.java b/java/jca/src/main/java/org/apache/qpid/ra/tm/WLSTransactionManagerLocator.java new file mode 100644 index 0000000000..29e673d28e --- /dev/null +++ b/java/jca/src/main/java/org/apache/qpid/ra/tm/WLSTransactionManagerLocator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.ra.tm; + + +import javax.naming.InitialContext; +import javax.transaction.TransactionManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WLSTransactionManagerLocator +{ + private static final Logger _log = LoggerFactory.getLogger(WLSTransactionManagerLocator.class); + + private static final String TM_JNDI_NAME = "javax.transaction.TransactionManager"; + + public TransactionManager getTm() throws Exception + { + InitialContext ctx = null; + TransactionManager tm = null; + + try + { + ctx = new InitialContext(); + tm = (TransactionManager)ctx.lookup(TM_JNDI_NAME); + } + catch(Exception e) + { + _log.error("Unable to locate javax.transaction.TransactionManager " + e.getMessage()); + } + finally + { + try + { + if(ctx != null) + { + ctx.close(); + } + } + catch(Exception ignore){} + } + + return tm; + } +} + diff --git a/java/jca/src/main/resources/META-INF/ra.xml b/java/jca/src/main/resources/META-INF/ra.xml index 2c8344c8f0..a9374f52d7 100755 --- a/java/jca/src/main/resources/META-INF/ra.xml +++ b/java/jca/src/main/resources/META-INF/ra.xml @@ -69,7 +69,7 @@ </config-property> <config-property> - <description>Interval between setup attempts</description> + <description>Interval between setup attempts in milliseconds</description> <config-property-name>SetupInterval</config-property-name> <config-property-type>java.lang.Long</config-property-type> <config-property-value>5000</config-property-value> @@ -104,10 +104,17 @@ </config-property> <config-property> - <description>connection URL</description> + <description>Connection URL</description> <config-property-name>ConnectionURL</config-property-name> <config-property-type>java.lang.String</config-property-type> - <config-property-value>amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'</config-property-value> + <config-property-value>amqp://anonymous:passwd@client/test?brokerlist='tcp://localhost?sasl_mechs='PLAIN''</config-property-value> + </config-property> + + <config-property> + <description>Use a JMS Connection per MessageHandler</description> + <config-property-name>UseConnectionPerHandler</config-property-name> + <config-property-type>java.lang.Boolean</config-property-type> + <config-property-value>true</config-property-value> </config-property> <outbound-resourceadapter> @@ -116,14 +123,14 @@ <config-property> <description>Default session type</description> - <config-property-name>sessionDefaultType</config-property-name> + <config-property-name>SessionDefaultType</config-property-name> <config-property-type>java.lang.String</config-property-type> <config-property-value>javax.jms.Queue</config-property-value> </config-property> <config-property> <description>Specify lock timeout in seconds</description> - <config-property-name>useTryLock</config-property-name> + <config-property-name>UseTryLock</config-property-name> <config-property-type>java.lang.Integer</config-property-type> <config-property-value>0</config-property-value> </config-property> @@ -137,7 +144,7 @@ <config-property> <description>Client ID for the connection</description> - <config-property-name>ClientID</config-property-name> + <config-property-name>ClientId</config-property-name> <config-property-type>java.lang.String</config-property-type> <config-property-value>client_id</config-property-value> </config-property> @@ -195,12 +202,11 @@ </messagelistener> </messageadapter> </inbound-resourceadapter> - <adminobject> <adminobject-interface>org.apache.qpid.ra.admin.QpidQueue</adminobject-interface> <adminobject-class> org.apache.qpid.ra.admin.QpidQueueImpl</adminobject-class> <config-property> - <config-property-name>destinationAddress </config-property-name> + <config-property-name>DestinationAddress </config-property-name> <config-property-type>java.lang.String </config-property-type> </config-property> </adminobject> @@ -208,30 +214,16 @@ <adminobject-interface>org.apache.qpid.ra.admin.QpidTopic</adminobject-interface> <adminobject-class> org.apache.qpid.ra.admin.QpidTopicImpl</adminobject-class> <config-property> - <config-property-name>destinationAddress </config-property-name> + <config-property-name>DestinationAddress </config-property-name> <config-property-type>java.lang.String </config-property-type> </config-property> </adminobject> - <!-- - <adminobject> - <adminobject-interface>javax.jms.Destination</adminobject-interface> - <adminobject-class> org.apache.qpid.ra.admin.QpidDestinationProxy</adminobject-class> - <config-property> - <config-property-name>destinationAddress </config-property-name> - <config-property-type>java.lang.String </config-property-type> - </config-property> - <config-property> - <config-property-name>destinationType</config-property-name> - <config-property-type>java.lang.String </config-property-type> - </config-property> - </adminobject> - --> <adminobject> <adminobject-interface>javax.jms.ConnectionFactory</adminobject-interface> <adminobject-class> org.apache.qpid.ra.admin.QpidConnectionFactoryProxy</adminobject-class> <config-property> - <config-property-name>connectionURL</config-property-name> - <config-property-type>java.lang.String </config-property-type> + <config-property-name>ConnectionURL</config-property-name> + <config-property-type>java.lang.String</config-property-type> </config-property> </adminobject> </resourceadapter> diff --git a/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java b/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java new file mode 100644 index 0000000000..e811427223 --- /dev/null +++ b/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ra; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; + +import javax.resource.spi.ResourceAdapterInternalException; + +import org.apache.qpid.ra.inflow.QpidActivationSpec; + +import junit.framework.TestCase; + +public class QpidActivationSpecTest extends TestCase +{ + + public void testActivationSpecBasicSerialization() throws Exception + { + QpidActivationSpec spec = new QpidActivationSpec(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(spec); + oos.close(); + assertTrue(out.toByteArray().length > 0); + } + +} diff --git a/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java b/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java new file mode 100644 index 0000000000..a6788a72c5 --- /dev/null +++ b/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ra; + +import javax.resource.spi.ResourceAdapterInternalException; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; + +import junit.framework.TestCase; + + +public class QpidResourceAdapterTest extends TestCase +{ + public void testGetXAResources() throws Exception + { + QpidResourceAdapter ra = new QpidResourceAdapter(); + assertNull(ra.getXAResources(null)); + } + + public void testTransactionManagerLocatorException() throws Exception + { + + QpidResourceAdapter ra = new QpidResourceAdapter(); + assertNull(ra.getTransactionManagerLocatorClass()); + assertNull(ra.getTransactionManagerLocatorMethod()); + + try + { + ra.start(null); + } + catch(ResourceAdapterInternalException e) + { + + } + + ra.setTransactionManagerLocatorClass("DummyLocator"); + + try + { + ra.start(null); + } + catch(ResourceAdapterInternalException e) + { + + } + + } + + public void testResourceAdapterBasicSerialization() throws Exception + { + + QpidResourceAdapter ra = new QpidResourceAdapter(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(ra); + oos.close(); + assertTrue(out.toByteArray().length > 0); + } +} |
