summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-05-28 11:50:54 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-05-28 11:50:54 +0000
commitf3208542ff40aa6dad13abf6a4eff85661ba89bb (patch)
treedc6456d29cbddb83372c779b746cb7cac5ca5e33 /qpid/java/client/src/main
parentf3453cb5bb800704d8563f150cab28bf7177560c (diff)
downloadqpid-python-f3208542ff40aa6dad13abf6a4eff85661ba89bb.tar.gz
QPID-1094: Implement XA resource exception handling and add corresponding tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@660911 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java362
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java13
3 files changed, 199 insertions, 180 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index cf5e1bd8ac..e2767e2c9d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -71,6 +71,8 @@ public class AMQSession_0_10 extends AMQSession
private Object _currentExceptionLock = new Object();
private QpidException _currentException;
+ // a ref on the qpidity connection
+ protected org.apache.qpidity.nclient.Connection _qpidConnection;
//--- constructors
/**
@@ -92,7 +94,7 @@ public class AMQSession_0_10 extends AMQSession
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
-
+ _qpidConnection = qpidConnection;
// create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = qpidConnection.createSession(0);
// set the exception listnere for this session
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index fd50417fc7..27ec445436 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -73,54 +73,25 @@ public class XAResourceImpl implements XAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("commit ", xid);
+ _logger.debug("commit tx branch with xid: ", xid);
}
- if (xid == null)
- {
- throw new XAException(XAException.XAER_PROTO);
- }
- Future<XaResult> future;
+ Future<XaResult> future =
+ _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+
+ // now wait on the future for the result
+ XaResult result = null;
try
{
- future = _xaSession.getQpidSession()
- .dtxCommit(XidImpl.convert(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+ result = future.get();
}
- catch (QpidException e)
+ catch (SessionException e)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Cannot convert Xid into String format ", e);
- }
- throw new XAException(XAException.XAER_PROTO);
- }
- // now wait on the future for the result
- XaResult result = future.get();
- DtxXaStatus status = result.getStatus();
- switch (status)
- {
- case XA_OK:
- // do nothing this ok
- break;
- case XA_HEURHAZ:
- throw new XAException(XAException.XA_HEURHAZ);
- case XA_HEURCOM:
- throw new XAException(XAException.XA_HEURCOM);
- case XA_HEURRB:
- throw new XAException(XAException.XA_HEURRB);
- case XA_HEURMIX:
- throw new XAException(XAException.XA_HEURMIX);
- case XA_RBROLLBACK:
- throw new XAException(XAException.XA_RBROLLBACK);
- case XA_RBTIMEOUT:
- throw new XAException(XAException.XA_RBTIMEOUT);
- default:
- // this should not happen
- if (_logger.isDebugEnabled())
- {
- _logger.debug("got unexpected status value: ", status);
- }
- throw new XAException(XAException.XAER_PROTO);
+ // we need to restore the qpidity session that has been closed
+ _xaSession.createSession();
+ // we should get a single exception
+ convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
}
+ checkStatus(result.getStatus());
}
/**
@@ -143,50 +114,29 @@ public class XAResourceImpl implements XAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("end ", xid);
+ _logger.debug("end tx branch with xid: ", xid);
}
- if (xid == null)
- {
- throw new XAException(XAException.XAER_PROTO);
- }
- Future<XaResult> future;
+ Future<XaResult> future = _xaSession.getQpidSession()
+ .dtxEnd(convertXid(xid),
+ flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+ flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+ // now wait on the future for the result
+ XaResult result = null;
try
{
- future = _xaSession.getQpidSession()
- .dtxEnd(XidImpl.convert(xid),
- flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
- flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Cannot convert Xid into String format ", e);
- }
- throw new XAException(XAException.XAER_PROTO);
+ result = future.get();
}
- // now wait on the future for the result
- XaResult result = future.get();
- DtxXaStatus status = result.getStatus();
- switch (status)
+ catch (SessionException e)
{
- case XA_OK:
- // do nothing this ok
- break;
- case XA_RBROLLBACK:
- throw new XAException(XAException.XA_RBROLLBACK);
- case XA_RBTIMEOUT:
- throw new XAException(XAException.XA_RBTIMEOUT);
- default:
- // this should not happen
- if (_logger.isDebugEnabled())
- {
- _logger.debug("got unexpected status value: ", status);
- }
- throw new XAException(XAException.XAER_PROTO);
+ // we need to restore the qpidity session that has been closed
+ _xaSession.createSession();
+ // we should get a single exception
+ convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
}
+ checkStatus(result.getStatus());
}
+
/**
* Tells the resource manager to forget about a heuristically completed transaction branch.
*
@@ -198,16 +148,23 @@ public class XAResourceImpl implements XAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("forget ", xid);
+ _logger.debug("forget tx branch with xid: ", xid);
}
- if (xid == null)
+ _xaSession.getQpidSession().dtxForget(convertXid(xid));
+ try
+ {
+ _xaSession.getQpidSession().sync();
+ }
+ catch (SessionException e)
{
- throw new XAException(XAException.XAER_PROTO);
+ // we need to restore the qpidity session that has been closed
+ _xaSession.createSession();
+ // we should get a single exception
+ convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
}
- _xaSession.getQpidSession().dtxForget(new org.apache.qpidity.transport.Xid()
- .setGlobalId((xid.getGlobalTransactionId())));
}
+
/**
* Obtains the current transaction timeout value set for this XAResource instance.
* If XAResource.setTransactionTimeout was not used prior to invoking this method,
@@ -222,19 +179,18 @@ public class XAResourceImpl implements XAResource
int result = 0;
if (_xid != null)
{
+ Future<GetTimeoutResult> future =
+ _xaSession.getQpidSession().dtxGetTimeout(convertXid(_xid));
try
{
- Future<GetTimeoutResult> future =
- _xaSession.getQpidSession().dtxGetTimeout(XidImpl.convert(_xid));
result = (int) future.get().getTimeout();
}
- catch (QpidException e)
+ catch (SessionException e)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Cannot convert Xid into String format ", e);
- }
- throw new XAException(XAException.XAER_PROTO);
+ // we need to restore the qpidity session that has been closed
+ _xaSession.createSession();
+ // we should get a single exception
+ convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
}
}
return result;
@@ -270,46 +226,30 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("prepare ", xid);
}
- if (xid == null)
- {
- throw new XAException(XAException.XAER_PROTO);
- }
- Future<XaResult> future;
+ Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid));
+ XaResult result = null;
try
{
- future = _xaSession.getQpidSession()
- .dtxPrepare(XidImpl.convert(xid));
+ result = future.get();
}
- catch (QpidException e)
+ catch (SessionException e)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Cannot convert Xid into String format ", e);
- }
- throw new XAException(XAException.XAER_PROTO);
+ // we need to restore the qpidity session that has been closed
+ _xaSession.createSession();
+ // we should get a single exception
+ convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
}
- XaResult result = future.get();
DtxXaStatus status = result.getStatus();
- int outcome;
+ int outcome = XAResource.XA_OK;
switch (status)
{
case XA_OK:
- outcome = XAResource.XA_OK;
break;
case XA_RDONLY:
outcome = XAResource.XA_RDONLY;
break;
- case XA_RBROLLBACK:
- throw new XAException(XAException.XA_RBROLLBACK);
- case XA_RBTIMEOUT:
- throw new XAException(XAException.XA_RBTIMEOUT);
default:
- // this should not happen
- if (_logger.isDebugEnabled())
- {
- _logger.debug("got unexpected status value: ", status);
- }
- throw new XAException(XAException.XAER_PROTO);
+ checkStatus(status);
}
return outcome;
}
@@ -351,53 +291,26 @@ public class XAResourceImpl implements XAResource
*/
public void rollback(Xid xid) throws XAException
{
- if (xid == null)
+ if (_logger.isDebugEnabled())
{
- throw new XAException(XAException.XAER_PROTO);
+ _logger.debug("rollback tx branch with xid: ", xid);
}
- // the flag is ignored
- Future<XaResult> future;
+
+ Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid));
+ // now wait on the future for the result
+ XaResult result = null;
try
{
- future = _xaSession.getQpidSession()
- .dtxRollback(XidImpl.convert(xid));
+ result = future.get();
}
- catch (QpidException e)
+ catch (SessionException e)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Cannot convert Xid into String format ", e);
- }
- throw new XAException(XAException.XAER_PROTO);
- }
- // now wait on the future for the result
- XaResult result = future.get();
- DtxXaStatus status = result.getStatus();
- switch (status)
- {
- case XA_OK:
- // do nothing this ok
- break;
- case XA_HEURHAZ:
- throw new XAException(XAException.XA_HEURHAZ);
- case XA_HEURCOM:
- throw new XAException(XAException.XA_HEURCOM);
- case XA_HEURRB:
- throw new XAException(XAException.XA_HEURRB);
- case XA_HEURMIX:
- throw new XAException(XAException.XA_HEURMIX);
- case XA_RBROLLBACK:
- throw new XAException(XAException.XA_RBROLLBACK);
- case XA_RBTIMEOUT:
- throw new XAException(XAException.XA_RBTIMEOUT);
- default:
- // this should not happen
- if (_logger.isDebugEnabled())
- {
- _logger.debug("got unexpected status value: ", status);
- }
- throw new XAException(XAException.XAER_PROTO);
+ // we need to restore the qpidity session that has been closed
+ _xaSession.createSession();
+ // we should get a single exception
+ convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
}
+ checkStatus(result.getStatus());
}
/**
@@ -451,48 +364,141 @@ public class XAResourceImpl implements XAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("start ", xid);
+ _logger.debug("start tx branch with xid: ", xid);
}
- if (xid == null)
- {
- throw new XAException(XAException.XAER_PROTO);
- }
- _xid = xid;
- Future<XaResult> future;
+ Future<XaResult> future = _xaSession.getQpidSession()
+ .dtxStart(convertXid(xid),
+ flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ // now wait on the future for the result
+ XaResult result = null;
try
{
- future = _xaSession.getQpidSession()
- .dtxStart(XidImpl.convert(xid),
- flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
- flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ result = future.get();
}
- catch (QpidException e)
+ catch (SessionException e)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Cannot convert Xid into String format ", e);
- }
- throw new XAException(XAException.XAER_PROTO);
+ // we need to restore the qpidity session that has been closed
+ _xaSession.createSession();
+ // we should get a single exception
+ convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ // TODO: The amqp spec does not allow to make the difference
+ // between an already known XID and a wrong arguments (join and resume are set)
+ // TODO: make sure amqp addresses that
}
- // now wait on the future for the result
- XaResult result = future.get();
- DtxXaStatus status = result.getStatus();
+ checkStatus(result.getStatus());
+ _xid = xid;
+ }
+
+ //------------------------------------------------------------------------
+ // Private methods
+ //------------------------------------------------------------------------
+
+ /**
+ * Check xa method outcome and, when required, convert the status into the corresponding xa exception
+ * @param status method status code
+ * @throws XAException corresponding XA Exception when required
+ */
+ private void checkStatus(DtxXaStatus status) throws XAException
+ {
switch (status)
{
case XA_OK:
- // do nothing this ok
+ // Do nothing this ok
break;
case XA_RBROLLBACK:
+ // The tx has been rolled back for an unspecified reason.
throw new XAException(XAException.XA_RBROLLBACK);
case XA_RBTIMEOUT:
+ // The transaction branch took too long.
throw new XAException(XAException.XA_RBTIMEOUT);
+ case XA_HEURHAZ:
+ // The transaction branch may have been heuristically completed.
+ throw new XAException(XAException.XA_HEURHAZ);
+ case XA_HEURCOM:
+ // The transaction branch has been heuristically committed.
+ throw new XAException(XAException.XA_HEURCOM);
+ case XA_HEURRB:
+ // The transaction branch has been heuristically rolled back.
+ throw new XAException(XAException.XA_HEURRB);
+ case XA_HEURMIX:
+ // The transaction branch has been heuristically committed and rolled back.
+ throw new XAException(XAException.XA_HEURMIX);
+ case XA_RDONLY:
+ // The transaction branch was read-only and has been committed.
+ throw new XAException(XAException.XA_RDONLY);
default:
// this should not happen
if (_logger.isDebugEnabled())
{
_logger.debug("got unexpected status value: ", status);
}
+ //A resource manager error has occured in the transaction branch.
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
+
+ /**
+ * Convert execution error to xa exception.
+ * @param error the execution error code
+ * @throws XAException
+ */
+ private void convertExecutionErrorToXAErr(ExecutionErrorCode error) throws XAException
+ {
+ switch (error)
+ {
+ case NOT_ALLOWED:
+ // The XID already exists.
+ throw new XAException(XAException.XAER_DUPID);
+ case NOT_FOUND:
+ // The XID is not valid.
+ throw new XAException(XAException.XAER_NOTA);
+ case ILLEGAL_STATE:
+ // Routine was invoked in an inproper context.
throw new XAException(XAException.XAER_PROTO);
+ case NOT_IMPLEMENTED:
+ // the command is not implemented
+ throw new XAException(XAException.XAER_RMERR);
+ case COMMAND_INVALID:
+ // Invalid call
+ throw new XAException(XAException.XAER_INVAL);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Got unexpected error: " + error);
+ }
+ //A resource manager error has occured in the transaction branch.
+ throw new XAException(XAException.XAER_RMERR);
}
}
+
+ /**
+ * convert a generic xid into qpid format
+ * @param xid xid to be converted
+ * @return the qpid formated xid
+ * @throws XAException when xid is null or when it cannot be converted.
+ */
+ private org.apache.qpidity.transport.Xid convertXid(Xid xid) throws XAException
+ {
+ if (xid == null)
+ {
+ // Invalid arguments were given.
+ throw new XAException(XAException.XAER_INVAL);
+ }
+ try
+ {
+ return XidImpl.convert(xid);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert Xid into String format ", e);
+ }
+ //A resource manager error has occured in the transaction branch.
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index 97f1098e43..1eda71cc63 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -54,10 +54,21 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
super(qpidConnection, con, channelId, false, // this is not a transacted session
Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
- _qpidDtxSession = qpidConnection.createDTXSession(0);
+ createSession();
_xaResource = new XAResourceImpl(this);
}
+ //-- public methods
+
+ /**
+ * Create a qpidity session.
+ */
+ public void createSession()
+ {
+ _qpidDtxSession = _qpidConnection.createDTXSession(0);
+ }
+
+
//--- javax.njms.XASEssion API
/**