diff options
| author | Weston M. Price <wprice@apache.org> | 2012-05-14 19:30:20 +0000 |
|---|---|---|
| committer | Weston M. Price <wprice@apache.org> | 2012-05-14 19:30:20 +0000 |
| commit | fa7baeaf72635628b9d2ea2ad60ba782d6313044 (patch) | |
| tree | 4a6514c18814b7380463d340797e36bfca36be85 /qpid/java/client | |
| parent | 6bdb9fe2b2955a8ff5665ad2908136348c678383 (diff) | |
| download | qpid-python-fa7baeaf72635628b9d2ea2ad60ba782d6313044.tar.gz | |
QPID-3990: Multiple XAResources isSameRM behavior
*Track XAResource siblings in start/end methods
*Added AMQXAResource interface
*Added systemtest for new XAResource behavior
*Refactored XAResourceTest to extend AbstractXATest
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338355 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java | 32 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java | 98 |
2 files changed, 102 insertions, 28 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java new file mode 100644 index 0000000000..cce6b91781 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java @@ -0,0 +1,32 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.util.List; + +import javax.transaction.xa.XAResource; + +public interface AMQXAResource extends XAResource +{ + public String getBrokerUUID(); + + public List<XAResource> getSiblings(); +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index af9048f1f5..7611c9e8de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -17,8 +17,13 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.qpid.dtx.XidImpl; import org.apache.qpid.transport.DtxXaStatus; @@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option; import org.apache.qpid.transport.RecoverResult; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.XaResult; - -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is an implementation of javax.njms.XAResource. */ -public class XAResourceImpl implements XAResource +public class XAResourceImpl implements AMQXAResource { /** * this XAResourceImpl's logger @@ -57,9 +60,11 @@ public class XAResourceImpl implements XAResource * The time for this resource */ private int _timeout; - + //--- constructor - + + private List<XAResource> _siblings = new ArrayList<XAResource>(); + /** * Create an XAResource associated with a XASession * @@ -157,7 +162,20 @@ public class XAResourceImpl implements XAResource _xaSession.createSession(); convertExecutionErrorToXAErr(e.getException().getErrorCode()); } + checkStatus(result.getStatus()); + + for(XAResource sibling: _siblings) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings"); + } + + sibling.end(xid, flag); + } + + _siblings.clear(); } @@ -216,28 +234,38 @@ public class XAResourceImpl implements XAResource * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. */ public boolean isSameRM(XAResource xaResource) throws XAException - { + { if(this == xaResource) { - return true; - } - if(!(xaResource instanceof XAResourceImpl)) + return true; + } + + if(!(xaResource instanceof AMQXAResource)) { - return false; + return false; } - - XAResourceImpl other = (XAResourceImpl)xaResource; - String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); - String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID(); - + String myUUID = getBrokerUUID(); + String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID(); + if(_logger.isDebugEnabled()) { _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID); } - - return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); - + + boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); + + if(isSameRm) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. "); + } + _siblings.add(xaResource); + } + + return isSameRm; + } /** @@ -369,12 +397,12 @@ public class XAResourceImpl implements XAResource { _timeout = timeout; if (timeout != _timeout && _xid != null) - { + { setDtxTimeout(_timeout); } return true; } - + private void setDtxTimeout(int timeout) throws XAException { _xaSession.getQpidSession() @@ -437,18 +465,23 @@ public class XAResourceImpl implements XAResource { setDtxTimeout(_timeout); } + + for(XAResource sibling: _siblings) + { + sibling.start(xid, flag); + } } /** * Is this resource currently enlisted in a transaction? - * + * * @return true if the resource is associated with a transaction, false otherwise. */ public boolean isEnlisted() { return (_xid != null) ; } - + //------------------------------------------------------------------------ // Private methods //------------------------------------------------------------------------ @@ -517,7 +550,7 @@ public class XAResourceImpl implements XAResource } catch (XAException e) { - e.printStackTrace(); + _logger.error(e.getMessage(), e); throw e; } case ILLEGAL_STATE: @@ -544,7 +577,7 @@ public class XAResourceImpl implements XAResource * convert a generic xid into qpid format * @param xid xid to be converted * @return the qpid formated xid - * @throws XAException when xid is null + * @throws XAException when xid is null */ private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException { @@ -556,4 +589,13 @@ public class XAResourceImpl implements XAResource return XidImpl.convert(xid); } + public String getBrokerUUID() + { + return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); + } + + public List<XAResource> getSiblings() + { + return Collections.unmodifiableList(_siblings); + } } |
