diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-05-28 11:50:54 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-05-28 11:50:54 +0000 |
| commit | f3208542ff40aa6dad13abf6a4eff85661ba89bb (patch) | |
| tree | dc6456d29cbddb83372c779b746cb7cac5ca5e33 /qpid/java/client/src/main | |
| parent | f3453cb5bb800704d8563f150cab28bf7177560c (diff) | |
| download | qpid-python-f3208542ff40aa6dad13abf6a4eff85661ba89bb.tar.gz | |
QPID-1094: Implement XA resource exception handling and add corresponding tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@660911 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
3 files changed, 199 insertions, 180 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index cf5e1bd8ac..e2767e2c9d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -71,6 +71,8 @@ public class AMQSession_0_10 extends AMQSession private Object _currentExceptionLock = new Object(); private QpidException _currentException; + // a ref on the qpidity connection + protected org.apache.qpidity.nclient.Connection _qpidConnection; //--- constructors /** @@ -92,7 +94,7 @@ public class AMQSession_0_10 extends AMQSession super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); - + _qpidConnection = qpidConnection; // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = qpidConnection.createSession(0); // set the exception listnere for this session diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index fd50417fc7..27ec445436 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -73,54 +73,25 @@ public class XAResourceImpl implements XAResource { if (_logger.isDebugEnabled()) { - _logger.debug("commit ", xid); + _logger.debug("commit tx branch with xid: ", xid); } - if (xid == null) - { - throw new XAException(XAException.XAER_PROTO); - } - Future<XaResult> future; + Future<XaResult> future = + _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION); + + // now wait on the future for the result + XaResult result = null; try { - future = _xaSession.getQpidSession() - .dtxCommit(XidImpl.convert(xid), b ? Option.ONE_PHASE : Option.NO_OPTION); + result = future.get(); } - catch (QpidException e) + catch (SessionException e) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Cannot convert Xid into String format ", e); - } - throw new XAException(XAException.XAER_PROTO); - } - // now wait on the future for the result - XaResult result = future.get(); - DtxXaStatus status = result.getStatus(); - switch (status) - { - case XA_OK: - // do nothing this ok - break; - case XA_HEURHAZ: - throw new XAException(XAException.XA_HEURHAZ); - case XA_HEURCOM: - throw new XAException(XAException.XA_HEURCOM); - case XA_HEURRB: - throw new XAException(XAException.XA_HEURRB); - case XA_HEURMIX: - throw new XAException(XAException.XA_HEURMIX); - case XA_RBROLLBACK: - throw new XAException(XAException.XA_RBROLLBACK); - case XA_RBTIMEOUT: - throw new XAException(XAException.XA_RBTIMEOUT); - default: - // this should not happen - if (_logger.isDebugEnabled()) - { - _logger.debug("got unexpected status value: ", status); - } - throw new XAException(XAException.XAER_PROTO); + // we need to restore the qpidity session that has been closed + _xaSession.createSession(); + // we should get a single exception + convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); } + checkStatus(result.getStatus()); } /** @@ -143,50 +114,29 @@ public class XAResourceImpl implements XAResource { if (_logger.isDebugEnabled()) { - _logger.debug("end ", xid); + _logger.debug("end tx branch with xid: ", xid); } - if (xid == null) - { - throw new XAException(XAException.XAER_PROTO); - } - Future<XaResult> future; + Future<XaResult> future = _xaSession.getQpidSession() + .dtxEnd(convertXid(xid), + flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, + flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); + // now wait on the future for the result + XaResult result = null; try { - future = _xaSession.getQpidSession() - .dtxEnd(XidImpl.convert(xid), - flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, - flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); - } - catch (QpidException e) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Cannot convert Xid into String format ", e); - } - throw new XAException(XAException.XAER_PROTO); + result = future.get(); } - // now wait on the future for the result - XaResult result = future.get(); - DtxXaStatus status = result.getStatus(); - switch (status) + catch (SessionException e) { - case XA_OK: - // do nothing this ok - break; - case XA_RBROLLBACK: - throw new XAException(XAException.XA_RBROLLBACK); - case XA_RBTIMEOUT: - throw new XAException(XAException.XA_RBTIMEOUT); - default: - // this should not happen - if (_logger.isDebugEnabled()) - { - _logger.debug("got unexpected status value: ", status); - } - throw new XAException(XAException.XAER_PROTO); + // we need to restore the qpidity session that has been closed + _xaSession.createSession(); + // we should get a single exception + convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); } + checkStatus(result.getStatus()); } + /** * Tells the resource manager to forget about a heuristically completed transaction branch. * @@ -198,16 +148,23 @@ public class XAResourceImpl implements XAResource { if (_logger.isDebugEnabled()) { - _logger.debug("forget ", xid); + _logger.debug("forget tx branch with xid: ", xid); } - if (xid == null) + _xaSession.getQpidSession().dtxForget(convertXid(xid)); + try + { + _xaSession.getQpidSession().sync(); + } + catch (SessionException e) { - throw new XAException(XAException.XAER_PROTO); + // we need to restore the qpidity session that has been closed + _xaSession.createSession(); + // we should get a single exception + convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); } - _xaSession.getQpidSession().dtxForget(new org.apache.qpidity.transport.Xid() - .setGlobalId((xid.getGlobalTransactionId()))); } + /** * Obtains the current transaction timeout value set for this XAResource instance. * If XAResource.setTransactionTimeout was not used prior to invoking this method, @@ -222,19 +179,18 @@ public class XAResourceImpl implements XAResource int result = 0; if (_xid != null) { + Future<GetTimeoutResult> future = + _xaSession.getQpidSession().dtxGetTimeout(convertXid(_xid)); try { - Future<GetTimeoutResult> future = - _xaSession.getQpidSession().dtxGetTimeout(XidImpl.convert(_xid)); result = (int) future.get().getTimeout(); } - catch (QpidException e) + catch (SessionException e) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Cannot convert Xid into String format ", e); - } - throw new XAException(XAException.XAER_PROTO); + // we need to restore the qpidity session that has been closed + _xaSession.createSession(); + // we should get a single exception + convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); } } return result; @@ -270,46 +226,30 @@ public class XAResourceImpl implements XAResource { _logger.debug("prepare ", xid); } - if (xid == null) - { - throw new XAException(XAException.XAER_PROTO); - } - Future<XaResult> future; + Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid)); + XaResult result = null; try { - future = _xaSession.getQpidSession() - .dtxPrepare(XidImpl.convert(xid)); + result = future.get(); } - catch (QpidException e) + catch (SessionException e) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Cannot convert Xid into String format ", e); - } - throw new XAException(XAException.XAER_PROTO); + // we need to restore the qpidity session that has been closed + _xaSession.createSession(); + // we should get a single exception + convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); } - XaResult result = future.get(); DtxXaStatus status = result.getStatus(); - int outcome; + int outcome = XAResource.XA_OK; switch (status) { case XA_OK: - outcome = XAResource.XA_OK; break; case XA_RDONLY: outcome = XAResource.XA_RDONLY; break; - case XA_RBROLLBACK: - throw new XAException(XAException.XA_RBROLLBACK); - case XA_RBTIMEOUT: - throw new XAException(XAException.XA_RBTIMEOUT); default: - // this should not happen - if (_logger.isDebugEnabled()) - { - _logger.debug("got unexpected status value: ", status); - } - throw new XAException(XAException.XAER_PROTO); + checkStatus(status); } return outcome; } @@ -351,53 +291,26 @@ public class XAResourceImpl implements XAResource */ public void rollback(Xid xid) throws XAException { - if (xid == null) + if (_logger.isDebugEnabled()) { - throw new XAException(XAException.XAER_PROTO); + _logger.debug("rollback tx branch with xid: ", xid); } - // the flag is ignored - Future<XaResult> future; + + Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid)); + // now wait on the future for the result + XaResult result = null; try { - future = _xaSession.getQpidSession() - .dtxRollback(XidImpl.convert(xid)); + result = future.get(); } - catch (QpidException e) + catch (SessionException e) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Cannot convert Xid into String format ", e); - } - throw new XAException(XAException.XAER_PROTO); - } - // now wait on the future for the result - XaResult result = future.get(); - DtxXaStatus status = result.getStatus(); - switch (status) - { - case XA_OK: - // do nothing this ok - break; - case XA_HEURHAZ: - throw new XAException(XAException.XA_HEURHAZ); - case XA_HEURCOM: - throw new XAException(XAException.XA_HEURCOM); - case XA_HEURRB: - throw new XAException(XAException.XA_HEURRB); - case XA_HEURMIX: - throw new XAException(XAException.XA_HEURMIX); - case XA_RBROLLBACK: - throw new XAException(XAException.XA_RBROLLBACK); - case XA_RBTIMEOUT: - throw new XAException(XAException.XA_RBTIMEOUT); - default: - // this should not happen - if (_logger.isDebugEnabled()) - { - _logger.debug("got unexpected status value: ", status); - } - throw new XAException(XAException.XAER_PROTO); + // we need to restore the qpidity session that has been closed + _xaSession.createSession(); + // we should get a single exception + convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode()); } + checkStatus(result.getStatus()); } /** @@ -451,48 +364,141 @@ public class XAResourceImpl implements XAResource { if (_logger.isDebugEnabled()) { - _logger.debug("start ", xid); + _logger.debug("start tx branch with xid: ", xid); } - if (xid == null) - { - throw new XAException(XAException.XAER_PROTO); - } - _xid = xid; - Future<XaResult> future; + Future<XaResult> future = _xaSession.getQpidSession() + .dtxStart(convertXid(xid), + flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, + flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + // now wait on the future for the result + XaResult result = null; try { - future = _xaSession.getQpidSession() - .dtxStart(XidImpl.convert(xid), - flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, - flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + result = future.get(); } - catch (QpidException e) + catch (SessionException e) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Cannot convert Xid into String format ", e); - } - throw new XAException(XAException.XAER_PROTO); + // we need to restore the qpidity session that has been closed + _xaSession.createSession(); + // we should get a single exception + convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + // TODO: The amqp spec does not allow to make the difference + // between an already known XID and a wrong arguments (join and resume are set) + // TODO: make sure amqp addresses that } - // now wait on the future for the result - XaResult result = future.get(); - DtxXaStatus status = result.getStatus(); + checkStatus(result.getStatus()); + _xid = xid; + } + + //------------------------------------------------------------------------ + // Private methods + //------------------------------------------------------------------------ + + /** + * Check xa method outcome and, when required, convert the status into the corresponding xa exception + * @param status method status code + * @throws XAException corresponding XA Exception when required + */ + private void checkStatus(DtxXaStatus status) throws XAException + { switch (status) { case XA_OK: - // do nothing this ok + // Do nothing this ok break; case XA_RBROLLBACK: + // The tx has been rolled back for an unspecified reason. throw new XAException(XAException.XA_RBROLLBACK); case XA_RBTIMEOUT: + // The transaction branch took too long. throw new XAException(XAException.XA_RBTIMEOUT); + case XA_HEURHAZ: + // The transaction branch may have been heuristically completed. + throw new XAException(XAException.XA_HEURHAZ); + case XA_HEURCOM: + // The transaction branch has been heuristically committed. + throw new XAException(XAException.XA_HEURCOM); + case XA_HEURRB: + // The transaction branch has been heuristically rolled back. + throw new XAException(XAException.XA_HEURRB); + case XA_HEURMIX: + // The transaction branch has been heuristically committed and rolled back. + throw new XAException(XAException.XA_HEURMIX); + case XA_RDONLY: + // The transaction branch was read-only and has been committed. + throw new XAException(XAException.XA_RDONLY); default: // this should not happen if (_logger.isDebugEnabled()) { _logger.debug("got unexpected status value: ", status); } + //A resource manager error has occured in the transaction branch. + throw new XAException(XAException.XAER_RMERR); + } + } + + /** + * Convert execution error to xa exception. + * @param error the execution error code + * @throws XAException + */ + private void convertExecutionErrorToXAErr(ExecutionErrorCode error) throws XAException + { + switch (error) + { + case NOT_ALLOWED: + // The XID already exists. + throw new XAException(XAException.XAER_DUPID); + case NOT_FOUND: + // The XID is not valid. + throw new XAException(XAException.XAER_NOTA); + case ILLEGAL_STATE: + // Routine was invoked in an inproper context. throw new XAException(XAException.XAER_PROTO); + case NOT_IMPLEMENTED: + // the command is not implemented + throw new XAException(XAException.XAER_RMERR); + case COMMAND_INVALID: + // Invalid call + throw new XAException(XAException.XAER_INVAL); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("Got unexpected error: " + error); + } + //A resource manager error has occured in the transaction branch. + throw new XAException(XAException.XAER_RMERR); } } + + /** + * convert a generic xid into qpid format + * @param xid xid to be converted + * @return the qpid formated xid + * @throws XAException when xid is null or when it cannot be converted. + */ + private org.apache.qpidity.transport.Xid convertXid(Xid xid) throws XAException + { + if (xid == null) + { + // Invalid arguments were given. + throw new XAException(XAException.XAER_INVAL); + } + try + { + return XidImpl.convert(xid); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + //A resource manager error has occured in the transaction branch. + throw new XAException(XAException.XAER_RMERR); + } + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 97f1098e43..1eda71cc63 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -54,10 +54,21 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic super(qpidConnection, con, channelId, false, // this is not a transacted session Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); - _qpidDtxSession = qpidConnection.createDTXSession(0); + createSession(); _xaResource = new XAResourceImpl(this); } + //-- public methods + + /** + * Create a qpidity session. + */ + public void createSession() + { + _qpidDtxSession = _qpidConnection.createDTXSession(0); + } + + //--- javax.njms.XASEssion API /** |
