diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java | 160 |
1 files changed, 131 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java index 4d3aa01e0c..b84b55966e 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java @@ -22,6 +22,7 @@ import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import org.apache.qpidity.*; +import org.apache.qpidity.dtx.XidImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,12 +73,24 @@ public class XAResourceImpl implements XAResource { _logger.debug("commit ", xid); } - if( xid == null) + if (xid == null) { throw new XAException(XAException.XAER_PROTO); } - Future<DtxCoordinationCommitResult> future = _xaSession.getQpidSession().dtxCoordinationCommit( - xid.toString(), b ? Option.ONE_PHASE : Option.NO_OPTION); + Future<DtxCoordinationCommitResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxCoordinationCommit(XidImpl.convertToString(xid), b ? Option.ONE_PHASE : Option.NO_OPTION); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } // now wait on the future for the result DtxCoordinationCommitResult result = future.get(); int status = result.getStatus(); @@ -130,13 +143,26 @@ public class XAResourceImpl implements XAResource { _logger.debug("end ", xid); } - if( xid == null) + if (xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + Future<DtxDemarcationEndResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxDemarcationEnd(XidImpl.convertToString(xid), + flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, + flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); + } + catch (QpidException e) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } throw new XAException(XAException.XAER_PROTO); } - Future<DtxDemarcationEndResult> future = _xaSession.getQpidSession() - .dtxDemarcationEnd(xid.toString(), flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, - flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); // now wait on the future for the result DtxDemarcationEndResult result = future.get(); int status = result.getStatus(); @@ -172,7 +198,7 @@ public class XAResourceImpl implements XAResource { _logger.debug("forget ", xid); } - if( xid == null) + if (xid == null) { throw new XAException(XAException.XAER_PROTO); } @@ -193,9 +219,20 @@ public class XAResourceImpl implements XAResource int result = 0; if (_xid != null) { - Future<DtxCoordinationGetTimeoutResult> future = - _xaSession.getQpidSession().dtxCoordinationGetTimeout(new String(_xid.getGlobalTransactionId())); - result = (int) future.get().getTimeout(); + try + { + Future<DtxCoordinationGetTimeoutResult> future = + _xaSession.getQpidSession().dtxCoordinationGetTimeout(XidImpl.convertToString(_xid)); + result = (int) future.get().getTimeout(); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } } return result; } @@ -230,15 +267,27 @@ public class XAResourceImpl implements XAResource { _logger.debug("prepare ", xid); } - if( xid == null) + if (xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + Future<DtxCoordinationPrepareResult> future; + try { + future = _xaSession.getQpidSession() + .dtxCoordinationPrepare(XidImpl.convertToString(xid)); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } throw new XAException(XAException.XAER_PROTO); } - Future<DtxCoordinationPrepareResult> future = _xaSession.getQpidSession() - .dtxCoordinationPrepare(new String(xid.getGlobalTransactionId())); DtxCoordinationPrepareResult result = future.get(); int status = result.getStatus(); - int outcome = 0; + int outcome; switch (status) { case Constant.XA_OK: @@ -277,11 +326,28 @@ public class XAResourceImpl implements XAResource public Xid[] recover(int flag) throws XAException { // the flag is ignored - Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession() - .dtxCoordinationRecover(); - DtxCoordinationRecoverResult result = future.get(); - // todo result.getInDoubt() - return null; + Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession().dtxCoordinationRecover(); + DtxCoordinationRecoverResult res = future.get(); + // todo make sure that the keys of the returned map are the xids + Xid[] result = new Xid[res.getInDoubt().size()]; + int i = 0; + try + { + for (String xid : res.getInDoubt().keySet()) + { + result[i] = new XidImpl(xid); + i++; + } + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert string into Xid ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + return result; } /** @@ -292,13 +358,25 @@ public class XAResourceImpl implements XAResource */ public void rollback(Xid xid) throws XAException { - if( xid == null) + if (xid == null) { throw new XAException(XAException.XAER_PROTO); } // the flag is ignored - Future<DtxCoordinationRollbackResult> future = _xaSession.getQpidSession() - .dtxCoordinationRollback(xid.toString()); + Future<DtxCoordinationRollbackResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxCoordinationRollback(XidImpl.convertToString(xid)); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } // now wait on the future for the result DtxCoordinationRollbackResult result = future.get(); int status = result.getStatus(); @@ -344,8 +422,19 @@ public class XAResourceImpl implements XAResource boolean result = false; if (_xid != null) { - _xaSession.getQpidSession() - .dtxCoordinationSetTimeout(_xid.toString(), timeout); + try + { + _xaSession.getQpidSession() + .dtxCoordinationSetTimeout(XidImpl.convertToString(_xid), timeout); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } result = true; } return result; @@ -371,14 +460,27 @@ public class XAResourceImpl implements XAResource { _logger.debug("start ", xid); } - if( xid == null) + if (xid == null) { throw new XAException(XAException.XAER_PROTO); } _xid = xid; - Future<DtxDemarcationStartResult> future = _xaSession.getQpidSession() - .dtxDemarcationStart(xid.toString(), flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, - flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + Future<DtxDemarcationStartResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxDemarcationStart(XidImpl.convertToString(xid), + flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, + flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } // now wait on the future for the result DtxDemarcationStartResult result = future.get(); int status = result.getStatus(); |
