diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 17:15:56 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 17:15:56 +0000 |
| commit | 088b7c8461979ee8df9480cb9105cc045fd632b6 (patch) | |
| tree | 6caff7beecdb01ba5d9d8535e5b7ad4fdfc002ef /java/client | |
| parent | 900e8cdd0593fbafd3a7b354ce65c462c5f9ed05 (diff) | |
| download | qpid-python-088b7c8461979ee8df9480cb9105cc045fd632b6.tar.gz | |
Added dtx classes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563197 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
8 files changed, 657 insertions, 44 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/DtxSession.java index 045f52541d..127fe4d1a7 100644 --- a/java/client/src/main/java/org/apache/qpidity/DtxSession.java +++ b/java/client/src/main/java/org/apache/qpidity/DtxSession.java @@ -18,7 +18,7 @@ */ package org.apache.qpidity; -import org.apache.qpidity.QpidException; +import javax.transaction.xa.Xid; /** * This session�s resources are control under the scope of a distributed transaction. @@ -27,11 +27,112 @@ public interface DtxSession extends Session { /** - * Get the XA resource associated with this session. + * This method is called when messages should be produced and consumed on behalf a transaction + * branch identified by xid. + * possible options are: + * <ul> + * <li> {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen. + * <li> {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified. + * </ul> * - * @return this session XA resource. - * @throws QpidException If the session fails to retrieve its associated XA resource - * due to some error. + * @param xid Specifies the xid of the transaction branch to be started. + * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. + * @throws QpidException If the session fails to start due to some error */ - public javax.transaction.xa.XAResource getDTXResource() throws QpidException; + public void dtxDemarcationStart(Xid xid, Option... options) throws QpidException; + + /** + * This method is called when the work done on behalf a transaction branch finishes or needs to + * be suspended. + * possible options are: + * <ul> + * <li> {@link Option#FAIL}: indicates that this portion of work has failed; + * otherwise this portion of work has + * completed successfully. + * <li> {@link Option#SUSPEND}: Indicates that the transaction branch is + * temporarily suspended in an incomplete state. + * </ul> + * + * @param xid Specifies the xid of the transaction branch to be ended. + * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. + * @throws QpidException If the session fails to end due to some error + */ + public void dtxDemarcationEnd(Xid xid, Option... options) throws QpidException; + + /** + * Commit the work done on behalf a transaction branch. This method commits the work associated + * with xid. Any produced messages are made available and any consumed messages are discarded. + * possible option is: + * <ul> + * <li> {@link Option#ONE_PHASE}: When set then one-phase commit optimization is used. + * </ul> + * + * @param xid Specifies the xid of the transaction branch to be committed. + * @param options Available option is: {@link Option#ONE_PHASE} + * @throws QpidException If the session fails to commit due to some error + */ + public void dtxCoordinationCommit(Xid xid, Option... options) throws QpidException; + + /** + * This method is called to forget about a heuristically completed transaction branch. + * + * @param xid Specifies the xid of the transaction branch to be forgotten. + * @throws QpidException If the session fails to forget due to some error + */ + public void dtxCoordinationForget(Xid xid) throws QpidException; + + /** + * This method obtains the current transaction timeout value in seconds. If set-timeout was not + * used prior to invoking this method, the return value is the default timeout; otherwise, the + * value used in the previous set-timeout call is returned. + * + * @param xid Specifies the xid of the transaction branch for getting the timeout. + * @return The current transaction timeout value in seconds. + * @throws QpidException If the session fails to get the timeout due to some error + */ + public long dtxCoordinationGetTimeout(Xid xid) throws QpidException; + + /** + * This method prepares for commitment any message produced or consumed on behalf of xid. + * + * @param xid Specifies the xid of the transaction branch that can be prepared. + * @return The status of the prepare operation: can be one of those: + * xa-ok: Normal execution. + * <p/> + * xa-rdonly: The transaction branch was read-only and has been committed. + * <p/> + * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified + * reason. + * <p/> + * xa-rbtimeout: The work represented by this transaction branch took too long. + * @throws QpidException If the session fails to prepare due to some error + */ + public short dtxCoordinationPrepare(Xid xid) throws QpidException; + + /** + * This method is called to obtain a list of transaction branches that are in a prepared or + * heuristically completed state. + * + * @return a array of xids to be recovered. + * @throws QpidException If the session fails to recover due to some error + */ + public Xid[] dtxCoordinationRecover() throws QpidException; + + /** + * This method rolls back the work associated with xid. Any produced messages are discarded and + * any consumed messages are re-enqueued. + * + * @param xid Specifies the xid of the transaction branch that can be rolled back. + * @throws QpidException If the session fails to rollback due to some error + */ + public void dtxCoordinationRollback(Xid xid) throws QpidException; + + /** + * Sets the specified transaction branch timeout value in seconds. + * + * @param xid Specifies the xid of the transaction branch for setting the timeout. + * @param timeout The transaction timeout value in seconds. + * @throws QpidException If the session fails to set the timeout due to some error + */ + public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws QpidException; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java index 25951bc0c1..818b146491 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -42,7 +42,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect /** * Maps from session id (Integer) to SessionImpl instance */ - private final Vector<SessionImpl> _sessions = new Vector<SessionImpl>(); + protected final Vector<SessionImpl> _sessions = new Vector<SessionImpl>(); /** * This is the clientID @@ -113,10 +113,18 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return A newly created session * @throws JMSException If the Connection object fails to create a session due to some internal error. */ - public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException + public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); - SessionImpl session = new SessionImpl(this, transacted, acknowledgeMode); + SessionImpl session = null; + try + { + session = new SessionImpl(this, transacted, acknowledgeMode, false); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } // add this session with the list of session that are handled by this connection _sessions.add(session); return session; @@ -178,7 +186,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return the <CODE>ExceptionListener</CODE> for this connection * @throws JMSException In case of unforeseen problem */ - public ExceptionListener getExceptionListener() throws JMSException + public synchronized ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); return _exceptionListener; @@ -203,7 +211,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @param exceptionListener The connection listener. * @throws JMSException If the connection is closed. */ - public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException + public synchronized void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { checkNotClosed(); _exceptionListener = exceptionListener; @@ -217,7 +225,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void start() throws JMSException + public synchronized void start() throws JMSException { checkNotClosed(); if (!_started) @@ -231,7 +239,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect } catch (Exception e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } } _started = true; @@ -248,7 +256,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void stop() throws JMSException + public synchronized void stop() throws JMSException { checkNotClosed(); if (_started) @@ -262,7 +270,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect } catch (Exception e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } } _started = false; @@ -284,7 +292,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void close() throws JMSException + public synchronized void close() throws JMSException { checkNotClosed(); if (!_isClosed) @@ -320,8 +328,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @throws JMSException In case of a problem due to some internal error. */ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws - JMSException + ServerSessionPool sessionPool, int maxMessages) + throws JMSException { checkNotClosed(); return null; @@ -359,7 +367,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return A queueSession object/ * @throws JMSException If creating a QueueSession fails due to some internal error. */ - public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException + public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode); @@ -380,8 +388,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @throws JMSException In case of a problem due to some internal error. */ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws - JMSException + ServerSessionPool sessionPool, int maxMessages) + throws JMSException { return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages); } @@ -396,7 +404,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return a newly created topic session * @throws JMSException If creating the session fails due to some internal error. */ - public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException + public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); TopicSessionImpl session = new TopicSessionImpl(this, transacted, acknowledgeMode); @@ -418,8 +426,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @throws JMSException In case of a problem due to some internal error. */ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws - JMSException + ServerSessionPool sessionPool, int maxMessages) + throws JMSException { return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages); } @@ -442,7 +450,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { _logger.debug("Connection has been closed. Cannot invoke any further operations."); } - throw new javax.jms.IllegalStateException("Connection has been closed. Cannot invoke any further operations."); + throw new javax.jms.IllegalStateException( + "Connection has been closed. Cannot invoke any further operations."); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java index 6174fa9da9..1dc35b5609 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java @@ -20,6 +20,7 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; import javax.jms.JMSException; +import javax.transaction.xa.XAException; /** * Helper class for handling exceptions @@ -47,4 +48,12 @@ public class ExceptionHelper } return jmsException; } + + static public XAException convertQpidExceptionToXAException(QpidException exception) + { + String qpidErrorCode = exception.getErrorCode(); + // todo map this error to an XA code + int xaCode = XAException.XAER_PROTO; + return new XAException(xaCode); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java index 29f867d6b8..f3b6122992 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java @@ -305,10 +305,10 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); } // check that the message is not a foreign one - + // todo // set the properties - // + // todo // dispatch it // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 3d9b9b9d17..4723257794 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -128,10 +128,12 @@ public class SessionImpl implements Session * @param transacted Indicates if the session transacted. * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to * {@link Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter is true. + * @param isXA Indicates whether this session is an XA session. * @throws JMSSecurityException If the user could not be authenticated. - * @throws JMSException In case of internal error. + * @throws QpidException In case of internal error. */ - protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode, boolean isXA) + throws QpidException { _connection = connection; _transacted = transacted; @@ -141,19 +143,12 @@ public class SessionImpl implements Session acknowledgeMode = Session.SESSION_TRANSACTED; } _acknowledgeMode = acknowledgeMode; - try - { - // create the qpid session with an expiry <= 0 so that the session does not expire - _qpidSession = _connection.getQpidConnection().createSession(0); - // set transacted if required - if (_transacted) - { - //_qpidSession.setTransacted(); - } - } - catch (QpidException e) + // create the qpid session with an expiry <= 0 so that the session does not expire + _qpidSession = _connection.getQpidConnection().createSession(0); + // set transacted if required + if (_transacted && !isXA) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + _qpidSession.txSelect(); } // init the message dispatcher. initMessageDispatcherThread(); @@ -314,7 +309,6 @@ public class SessionImpl implements Session // commit the underlying Qpid Session try { - // Note: this operation makes sure that asynch message processing has returned _qpidSession.txCommit(); } catch (QpidException e) @@ -341,7 +335,6 @@ public class SessionImpl implements Session // rollback the underlying Qpid Session try { - // Note: this operation makes sure that asynch message processing has returned _qpidSession.txRollback(); } catch (org.apache.qpidity.QpidException e) @@ -640,7 +633,7 @@ public class SessionImpl implements Session } catch (QpidException e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } return result; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java new file mode 100644 index 0000000000..e76e566efd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java @@ -0,0 +1,54 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.XAConnection; +import javax.jms.JMSException; +import javax.jms.XASession; + +/** + * This class implements the javax.jms.XAConnection interface + */ +public class XAConnectionImpl extends ConnectionImpl implements XAConnection +{ + /** + * Creates an XASession. + * + * @return A newly created XASession. + * @throws JMSException If the XAConnectiono fails to create an XASession due to + * some internal error. + */ + public synchronized XASession createXASession() throws JMSException + { + checkNotClosed(); + XASessionImpl xasession; + try + { + xasession = new XASessionImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // add this session with the list of session that are handled by this connection + _sessions.add(xasession); + return xasession; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java new file mode 100644 index 0000000000..0ec8c4a423 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java @@ -0,0 +1,329 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; + +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAException; + +/** + * This is an implementation of javax.jms.XAResource. + */ +public class XAResourceImpl implements XAResource +{ + /** + * this XAResourceImpl's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class); + + /** + * Reference to the associated XASession + */ + private XASessionImpl _xaSession = null; + + /** + * The XID of this resource + */ + private Xid _xid; + + //--- constructor + + /** + * Create an XAResource associated with a XASession + * + * @param xaSession The session XAresource + */ + protected XAResourceImpl(XASessionImpl xaSession) + { + _xaSession = xaSession; + } + + //--- The XAResource + /** + * Commits the global transaction specified by xid. + * + * @param xid A global transaction identifier + * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid. + * @throws XAException An error has occurred. Possible XAExceptions are XAER_RMERR, XAER_NOTA or XAER_PROTO. + */ + public void commit(Xid xid, boolean b) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("commit ", xid); + } + try + { + _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE : Option.NO_OPTION); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } + + /** + * Ends the work performed on behalf of a transaction branch. + * The resource manager disassociates the XA resource from the transaction branch specified + * and lets the transaction complete. + * <ul> + * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state. + * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified. + * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only + * <li> If TMSUCCESS is specified, the portion of work has completed successfully. + * /ul> + * + * @param xid A global transaction identifier that is the same as the identifier used previously in the start method + * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND. + * @throws XAException An error has occurred. Possible XAException values + * are XAER_RMERR, XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*. + */ + public void end(Xid xid, int flag) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("end ", xid); + } + try + { + _xid = null; + _xaSession.getQpidSession() + .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, + flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } + + /** + * Tells the resource manager to forget about a heuristically completed transaction branch. + * + * @param xid A global transaction identifier + * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, + * XAER_NOTA, XAER_INVAL, or XAER_PROTO. + */ + public void forget(Xid xid) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("forget ", xid); + } + try + { + _xaSession.getQpidSession() + .dtxCoordinationForget(xid); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } + + /** + * Obtains the current transaction timeout value set for this XAResource instance. + * If XAResource.setTransactionTimeout was not used prior to invoking this method, + * the return value is the default timeout i.e. 0; + * otherwise, the value used in the previous setTransactionTimeout call is returned. + * + * @return The transaction timeout value in seconds. + * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. + */ + public int getTransactionTimeout() throws XAException + { + int result = 0; + if (_xid != null) + { + try + { + result = (int) _xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } + return result; + } + + /** + * This method is called to determine if the resource manager instance represented + * by the target object is the same as the resouce manager instance represented by + * the parameter xaResource. + * + * @param xaResource An XAResource object whose resource manager instance is to + * be compared with the resource manager instance of the target object + * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>. + * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. + */ + public boolean isSameRM(XAResource xaResource) throws XAException + { + // TODO : get the server identity of xaResource and compare it with our own one + return false; + } + + /** + * Prepare for a transaction commit of the transaction specified in <code>Xid</code>. + * + * @param xid A global transaction identifier. + * @return A value indicating the resource manager's vote on the outcome of the transaction. + * The possible values are: XA_RDONLY or XA_OK. + * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA + */ + public int prepare(Xid xid) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("prepare ", xid); + } + int result; + try + { + result = _xaSession.getQpidSession() + .dtxCoordinationPrepare(xid); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + if (result == XAException.XA_RDONLY) + { + throw new XAException(XAException.XA_RDONLY); + } + else if (result == XAException.XA_RBROLLBACK) + { + throw new XAException(XAException.XA_RBROLLBACK); + } + return result; + } + + /** + * Obtains a list of prepared transaction branches. + * <p/> + * The transaction manager calls this method during recovery to obtain the list of transaction branches + * that are currently in prepared or heuristically completed states. + * + * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. + * TMNOFLAGS must be used when no other flags are set in the parameter. + * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically + * completed state. + * @throws XAException An error has occurred. Possible value is XAER_INVAL. + */ + public Xid[] recover(int flag) throws XAException + { + try + { + // the flag is ignored + return _xaSession.getQpidSession() + .dtxCoordinationRecover(); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } + + /** + * Informs the resource manager to roll back work done on behalf of a transaction branch + * + * @param xid A global transaction identifier. + * @throws XAException An error has occurred. + */ + public void rollback(Xid xid) throws XAException + { + try + { + // the flag is ignored + _xaSession.getQpidSession() + .dtxCoordinationRollback(xid); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } + + /** + * Sets the current transaction timeout value for this XAResource instance. + * Once set, this timeout value is effective until setTransactionTimeout is + * invoked again with a different value. + * To reset the timeout value to the default value used by the resource manager, set the value to zero. + * + * @param timeout The transaction timeout value in seconds. + * @return true if transaction timeout value is set successfully; otherwise false. + * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL. + */ + public boolean setTransactionTimeout(int timeout) throws XAException + { + boolean result = false; + if (_xid != null) + { + try + { + // the flag is ignored + _xaSession.getQpidSession() + .dtxCoordinationSetTimeout(_xid, timeout); + result = true; + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } + return result; + } + + /** + * Starts work on behalf of a transaction branch specified in xid. + * <ul> + * <li> If TMJOIN is specified, an exception is thrown as it is not supported + * <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid. + * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the + * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code. + * </ul> + * + * @param xid A global transaction identifier to be associated with the resource + * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME + * @throws XAException An error has occurred. Possible exceptions + * are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO. + */ + public void start(Xid xid, int flag) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("start ", xid); + } + _xid = xid; + try + { + _xaSession.getQpidSession() + .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, + flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java new file mode 100644 index 0000000000..d62587267a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java @@ -0,0 +1,118 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.DtxSession; + +import javax.jms.XASession; +import javax.jms.Session; +import javax.jms.JMSException; +import javax.jms.TransactionInProgressException; +import javax.transaction.xa.XAResource; + +/** + * This is an implementation of the javax.jms.XASEssion interface. + */ +public class XASessionImpl extends SessionImpl implements XASession +{ + /** + * XAResource associated with this XASession + */ + private final XAResourceImpl _xaResource; + + /** + * This XASession Qpid DtxSession + */ + private DtxSession _qpidDtxSession; + + //-- Constructors + /** + * Create a JMS XASession + * + * @param connection The ConnectionImpl object from which the Session is created. + * @throws QpidException In case of internal error. + */ + protected XASessionImpl(ConnectionImpl connection) throws QpidException + { + super(connection, true, // this is a transacted session + Session.SESSION_TRANSACTED, // the ack mode is transacted + true); // this is an XA session so do not set tx + _qpidDtxSession = getConnection().getQpidConnection().createDTXSession(0); + _xaResource = new XAResourceImpl(this); + } + + //--- javax.jms.XASEssion API + + /** + * Gets the session associated with this XASession. + * + * @return the session object + * @throws JMSException if an internal error occurs. + * @since 1.1 + */ + public Session getSession() throws JMSException + { + return this; + } + + /** + * Returns an XA resource. + * + * @return An XA resource. + */ + public XAResource getXAResource() + { + return _xaResource; + } + + //-- overwritten mehtods + /** + * Throws a {@link TransactionInProgressException}, since it should + * not be called for an XASession object. + * + * @throws TransactionInProgressException always. + */ + public void commit() throws JMSException + { + throw new TransactionInProgressException( + "XASession: A direct invocation of the commit operation is probibited!"); + } + + /** + * Throws a {@link TransactionInProgressException}, since it should + * not be called for an XASession object. + * + * @throws TransactionInProgressException always. + */ + public void rollback() throws JMSException + { + throw new TransactionInProgressException( + "XASession: A direct invocation of the rollback operation is probibited!"); + } + + /** + * Access to the underlying Qpid Session + * + * @return The associated Qpid Session. + */ + protected org.apache.qpidity.DtxSession getQpidSession() + { + return _qpidDtxSession; + } +} |
