summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-21 10:51:29 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-21 10:51:29 +0000
commit7f5b6f89219a0d0da412a0f9f7b060fadcdaac3e (patch)
treea8e7204512f5d16d965e354731ad6fa359dfeba3 /java/client/src
parent7d989598d7015f3cb28b090666229401cbc6ff87 (diff)
downloadqpid-python-7f5b6f89219a0d0da412a0f9f7b060fadcdaac3e.tar.gz
changed to use futures
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@568054 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java197
1 files changed, 166 insertions, 31 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
index fc298b01cb..4d3aa01e0c 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
@@ -21,7 +21,7 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +63,8 @@ public class XAResourceImpl implements XAResource
*
* @param xid A global transaction identifier
* @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid.
- * @throws XAException An error has occurred. Possible XAExceptions are XAER_RMERR, XAER_NOTA or XAER_PROTO.
+ * @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ,
+ * XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
*/
public void commit(Xid xid, boolean b) throws XAException
{
@@ -71,7 +72,40 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("commit ", xid);
}
- _xaSession.getQpidSession().dtxCoordinationCommit(new String(xid.getGlobalTransactionId()), b ? Option.ONE_PHASE : Option.NO_OPTION);
+ if( xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ Future<DtxCoordinationCommitResult> future = _xaSession.getQpidSession().dtxCoordinationCommit(
+ xid.toString(), b ? Option.ONE_PHASE : Option.NO_OPTION);
+ // now wait on the future for the result
+ DtxCoordinationCommitResult result = future.get();
+ int status = result.getStatus();
+ switch (status)
+ {
+ case Constant.XA_OK:
+ // do nothing this ok
+ break;
+ case Constant.XA_HEURHAZ:
+ throw new XAException(XAException.XA_HEURHAZ);
+ case Constant.XA_HEURCOM:
+ throw new XAException(XAException.XA_HEURCOM);
+ case Constant.XA_HEURRB:
+ throw new XAException(XAException.XA_HEURRB);
+ case Constant.XA_HEURMIX:
+ throw new XAException(XAException.XA_HEURMIX);
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
}
/**
@@ -87,8 +121,8 @@ public class XAResourceImpl implements XAResource
*
* @param xid A global transaction identifier that is the same as the identifier used previously in the start method
* @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
- * @throws XAException An error has occurred. Possible XAException values
- * are XAER_RMERR, XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*.
+ * @throws XAException An error has occurred. An error has occurred. Possible XAException values are XAER_RMERR,
+ * XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*.
*/
public void end(Xid xid, int flag) throws XAException
{
@@ -96,16 +130,39 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("end ", xid);
}
- xid = null;
- _xaSession.getQpidSession()
- .dtxDemarcationEnd(new String(xid.getGlobalTransactionId()), flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+ if( xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ Future<DtxDemarcationEndResult> future = _xaSession.getQpidSession()
+ .dtxDemarcationEnd(xid.toString(), flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+ // now wait on the future for the result
+ DtxDemarcationEndResult result = future.get();
+ int status = result.getStatus();
+ switch (status)
+ {
+ case Constant.XA_OK:
+ // do nothing this ok
+ break;
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
}
/**
* Tells the resource manager to forget about a heuristically completed transaction branch.
*
- * @param new String(xid.getGlobalTransactionId() A global transaction identifier
+ * @param xid String(xid.getGlobalTransactionId() A global transaction identifier
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL,
* XAER_NOTA, XAER_INVAL, or XAER_PROTO.
*/
@@ -115,6 +172,10 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("forget ", xid);
}
+ if( xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
_xaSession.getQpidSession().dtxCoordinationForget(new String(xid.getGlobalTransactionId()));
}
@@ -132,8 +193,9 @@ public class XAResourceImpl implements XAResource
int result = 0;
if (_xid != null)
{
- result = 0;
- _xaSession.getQpidSession().dtxCoordinationGetTimeout(new String(_xid.getGlobalTransactionId()));
+ Future<DtxCoordinationGetTimeoutResult> future =
+ _xaSession.getQpidSession().dtxCoordinationGetTimeout(new String(_xid.getGlobalTransactionId()));
+ result = (int) future.get().getTimeout();
}
return result;
}
@@ -168,20 +230,36 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("prepare ", xid);
}
- int result;
- result = 0;
- _xaSession.getQpidSession()
- .dtxCoordinationPrepare(new String(xid.getGlobalTransactionId()));
-
- if (result == XAException.XA_RDONLY)
+ if( xid == null)
{
- throw new XAException(XAException.XA_RDONLY);
+ throw new XAException(XAException.XAER_PROTO);
}
- else if (result == XAException.XA_RBROLLBACK)
+ Future<DtxCoordinationPrepareResult> future = _xaSession.getQpidSession()
+ .dtxCoordinationPrepare(new String(xid.getGlobalTransactionId()));
+ DtxCoordinationPrepareResult result = future.get();
+ int status = result.getStatus();
+ int outcome = 0;
+ switch (status)
{
- throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_OK:
+ outcome = XAResource.XA_OK;
+ break;
+ case Constant.XA_RDONLY:
+ outcome = XAResource.XA_RDONLY;
+ break;
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
}
- return result;
+ return outcome;
}
/**
@@ -198,10 +276,11 @@ public class XAResourceImpl implements XAResource
*/
public Xid[] recover(int flag) throws XAException
{
-// the flag is ignored
-
- _xaSession.getQpidSession()
+ // the flag is ignored
+ Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession()
.dtxCoordinationRecover();
+ DtxCoordinationRecoverResult result = future.get();
+ // todo result.getInDoubt()
return null;
}
@@ -213,9 +292,41 @@ public class XAResourceImpl implements XAResource
*/
public void rollback(Xid xid) throws XAException
{
-// the flag is ignored
- _xaSession.getQpidSession()
- .dtxCoordinationRollback(new String(xid.getGlobalTransactionId()));
+ if( xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ // the flag is ignored
+ Future<DtxCoordinationRollbackResult> future = _xaSession.getQpidSession()
+ .dtxCoordinationRollback(xid.toString());
+ // now wait on the future for the result
+ DtxCoordinationRollbackResult result = future.get();
+ int status = result.getStatus();
+ switch (status)
+ {
+ case Constant.XA_OK:
+ // do nothing this ok
+ break;
+ case Constant.XA_HEURHAZ:
+ throw new XAException(XAException.XA_HEURHAZ);
+ case Constant.XA_HEURCOM:
+ throw new XAException(XAException.XA_HEURCOM);
+ case Constant.XA_HEURRB:
+ throw new XAException(XAException.XA_HEURRB);
+ case Constant.XA_HEURMIX:
+ throw new XAException(XAException.XA_HEURMIX);
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
}
/**
@@ -234,7 +345,7 @@ public class XAResourceImpl implements XAResource
if (_xid != null)
{
_xaSession.getQpidSession()
- .dtxCoordinationSetTimeout(new String(_xid.getGlobalTransactionId()), timeout);
+ .dtxCoordinationSetTimeout(_xid.toString(), timeout);
result = true;
}
return result;
@@ -260,9 +371,33 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("start ", xid);
}
+ if( xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
_xid = xid;
- _xaSession.getQpidSession()
- .dtxDemarcationStart(new String(xid.getGlobalTransactionId()), flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
- flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ Future<DtxDemarcationStartResult> future = _xaSession.getQpidSession()
+ .dtxDemarcationStart(xid.toString(), flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ // now wait on the future for the result
+ DtxDemarcationStartResult result = future.get();
+ int status = result.getStatus();
+ switch (status)
+ {
+ case Constant.XA_OK:
+ // do nothing this ok
+ break;
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
}
}