summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-06 17:15:56 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-06 17:15:56 +0000
commit088b7c8461979ee8df9480cb9105cc045fd632b6 (patch)
tree6caff7beecdb01ba5d9d8535e5b7ad4fdfc002ef /java
parent900e8cdd0593fbafd3a7b354ce65c462c5f9ed05 (diff)
downloadqpid-python-088b7c8461979ee8df9480cb9105cc045fd632b6.tar.gz
Added dtx classes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563197 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/DtxSession.java113
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java47
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java9
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java27
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java54
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java329
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java118
8 files changed, 657 insertions, 44 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/DtxSession.java
index 045f52541d..127fe4d1a7 100644
--- a/java/client/src/main/java/org/apache/qpidity/DtxSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/DtxSession.java
@@ -18,7 +18,7 @@
*/
package org.apache.qpidity;
-import org.apache.qpidity.QpidException;
+import javax.transaction.xa.Xid;
/**
* This session�s resources are control under the scope of a distributed transaction.
@@ -27,11 +27,112 @@ public interface DtxSession extends Session
{
/**
- * Get the XA resource associated with this session.
+ * This method is called when messages should be produced and consumed on behalf a transaction
+ * branch identified by xid.
+ * possible options are:
+ * <ul>
+ * <li> {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen.
+ * <li> {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified.
+ * </ul>
*
- * @return this session XA resource.
- * @throws QpidException If the session fails to retrieve its associated XA resource
- * due to some error.
+ * @param xid Specifies the xid of the transaction branch to be started.
+ * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
+ * @throws QpidException If the session fails to start due to some error
*/
- public javax.transaction.xa.XAResource getDTXResource() throws QpidException;
+ public void dtxDemarcationStart(Xid xid, Option... options) throws QpidException;
+
+ /**
+ * This method is called when the work done on behalf a transaction branch finishes or needs to
+ * be suspended.
+ * possible options are:
+ * <ul>
+ * <li> {@link Option#FAIL}: indicates that this portion of work has failed;
+ * otherwise this portion of work has
+ * completed successfully.
+ * <li> {@link Option#SUSPEND}: Indicates that the transaction branch is
+ * temporarily suspended in an incomplete state.
+ * </ul>
+ *
+ * @param xid Specifies the xid of the transaction branch to be ended.
+ * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
+ * @throws QpidException If the session fails to end due to some error
+ */
+ public void dtxDemarcationEnd(Xid xid, Option... options) throws QpidException;
+
+ /**
+ * Commit the work done on behalf a transaction branch. This method commits the work associated
+ * with xid. Any produced messages are made available and any consumed messages are discarded.
+ * possible option is:
+ * <ul>
+ * <li> {@link Option#ONE_PHASE}: When set then one-phase commit optimization is used.
+ * </ul>
+ *
+ * @param xid Specifies the xid of the transaction branch to be committed.
+ * @param options Available option is: {@link Option#ONE_PHASE}
+ * @throws QpidException If the session fails to commit due to some error
+ */
+ public void dtxCoordinationCommit(Xid xid, Option... options) throws QpidException;
+
+ /**
+ * This method is called to forget about a heuristically completed transaction branch.
+ *
+ * @param xid Specifies the xid of the transaction branch to be forgotten.
+ * @throws QpidException If the session fails to forget due to some error
+ */
+ public void dtxCoordinationForget(Xid xid) throws QpidException;
+
+ /**
+ * This method obtains the current transaction timeout value in seconds. If set-timeout was not
+ * used prior to invoking this method, the return value is the default timeout; otherwise, the
+ * value used in the previous set-timeout call is returned.
+ *
+ * @param xid Specifies the xid of the transaction branch for getting the timeout.
+ * @return The current transaction timeout value in seconds.
+ * @throws QpidException If the session fails to get the timeout due to some error
+ */
+ public long dtxCoordinationGetTimeout(Xid xid) throws QpidException;
+
+ /**
+ * This method prepares for commitment any message produced or consumed on behalf of xid.
+ *
+ * @param xid Specifies the xid of the transaction branch that can be prepared.
+ * @return The status of the prepare operation: can be one of those:
+ * xa-ok: Normal execution.
+ * <p/>
+ * xa-rdonly: The transaction branch was read-only and has been committed.
+ * <p/>
+ * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified
+ * reason.
+ * <p/>
+ * xa-rbtimeout: The work represented by this transaction branch took too long.
+ * @throws QpidException If the session fails to prepare due to some error
+ */
+ public short dtxCoordinationPrepare(Xid xid) throws QpidException;
+
+ /**
+ * This method is called to obtain a list of transaction branches that are in a prepared or
+ * heuristically completed state.
+ *
+ * @return a array of xids to be recovered.
+ * @throws QpidException If the session fails to recover due to some error
+ */
+ public Xid[] dtxCoordinationRecover() throws QpidException;
+
+ /**
+ * This method rolls back the work associated with xid. Any produced messages are discarded and
+ * any consumed messages are re-enqueued.
+ *
+ * @param xid Specifies the xid of the transaction branch that can be rolled back.
+ * @throws QpidException If the session fails to rollback due to some error
+ */
+ public void dtxCoordinationRollback(Xid xid) throws QpidException;
+
+ /**
+ * Sets the specified transaction branch timeout value in seconds.
+ *
+ * @param xid Specifies the xid of the transaction branch for setting the timeout.
+ * @param timeout The transaction timeout value in seconds.
+ * @throws QpidException If the session fails to set the timeout due to some error
+ */
+ public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws QpidException;
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
index 25951bc0c1..818b146491 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
@@ -42,7 +42,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
/**
* Maps from session id (Integer) to SessionImpl instance
*/
- private final Vector<SessionImpl> _sessions = new Vector<SessionImpl>();
+ protected final Vector<SessionImpl> _sessions = new Vector<SessionImpl>();
/**
* This is the clientID
@@ -113,10 +113,18 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @return A newly created session
* @throws JMSException If the Connection object fails to create a session due to some internal error.
*/
- public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
+ public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
checkNotClosed();
- SessionImpl session = new SessionImpl(this, transacted, acknowledgeMode);
+ SessionImpl session = null;
+ try
+ {
+ session = new SessionImpl(this, transacted, acknowledgeMode, false);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
// add this session with the list of session that are handled by this connection
_sessions.add(session);
return session;
@@ -178,7 +186,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @return the <CODE>ExceptionListener</CODE> for this connection
* @throws JMSException In case of unforeseen problem
*/
- public ExceptionListener getExceptionListener() throws JMSException
+ public synchronized ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
return _exceptionListener;
@@ -203,7 +211,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @param exceptionListener The connection listener.
* @throws JMSException If the connection is closed.
*/
- public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException
+ public synchronized void setExceptionListener(ExceptionListener exceptionListener) throws JMSException
{
checkNotClosed();
_exceptionListener = exceptionListener;
@@ -217,7 +225,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void start() throws JMSException
+ public synchronized void start() throws JMSException
{
checkNotClosed();
if (!_started)
@@ -231,7 +239,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
}
catch (Exception e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
}
_started = true;
@@ -248,7 +256,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void stop() throws JMSException
+ public synchronized void stop() throws JMSException
{
checkNotClosed();
if (_started)
@@ -262,7 +270,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
}
catch (Exception e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
}
_started = false;
@@ -284,7 +292,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void close() throws JMSException
+ public synchronized void close() throws JMSException
{
checkNotClosed();
if (!_isClosed)
@@ -320,8 +328,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @throws JMSException In case of a problem due to some internal error.
*/
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws
- JMSException
+ ServerSessionPool sessionPool, int maxMessages)
+ throws JMSException
{
checkNotClosed();
return null;
@@ -359,7 +367,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @return A queueSession object/
* @throws JMSException If creating a QueueSession fails due to some internal error.
*/
- public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
+ public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
{
checkNotClosed();
QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode);
@@ -380,8 +388,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @throws JMSException In case of a problem due to some internal error.
*/
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws
- JMSException
+ ServerSessionPool sessionPool, int maxMessages)
+ throws JMSException
{
return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages);
}
@@ -396,7 +404,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @return a newly created topic session
* @throws JMSException If creating the session fails due to some internal error.
*/
- public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
+ public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
{
checkNotClosed();
TopicSessionImpl session = new TopicSessionImpl(this, transacted, acknowledgeMode);
@@ -418,8 +426,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @throws JMSException In case of a problem due to some internal error.
*/
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws
- JMSException
+ ServerSessionPool sessionPool, int maxMessages)
+ throws JMSException
{
return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages);
}
@@ -442,7 +450,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
_logger.debug("Connection has been closed. Cannot invoke any further operations.");
}
- throw new javax.jms.IllegalStateException("Connection has been closed. Cannot invoke any further operations.");
+ throw new javax.jms.IllegalStateException(
+ "Connection has been closed. Cannot invoke any further operations.");
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
index 6174fa9da9..1dc35b5609 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
@@ -20,6 +20,7 @@ package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
import javax.jms.JMSException;
+import javax.transaction.xa.XAException;
/**
* Helper class for handling exceptions
@@ -47,4 +48,12 @@ public class ExceptionHelper
}
return jmsException;
}
+
+ static public XAException convertQpidExceptionToXAException(QpidException exception)
+ {
+ String qpidErrorCode = exception.getErrorCode();
+ // todo map this error to an XA code
+ int xaCode = XAException.XAER_PROTO;
+ return new XAException(xaCode);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
index 29f867d6b8..f3b6122992 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
@@ -305,10 +305,10 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
}
// check that the message is not a foreign one
-
+ // todo
// set the properties
- //
+ // todo
// dispatch it
// todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option);
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 3d9b9b9d17..4723257794 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -128,10 +128,12 @@ public class SessionImpl implements Session
* @param transacted Indicates if the session transacted.
* @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
* {@link Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter is true.
+ * @param isXA Indicates whether this session is an XA session.
* @throws JMSSecurityException If the user could not be authenticated.
- * @throws JMSException In case of internal error.
+ * @throws QpidException In case of internal error.
*/
- protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException
+ protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode, boolean isXA)
+ throws QpidException
{
_connection = connection;
_transacted = transacted;
@@ -141,19 +143,12 @@ public class SessionImpl implements Session
acknowledgeMode = Session.SESSION_TRANSACTED;
}
_acknowledgeMode = acknowledgeMode;
- try
- {
- // create the qpid session with an expiry <= 0 so that the session does not expire
- _qpidSession = _connection.getQpidConnection().createSession(0);
- // set transacted if required
- if (_transacted)
- {
- //_qpidSession.setTransacted();
- }
- }
- catch (QpidException e)
+ // create the qpid session with an expiry <= 0 so that the session does not expire
+ _qpidSession = _connection.getQpidConnection().createSession(0);
+ // set transacted if required
+ if (_transacted && !isXA)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ _qpidSession.txSelect();
}
// init the message dispatcher.
initMessageDispatcherThread();
@@ -314,7 +309,6 @@ public class SessionImpl implements Session
// commit the underlying Qpid Session
try
{
- // Note: this operation makes sure that asynch message processing has returned
_qpidSession.txCommit();
}
catch (QpidException e)
@@ -341,7 +335,6 @@ public class SessionImpl implements Session
// rollback the underlying Qpid Session
try
{
- // Note: this operation makes sure that asynch message processing has returned
_qpidSession.txRollback();
}
catch (org.apache.qpidity.QpidException e)
@@ -640,7 +633,7 @@ public class SessionImpl implements Session
}
catch (QpidException e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
return result;
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
new file mode 100644
index 0000000000..e76e566efd
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
@@ -0,0 +1,54 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.XAConnection;
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
+/**
+ * This class implements the javax.jms.XAConnection interface
+ */
+public class XAConnectionImpl extends ConnectionImpl implements XAConnection
+{
+ /**
+ * Creates an XASession.
+ *
+ * @return A newly created XASession.
+ * @throws JMSException If the XAConnectiono fails to create an XASession due to
+ * some internal error.
+ */
+ public synchronized XASession createXASession() throws JMSException
+ {
+ checkNotClosed();
+ XASessionImpl xasession;
+ try
+ {
+ xasession = new XASessionImpl(this);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // add this session with the list of session that are handled by this connection
+ _sessions.add(xasession);
+ return xasession;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
new file mode 100644
index 0000000000..0ec8c4a423
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
@@ -0,0 +1,329 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.QpidException;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import javax.transaction.xa.XAException;
+
+/**
+ * This is an implementation of javax.jms.XAResource.
+ */
+public class XAResourceImpl implements XAResource
+{
+ /**
+ * this XAResourceImpl's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
+
+ /**
+ * Reference to the associated XASession
+ */
+ private XASessionImpl _xaSession = null;
+
+ /**
+ * The XID of this resource
+ */
+ private Xid _xid;
+
+ //--- constructor
+
+ /**
+ * Create an XAResource associated with a XASession
+ *
+ * @param xaSession The session XAresource
+ */
+ protected XAResourceImpl(XASessionImpl xaSession)
+ {
+ _xaSession = xaSession;
+ }
+
+ //--- The XAResource
+ /**
+ * Commits the global transaction specified by xid.
+ *
+ * @param xid A global transaction identifier
+ * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid.
+ * @throws XAException An error has occurred. Possible XAExceptions are XAER_RMERR, XAER_NOTA or XAER_PROTO.
+ */
+ public void commit(Xid xid, boolean b) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("commit ", xid);
+ }
+ try
+ {
+ _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE : Option.NO_OPTION);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ }
+
+ /**
+ * Ends the work performed on behalf of a transaction branch.
+ * The resource manager disassociates the XA resource from the transaction branch specified
+ * and lets the transaction complete.
+ * <ul>
+ * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state.
+ * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified.
+ * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only
+ * <li> If TMSUCCESS is specified, the portion of work has completed successfully.
+ * /ul>
+ *
+ * @param xid A global transaction identifier that is the same as the identifier used previously in the start method
+ * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+ * @throws XAException An error has occurred. Possible XAException values
+ * are XAER_RMERR, XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*.
+ */
+ public void end(Xid xid, int flag) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("end ", xid);
+ }
+ try
+ {
+ _xid = null;
+ _xaSession.getQpidSession()
+ .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+ flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ }
+
+ /**
+ * Tells the resource manager to forget about a heuristically completed transaction branch.
+ *
+ * @param xid A global transaction identifier
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL,
+ * XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ */
+ public void forget(Xid xid) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("forget ", xid);
+ }
+ try
+ {
+ _xaSession.getQpidSession()
+ .dtxCoordinationForget(xid);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ }
+
+ /**
+ * Obtains the current transaction timeout value set for this XAResource instance.
+ * If XAResource.setTransactionTimeout was not used prior to invoking this method,
+ * the return value is the default timeout i.e. 0;
+ * otherwise, the value used in the previous setTransactionTimeout call is returned.
+ *
+ * @return The transaction timeout value in seconds.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+ */
+ public int getTransactionTimeout() throws XAException
+ {
+ int result = 0;
+ if (_xid != null)
+ {
+ try
+ {
+ result = (int) _xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * This method is called to determine if the resource manager instance represented
+ * by the target object is the same as the resouce manager instance represented by
+ * the parameter xaResource.
+ *
+ * @param xaResource An XAResource object whose resource manager instance is to
+ * be compared with the resource manager instance of the target object
+ * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+ */
+ public boolean isSameRM(XAResource xaResource) throws XAException
+ {
+ // TODO : get the server identity of xaResource and compare it with our own one
+ return false;
+ }
+
+ /**
+ * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
+ *
+ * @param xid A global transaction identifier.
+ * @return A value indicating the resource manager's vote on the outcome of the transaction.
+ * The possible values are: XA_RDONLY or XA_OK.
+ * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA
+ */
+ public int prepare(Xid xid) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("prepare ", xid);
+ }
+ int result;
+ try
+ {
+ result = _xaSession.getQpidSession()
+ .dtxCoordinationPrepare(xid);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ if (result == XAException.XA_RDONLY)
+ {
+ throw new XAException(XAException.XA_RDONLY);
+ }
+ else if (result == XAException.XA_RBROLLBACK)
+ {
+ throw new XAException(XAException.XA_RBROLLBACK);
+ }
+ return result;
+ }
+
+ /**
+ * Obtains a list of prepared transaction branches.
+ * <p/>
+ * The transaction manager calls this method during recovery to obtain the list of transaction branches
+ * that are currently in prepared or heuristically completed states.
+ *
+ * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS.
+ * TMNOFLAGS must be used when no other flags are set in the parameter.
+ * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically
+ * completed state.
+ * @throws XAException An error has occurred. Possible value is XAER_INVAL.
+ */
+ public Xid[] recover(int flag) throws XAException
+ {
+ try
+ {
+ // the flag is ignored
+ return _xaSession.getQpidSession()
+ .dtxCoordinationRecover();
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ }
+
+ /**
+ * Informs the resource manager to roll back work done on behalf of a transaction branch
+ *
+ * @param xid A global transaction identifier.
+ * @throws XAException An error has occurred.
+ */
+ public void rollback(Xid xid) throws XAException
+ {
+ try
+ {
+ // the flag is ignored
+ _xaSession.getQpidSession()
+ .dtxCoordinationRollback(xid);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ }
+
+ /**
+ * Sets the current transaction timeout value for this XAResource instance.
+ * Once set, this timeout value is effective until setTransactionTimeout is
+ * invoked again with a different value.
+ * To reset the timeout value to the default value used by the resource manager, set the value to zero.
+ *
+ * @param timeout The transaction timeout value in seconds.
+ * @return true if transaction timeout value is set successfully; otherwise false.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL.
+ */
+ public boolean setTransactionTimeout(int timeout) throws XAException
+ {
+ boolean result = false;
+ if (_xid != null)
+ {
+ try
+ {
+ // the flag is ignored
+ _xaSession.getQpidSession()
+ .dtxCoordinationSetTimeout(_xid, timeout);
+ result = true;
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Starts work on behalf of a transaction branch specified in xid.
+ * <ul>
+ * <li> If TMJOIN is specified, an exception is thrown as it is not supported
+ * <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid.
+ * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the
+ * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code.
+ * </ul>
+ *
+ * @param xid A global transaction identifier to be associated with the resource
+ * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
+ * @throws XAException An error has occurred. Possible exceptions
+ * are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ */
+ public void start(Xid xid, int flag) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("start ", xid);
+ }
+ _xid = xid;
+ try
+ {
+ _xaSession.getQpidSession()
+ .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToXAException(e);
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
new file mode 100644
index 0000000000..d62587267a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
@@ -0,0 +1,118 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.DtxSession;
+
+import javax.jms.XASession;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.TransactionInProgressException;
+import javax.transaction.xa.XAResource;
+
+/**
+ * This is an implementation of the javax.jms.XASEssion interface.
+ */
+public class XASessionImpl extends SessionImpl implements XASession
+{
+ /**
+ * XAResource associated with this XASession
+ */
+ private final XAResourceImpl _xaResource;
+
+ /**
+ * This XASession Qpid DtxSession
+ */
+ private DtxSession _qpidDtxSession;
+
+ //-- Constructors
+ /**
+ * Create a JMS XASession
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @throws QpidException In case of internal error.
+ */
+ protected XASessionImpl(ConnectionImpl connection) throws QpidException
+ {
+ super(connection, true, // this is a transacted session
+ Session.SESSION_TRANSACTED, // the ack mode is transacted
+ true); // this is an XA session so do not set tx
+ _qpidDtxSession = getConnection().getQpidConnection().createDTXSession(0);
+ _xaResource = new XAResourceImpl(this);
+ }
+
+ //--- javax.jms.XASEssion API
+
+ /**
+ * Gets the session associated with this XASession.
+ *
+ * @return the session object
+ * @throws JMSException if an internal error occurs.
+ * @since 1.1
+ */
+ public Session getSession() throws JMSException
+ {
+ return this;
+ }
+
+ /**
+ * Returns an XA resource.
+ *
+ * @return An XA resource.
+ */
+ public XAResource getXAResource()
+ {
+ return _xaResource;
+ }
+
+ //-- overwritten mehtods
+ /**
+ * Throws a {@link TransactionInProgressException}, since it should
+ * not be called for an XASession object.
+ *
+ * @throws TransactionInProgressException always.
+ */
+ public void commit() throws JMSException
+ {
+ throw new TransactionInProgressException(
+ "XASession: A direct invocation of the commit operation is probibited!");
+ }
+
+ /**
+ * Throws a {@link TransactionInProgressException}, since it should
+ * not be called for an XASession object.
+ *
+ * @throws TransactionInProgressException always.
+ */
+ public void rollback() throws JMSException
+ {
+ throw new TransactionInProgressException(
+ "XASession: A direct invocation of the rollback operation is probibited!");
+ }
+
+ /**
+ * Access to the underlying Qpid Session
+ *
+ * @return The associated Qpid Session.
+ */
+ protected org.apache.qpidity.DtxSession getQpidSession()
+ {
+ return _qpidDtxSession;
+ }
+}