summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorWeston M. Price <wprice@apache.org>2012-05-14 19:30:20 +0000
committerWeston M. Price <wprice@apache.org>2012-05-14 19:30:20 +0000
commitfa7baeaf72635628b9d2ea2ad60ba782d6313044 (patch)
tree4a6514c18814b7380463d340797e36bfca36be85 /qpid/java/client
parent6bdb9fe2b2955a8ff5665ad2908136348c678383 (diff)
downloadqpid-python-fa7baeaf72635628b9d2ea2ad60ba782d6313044.tar.gz
QPID-3990: Multiple XAResources isSameRM behavior
*Track XAResource siblings in start/end methods *Added AMQXAResource interface *Added systemtest for new XAResource behavior *Refactored XAResourceTest to extend AbstractXATest git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338355 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java98
2 files changed, 102 insertions, 28 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java
new file mode 100644
index 0000000000..cce6b91781
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java
@@ -0,0 +1,32 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.List;
+
+import javax.transaction.xa.XAResource;
+
+public interface AMQXAResource extends XAResource
+{
+ public String getBrokerUUID();
+
+ public List<XAResource> getSiblings();
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index af9048f1f5..7611c9e8de 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -17,8 +17,13 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.apache.qpid.dtx.XidImpl;
import org.apache.qpid.transport.DtxXaStatus;
@@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.RecoverResult;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.XaResult;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is an implementation of javax.njms.XAResource.
*/
-public class XAResourceImpl implements XAResource
+public class XAResourceImpl implements AMQXAResource
{
/**
* this XAResourceImpl's logger
@@ -57,9 +60,11 @@ public class XAResourceImpl implements XAResource
* The time for this resource
*/
private int _timeout;
-
+
//--- constructor
-
+
+ private List<XAResource> _siblings = new ArrayList<XAResource>();
+
/**
* Create an XAResource associated with a XASession
*
@@ -157,7 +162,20 @@ public class XAResourceImpl implements XAResource
_xaSession.createSession();
convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
+
checkStatus(result.getStatus());
+
+ for(XAResource sibling: _siblings)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings");
+ }
+
+ sibling.end(xid, flag);
+ }
+
+ _siblings.clear();
}
@@ -216,28 +234,38 @@ public class XAResourceImpl implements XAResource
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
*/
public boolean isSameRM(XAResource xaResource) throws XAException
- {
+ {
if(this == xaResource)
{
- return true;
- }
- if(!(xaResource instanceof XAResourceImpl))
+ return true;
+ }
+
+ if(!(xaResource instanceof AMQXAResource))
{
- return false;
+ return false;
}
-
- XAResourceImpl other = (XAResourceImpl)xaResource;
- String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
- String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
-
+ String myUUID = getBrokerUUID();
+ String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID();
+
if(_logger.isDebugEnabled())
{
_logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
}
-
- return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
-
+
+ boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
+
+ if(isSameRm)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. ");
+ }
+ _siblings.add(xaResource);
+ }
+
+ return isSameRm;
+
}
/**
@@ -369,12 +397,12 @@ public class XAResourceImpl implements XAResource
{
_timeout = timeout;
if (timeout != _timeout && _xid != null)
- {
+ {
setDtxTimeout(_timeout);
}
return true;
}
-
+
private void setDtxTimeout(int timeout) throws XAException
{
_xaSession.getQpidSession()
@@ -437,18 +465,23 @@ public class XAResourceImpl implements XAResource
{
setDtxTimeout(_timeout);
}
+
+ for(XAResource sibling: _siblings)
+ {
+ sibling.start(xid, flag);
+ }
}
/**
* Is this resource currently enlisted in a transaction?
- *
+ *
* @return true if the resource is associated with a transaction, false otherwise.
*/
public boolean isEnlisted()
{
return (_xid != null) ;
}
-
+
//------------------------------------------------------------------------
// Private methods
//------------------------------------------------------------------------
@@ -517,7 +550,7 @@ public class XAResourceImpl implements XAResource
}
catch (XAException e)
{
- e.printStackTrace();
+ _logger.error(e.getMessage(), e);
throw e;
}
case ILLEGAL_STATE:
@@ -544,7 +577,7 @@ public class XAResourceImpl implements XAResource
* convert a generic xid into qpid format
* @param xid xid to be converted
* @return the qpid formated xid
- * @throws XAException when xid is null
+ * @throws XAException when xid is null
*/
private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
{
@@ -556,4 +589,13 @@ public class XAResourceImpl implements XAResource
return XidImpl.convert(xid);
}
+ public String getBrokerUUID()
+ {
+ return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
+ }
+
+ public List<XAResource> getSiblings()
+ {
+ return Collections.unmodifiableList(_siblings);
+ }
}