From 088b7c8461979ee8df9480cb9105cc045fd632b6 Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Mon, 6 Aug 2007 17:15:56 +0000 Subject: Added dtx classes git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563197 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpidity/DtxSession.java | 113 ++++++- .../org/apache/qpidity/jms/ConnectionImpl.java | 47 +-- .../org/apache/qpidity/jms/ExceptionHelper.java | 9 + .../apache/qpidity/jms/MessageProducerImpl.java | 4 +- .../java/org/apache/qpidity/jms/SessionImpl.java | 27 +- .../org/apache/qpidity/jms/XAConnectionImpl.java | 54 ++++ .../org/apache/qpidity/jms/XAResourceImpl.java | 329 +++++++++++++++++++++ .../java/org/apache/qpidity/jms/XASessionImpl.java | 118 ++++++++ 8 files changed, 657 insertions(+), 44 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java create mode 100644 java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java create mode 100644 java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpidity/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/DtxSession.java index 045f52541d..127fe4d1a7 100644 --- a/java/client/src/main/java/org/apache/qpidity/DtxSession.java +++ b/java/client/src/main/java/org/apache/qpidity/DtxSession.java @@ -18,7 +18,7 @@ */ package org.apache.qpidity; -import org.apache.qpidity.QpidException; +import javax.transaction.xa.Xid; /** * This session�s resources are control under the scope of a distributed transaction. @@ -27,11 +27,112 @@ public interface DtxSession extends Session { /** - * Get the XA resource associated with this session. + * This method is called when messages should be produced and consumed on behalf a transaction + * branch identified by xid. + * possible options are: + * * - * @return this session XA resource. - * @throws QpidException If the session fails to retrieve its associated XA resource - * due to some error. + * @param xid Specifies the xid of the transaction branch to be started. + * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. + * @throws QpidException If the session fails to start due to some error */ - public javax.transaction.xa.XAResource getDTXResource() throws QpidException; + public void dtxDemarcationStart(Xid xid, Option... options) throws QpidException; + + /** + * This method is called when the work done on behalf a transaction branch finishes or needs to + * be suspended. + * possible options are: + * + * + * @param xid Specifies the xid of the transaction branch to be ended. + * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. + * @throws QpidException If the session fails to end due to some error + */ + public void dtxDemarcationEnd(Xid xid, Option... options) throws QpidException; + + /** + * Commit the work done on behalf a transaction branch. This method commits the work associated + * with xid. Any produced messages are made available and any consumed messages are discarded. + * possible option is: + * + * + * @param xid Specifies the xid of the transaction branch to be committed. + * @param options Available option is: {@link Option#ONE_PHASE} + * @throws QpidException If the session fails to commit due to some error + */ + public void dtxCoordinationCommit(Xid xid, Option... options) throws QpidException; + + /** + * This method is called to forget about a heuristically completed transaction branch. + * + * @param xid Specifies the xid of the transaction branch to be forgotten. + * @throws QpidException If the session fails to forget due to some error + */ + public void dtxCoordinationForget(Xid xid) throws QpidException; + + /** + * This method obtains the current transaction timeout value in seconds. If set-timeout was not + * used prior to invoking this method, the return value is the default timeout; otherwise, the + * value used in the previous set-timeout call is returned. + * + * @param xid Specifies the xid of the transaction branch for getting the timeout. + * @return The current transaction timeout value in seconds. + * @throws QpidException If the session fails to get the timeout due to some error + */ + public long dtxCoordinationGetTimeout(Xid xid) throws QpidException; + + /** + * This method prepares for commitment any message produced or consumed on behalf of xid. + * + * @param xid Specifies the xid of the transaction branch that can be prepared. + * @return The status of the prepare operation: can be one of those: + * xa-ok: Normal execution. + *

+ * xa-rdonly: The transaction branch was read-only and has been committed. + *

+ * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified + * reason. + *

+ * xa-rbtimeout: The work represented by this transaction branch took too long. + * @throws QpidException If the session fails to prepare due to some error + */ + public short dtxCoordinationPrepare(Xid xid) throws QpidException; + + /** + * This method is called to obtain a list of transaction branches that are in a prepared or + * heuristically completed state. + * + * @return a array of xids to be recovered. + * @throws QpidException If the session fails to recover due to some error + */ + public Xid[] dtxCoordinationRecover() throws QpidException; + + /** + * This method rolls back the work associated with xid. Any produced messages are discarded and + * any consumed messages are re-enqueued. + * + * @param xid Specifies the xid of the transaction branch that can be rolled back. + * @throws QpidException If the session fails to rollback due to some error + */ + public void dtxCoordinationRollback(Xid xid) throws QpidException; + + /** + * Sets the specified transaction branch timeout value in seconds. + * + * @param xid Specifies the xid of the transaction branch for setting the timeout. + * @param timeout The transaction timeout value in seconds. + * @throws QpidException If the session fails to set the timeout due to some error + */ + public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws QpidException; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java index 25951bc0c1..818b146491 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -42,7 +42,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect /** * Maps from session id (Integer) to SessionImpl instance */ - private final Vector _sessions = new Vector(); + protected final Vector _sessions = new Vector(); /** * This is the clientID @@ -113,10 +113,18 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return A newly created session * @throws JMSException If the Connection object fails to create a session due to some internal error. */ - public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException + public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); - SessionImpl session = new SessionImpl(this, transacted, acknowledgeMode); + SessionImpl session = null; + try + { + session = new SessionImpl(this, transacted, acknowledgeMode, false); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } // add this session with the list of session that are handled by this connection _sessions.add(session); return session; @@ -178,7 +186,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return the ExceptionListener for this connection * @throws JMSException In case of unforeseen problem */ - public ExceptionListener getExceptionListener() throws JMSException + public synchronized ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); return _exceptionListener; @@ -203,7 +211,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @param exceptionListener The connection listener. * @throws JMSException If the connection is closed. */ - public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException + public synchronized void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { checkNotClosed(); _exceptionListener = exceptionListener; @@ -217,7 +225,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void start() throws JMSException + public synchronized void start() throws JMSException { checkNotClosed(); if (!_started) @@ -231,7 +239,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect } catch (Exception e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } } _started = true; @@ -248,7 +256,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void stop() throws JMSException + public synchronized void stop() throws JMSException { checkNotClosed(); if (_started) @@ -262,7 +270,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect } catch (Exception e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } } _started = false; @@ -284,7 +292,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void close() throws JMSException + public synchronized void close() throws JMSException { checkNotClosed(); if (!_isClosed) @@ -320,8 +328,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @throws JMSException In case of a problem due to some internal error. */ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws - JMSException + ServerSessionPool sessionPool, int maxMessages) + throws JMSException { checkNotClosed(); return null; @@ -359,7 +367,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return A queueSession object/ * @throws JMSException If creating a QueueSession fails due to some internal error. */ - public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException + public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode); @@ -380,8 +388,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @throws JMSException In case of a problem due to some internal error. */ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws - JMSException + ServerSessionPool sessionPool, int maxMessages) + throws JMSException { return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages); } @@ -396,7 +404,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return a newly created topic session * @throws JMSException If creating the session fails due to some internal error. */ - public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException + public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); TopicSessionImpl session = new TopicSessionImpl(this, transacted, acknowledgeMode); @@ -418,8 +426,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @throws JMSException In case of a problem due to some internal error. */ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) throws - JMSException + ServerSessionPool sessionPool, int maxMessages) + throws JMSException { return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages); } @@ -442,7 +450,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { _logger.debug("Connection has been closed. Cannot invoke any further operations."); } - throw new javax.jms.IllegalStateException("Connection has been closed. Cannot invoke any further operations."); + throw new javax.jms.IllegalStateException( + "Connection has been closed. Cannot invoke any further operations."); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java index 6174fa9da9..1dc35b5609 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java @@ -20,6 +20,7 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; import javax.jms.JMSException; +import javax.transaction.xa.XAException; /** * Helper class for handling exceptions @@ -47,4 +48,12 @@ public class ExceptionHelper } return jmsException; } + + static public XAException convertQpidExceptionToXAException(QpidException exception) + { + String qpidErrorCode = exception.getErrorCode(); + // todo map this error to an XA code + int xaCode = XAException.XAER_PROTO; + return new XAException(xaCode); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java index 29f867d6b8..f3b6122992 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java @@ -305,10 +305,10 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); } // check that the message is not a foreign one - + // todo // set the properties - // + // todo // dispatch it // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 3d9b9b9d17..4723257794 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -128,10 +128,12 @@ public class SessionImpl implements Session * @param transacted Indicates if the session transacted. * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to * {@link Session#SESSION_TRANSACTED} if the transacted parameter is true. + * @param isXA Indicates whether this session is an XA session. * @throws JMSSecurityException If the user could not be authenticated. - * @throws JMSException In case of internal error. + * @throws QpidException In case of internal error. */ - protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode, boolean isXA) + throws QpidException { _connection = connection; _transacted = transacted; @@ -141,19 +143,12 @@ public class SessionImpl implements Session acknowledgeMode = Session.SESSION_TRANSACTED; } _acknowledgeMode = acknowledgeMode; - try - { - // create the qpid session with an expiry <= 0 so that the session does not expire - _qpidSession = _connection.getQpidConnection().createSession(0); - // set transacted if required - if (_transacted) - { - //_qpidSession.setTransacted(); - } - } - catch (QpidException e) + // create the qpid session with an expiry <= 0 so that the session does not expire + _qpidSession = _connection.getQpidConnection().createSession(0); + // set transacted if required + if (_transacted && !isXA) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + _qpidSession.txSelect(); } // init the message dispatcher. initMessageDispatcherThread(); @@ -314,7 +309,6 @@ public class SessionImpl implements Session // commit the underlying Qpid Session try { - // Note: this operation makes sure that asynch message processing has returned _qpidSession.txCommit(); } catch (QpidException e) @@ -341,7 +335,6 @@ public class SessionImpl implements Session // rollback the underlying Qpid Session try { - // Note: this operation makes sure that asynch message processing has returned _qpidSession.txRollback(); } catch (org.apache.qpidity.QpidException e) @@ -640,7 +633,7 @@ public class SessionImpl implements Session } catch (QpidException e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } return result; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java new file mode 100644 index 0000000000..e76e566efd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java @@ -0,0 +1,54 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.XAConnection; +import javax.jms.JMSException; +import javax.jms.XASession; + +/** + * This class implements the javax.jms.XAConnection interface + */ +public class XAConnectionImpl extends ConnectionImpl implements XAConnection +{ + /** + * Creates an XASession. + * + * @return A newly created XASession. + * @throws JMSException If the XAConnectiono fails to create an XASession due to + * some internal error. + */ + public synchronized XASession createXASession() throws JMSException + { + checkNotClosed(); + XASessionImpl xasession; + try + { + xasession = new XASessionImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // add this session with the list of session that are handled by this connection + _sessions.add(xasession); + return xasession; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java new file mode 100644 index 0000000000..0ec8c4a423 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java @@ -0,0 +1,329 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; + +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAException; + +/** + * This is an implementation of javax.jms.XAResource. + */ +public class XAResourceImpl implements XAResource +{ + /** + * this XAResourceImpl's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class); + + /** + * Reference to the associated XASession + */ + private XASessionImpl _xaSession = null; + + /** + * The XID of this resource + */ + private Xid _xid; + + //--- constructor + + /** + * Create an XAResource associated with a XASession + * + * @param xaSession The session XAresource + */ + protected XAResourceImpl(XASessionImpl xaSession) + { + _xaSession = xaSession; + } + + //--- The XAResource + /** + * Commits the global transaction specified by xid. + * + * @param xid A global transaction identifier + * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid. + * @throws XAException An error has occurred. Possible XAExceptions are XAER_RMERR, XAER_NOTA or XAER_PROTO. + */ + public void commit(Xid xid, boolean b) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("commit ", xid); + } + try + { + _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE : Option.NO_OPTION); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToXAException(e); + } + } + + /** + * Ends the work performed on behalf of a transaction branch. + * The resource manager disassociates the XA resource from the transaction branch specified + * and lets the transaction complete. + *