diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-21 10:51:29 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-21 10:51:29 +0000 |
| commit | 7f5b6f89219a0d0da412a0f9f7b060fadcdaac3e (patch) | |
| tree | a8e7204512f5d16d965e354731ad6fa359dfeba3 /java/client/src | |
| parent | 7d989598d7015f3cb28b090666229401cbc6ff87 (diff) | |
| download | qpid-python-7f5b6f89219a0d0da412a0f9f7b060fadcdaac3e.tar.gz | |
changed to use futures
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@568054 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java | 197 |
1 files changed, 166 insertions, 31 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java index fc298b01cb..4d3aa01e0c 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java @@ -21,7 +21,7 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; -import org.apache.qpidity.Option; +import org.apache.qpidity.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +63,8 @@ public class XAResourceImpl implements XAResource * * @param xid A global transaction identifier * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid. - * @throws XAException An error has occurred. Possible XAExceptions are XAER_RMERR, XAER_NOTA or XAER_PROTO. + * @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ, + * XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO. */ public void commit(Xid xid, boolean b) throws XAException { @@ -71,7 +72,40 @@ public class XAResourceImpl implements XAResource { _logger.debug("commit ", xid); } - _xaSession.getQpidSession().dtxCoordinationCommit(new String(xid.getGlobalTransactionId()), b ? Option.ONE_PHASE : Option.NO_OPTION); + if( xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + Future<DtxCoordinationCommitResult> future = _xaSession.getQpidSession().dtxCoordinationCommit( + xid.toString(), b ? Option.ONE_PHASE : Option.NO_OPTION); + // now wait on the future for the result + DtxCoordinationCommitResult result = future.get(); + int status = result.getStatus(); + switch (status) + { + case Constant.XA_OK: + // do nothing this ok + break; + case Constant.XA_HEURHAZ: + throw new XAException(XAException.XA_HEURHAZ); + case Constant.XA_HEURCOM: + throw new XAException(XAException.XA_HEURCOM); + case Constant.XA_HEURRB: + throw new XAException(XAException.XA_HEURRB); + case Constant.XA_HEURMIX: + throw new XAException(XAException.XA_HEURMIX); + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } } /** @@ -87,8 +121,8 @@ public class XAResourceImpl implements XAResource * * @param xid A global transaction identifier that is the same as the identifier used previously in the start method * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND. - * @throws XAException An error has occurred. Possible XAException values - * are XAER_RMERR, XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*. + * @throws XAException An error has occurred. An error has occurred. Possible XAException values are XAER_RMERR, + * XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*. */ public void end(Xid xid, int flag) throws XAException { @@ -96,16 +130,39 @@ public class XAResourceImpl implements XAResource { _logger.debug("end ", xid); } - xid = null; - _xaSession.getQpidSession() - .dtxDemarcationEnd(new String(xid.getGlobalTransactionId()), flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, + if( xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + Future<DtxDemarcationEndResult> future = _xaSession.getQpidSession() + .dtxDemarcationEnd(xid.toString(), flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); + // now wait on the future for the result + DtxDemarcationEndResult result = future.get(); + int status = result.getStatus(); + switch (status) + { + case Constant.XA_OK: + // do nothing this ok + break; + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } } /** * Tells the resource manager to forget about a heuristically completed transaction branch. * - * @param new String(xid.getGlobalTransactionId() A global transaction identifier + * @param xid String(xid.getGlobalTransactionId() A global transaction identifier * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, * XAER_NOTA, XAER_INVAL, or XAER_PROTO. */ @@ -115,6 +172,10 @@ public class XAResourceImpl implements XAResource { _logger.debug("forget ", xid); } + if( xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } _xaSession.getQpidSession().dtxCoordinationForget(new String(xid.getGlobalTransactionId())); } @@ -132,8 +193,9 @@ public class XAResourceImpl implements XAResource int result = 0; if (_xid != null) { - result = 0; - _xaSession.getQpidSession().dtxCoordinationGetTimeout(new String(_xid.getGlobalTransactionId())); + Future<DtxCoordinationGetTimeoutResult> future = + _xaSession.getQpidSession().dtxCoordinationGetTimeout(new String(_xid.getGlobalTransactionId())); + result = (int) future.get().getTimeout(); } return result; } @@ -168,20 +230,36 @@ public class XAResourceImpl implements XAResource { _logger.debug("prepare ", xid); } - int result; - result = 0; - _xaSession.getQpidSession() - .dtxCoordinationPrepare(new String(xid.getGlobalTransactionId())); - - if (result == XAException.XA_RDONLY) + if( xid == null) { - throw new XAException(XAException.XA_RDONLY); + throw new XAException(XAException.XAER_PROTO); } - else if (result == XAException.XA_RBROLLBACK) + Future<DtxCoordinationPrepareResult> future = _xaSession.getQpidSession() + .dtxCoordinationPrepare(new String(xid.getGlobalTransactionId())); + DtxCoordinationPrepareResult result = future.get(); + int status = result.getStatus(); + int outcome = 0; + switch (status) { - throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_OK: + outcome = XAResource.XA_OK; + break; + case Constant.XA_RDONLY: + outcome = XAResource.XA_RDONLY; + break; + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); } - return result; + return outcome; } /** @@ -198,10 +276,11 @@ public class XAResourceImpl implements XAResource */ public Xid[] recover(int flag) throws XAException { -// the flag is ignored - - _xaSession.getQpidSession() + // the flag is ignored + Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession() .dtxCoordinationRecover(); + DtxCoordinationRecoverResult result = future.get(); + // todo result.getInDoubt() return null; } @@ -213,9 +292,41 @@ public class XAResourceImpl implements XAResource */ public void rollback(Xid xid) throws XAException { -// the flag is ignored - _xaSession.getQpidSession() - .dtxCoordinationRollback(new String(xid.getGlobalTransactionId())); + if( xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + // the flag is ignored + Future<DtxCoordinationRollbackResult> future = _xaSession.getQpidSession() + .dtxCoordinationRollback(xid.toString()); + // now wait on the future for the result + DtxCoordinationRollbackResult result = future.get(); + int status = result.getStatus(); + switch (status) + { + case Constant.XA_OK: + // do nothing this ok + break; + case Constant.XA_HEURHAZ: + throw new XAException(XAException.XA_HEURHAZ); + case Constant.XA_HEURCOM: + throw new XAException(XAException.XA_HEURCOM); + case Constant.XA_HEURRB: + throw new XAException(XAException.XA_HEURRB); + case Constant.XA_HEURMIX: + throw new XAException(XAException.XA_HEURMIX); + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } } /** @@ -234,7 +345,7 @@ public class XAResourceImpl implements XAResource if (_xid != null) { _xaSession.getQpidSession() - .dtxCoordinationSetTimeout(new String(_xid.getGlobalTransactionId()), timeout); + .dtxCoordinationSetTimeout(_xid.toString(), timeout); result = true; } return result; @@ -260,9 +371,33 @@ public class XAResourceImpl implements XAResource { _logger.debug("start ", xid); } + if( xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } _xid = xid; - _xaSession.getQpidSession() - .dtxDemarcationStart(new String(xid.getGlobalTransactionId()), flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, - flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + Future<DtxDemarcationStartResult> future = _xaSession.getQpidSession() + .dtxDemarcationStart(xid.toString(), flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, + flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + // now wait on the future for the result + DtxDemarcationStartResult result = future.get(); + int status = result.getStatus(); + switch (status) + { + case Constant.XA_OK: + // do nothing this ok + break; + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } } } |
