summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-06-27 15:39:00 +0000
committerRobert Gemmell <robbie@apache.org>2012-06-27 15:39:00 +0000
commit276f296da8025ee25e7cdbc6d14e5cc5f9371497 (patch)
treee347044b8ab644d655e70186302b4b34497d8045
parent6b1bf756f95d1355f2d97babb8921e572ad4018f (diff)
downloadqpid-python-276f296da8025ee25e7cdbc6d14e5cc5f9371497.tar.gz
merge changes from trunk up to r1349191
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-config-and-management@1354583 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java4
-rw-r--r--qpid/java/build.deps2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java15
-rw-r--r--qpid/java/ivy.retrieve.xml2
-rw-r--r--qpid/java/lib/poms/je-5.0.55.xml (renamed from qpid/java/lib/poms/je-5.0.48.xml)2
6 files changed, 31 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 1e3bb3c50b..53420ded9b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -24,6 +24,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
@@ -32,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class ConflationQueueList extends SimpleQueueEntryList
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
private final String _conflationKey;
private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
@@ -68,6 +71,11 @@ public class ConflationQueueList extends SimpleQueueEntryList
final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
if (keyValue != null)
{
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
+ }
+
final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
AtomicReference<QueueEntry> entryReferenceFromMap = null;
QueueEntry entryFromMap;
@@ -100,12 +108,18 @@ public class ConflationQueueList extends SimpleQueueEntryList
}
else if (entryFromMap.compareTo(addedEntry) > 0)
{
- // A newer entry came along
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
+ }
discardEntry(addedEntry);
}
else if (entryFromMap.compareTo(addedEntry) < 0)
{
- // We replaced some other entry to become the newest value
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
+ }
discardEntry(entryFromMap);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java
index 5c54c1164f..e3bc076d72 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java
@@ -116,7 +116,7 @@ public class DtxRegistry
return (_branches.remove(new ComparableXid(branch.getXid())) != null);
}
- public void commit(Xid id, boolean onePhase)
+ public synchronized void commit(Xid id, boolean onePhase)
throws IncorrectDtxStateException, UnknownDtxBranchException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
{
DtxBranch branch = getBranch(id);
@@ -204,7 +204,7 @@ public class DtxRegistry
}
}
- public void rollback(Xid id)
+ public synchronized void rollback(Xid id)
throws IncorrectDtxStateException,
UnknownDtxBranchException,
AMQStoreException, TimeoutDtxException
diff --git a/qpid/java/build.deps b/qpid/java/build.deps
index 81b80a8378..45e1f80ef8 100644
--- a/qpid/java/build.deps
+++ b/qpid/java/build.deps
@@ -164,7 +164,7 @@ jca.libs=${geronimo-j2ee} ${geronimo-jta} ${geronimo-jms} ${test.libs} ${geronim
jca.test.libs=${test.libs}
# optional bdbstore module deps
-bdb-je=lib/bdbstore/je-5.0.48.jar
+bdb-je=lib/bdbstore/je-5.0.55.jar
bdbstore.libs=${bdb-je}
bdbstore.test.libs=${test.libs}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index f2efb6e8a5..05bd121bbd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.transport.RangeSet;
@@ -31,7 +32,7 @@ import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
/**
- * This is an implementation of the javax.njms.XASEssion interface.
+ * This is an implementation of the javax.jms.XASession interface.
*/
public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopicSession, XAQueueSession
{
@@ -67,7 +68,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
{
this(qpidConnection, con, channelId, false, ackMode, MessageFactoryRegistry.newDefaultRegistry(),
defaultPrefetchHigh, defaultPrefetchLow, null);
-
+
}
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
@@ -92,9 +93,6 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
_qpidDtxSession.dtxSelect();
}
-
- // javax.njms.XASEssion API
-
/**
* Gets the session associated with this XASession.
*
@@ -192,4 +190,11 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
super.acknowledgeImpl() ;
}
}
+
+ @Override
+ void resubscribe() throws AMQException
+ {
+ super.resubscribe();
+ createSession();
+ }
}
diff --git a/qpid/java/ivy.retrieve.xml b/qpid/java/ivy.retrieve.xml
index fa701b270a..161c134c38 100644
--- a/qpid/java/ivy.retrieve.xml
+++ b/qpid/java/ivy.retrieve.xml
@@ -75,7 +75,7 @@
<!-- The following are optional dependencies, for modules providing optional functionlity or
for use in optional build/test steps. Their optional status is usually indicative of licences
which are not compatible with the Apache Licence -->
- <dependency org="com.sleepycat" name="je" rev="5.0.48" transitive="false" conf="bdbje"/>
+ <dependency org="com.sleepycat" name="je" rev="5.0.55" transitive="false" conf="bdbje"/>
<dependency org="jfree" name="jfreechart" rev="1.0.13" transitive="false" conf="jfree"/>
<dependency org="jfree" name="jcommon" rev="1.0.16" transitive="false" conf="jfree"/>
<dependency org="net.sourceforge.csvjdbc" name="csvjdbc" rev="1.0.8" transitive="false" conf="csvjdbc"/>
diff --git a/qpid/java/lib/poms/je-5.0.48.xml b/qpid/java/lib/poms/je-5.0.55.xml
index 66a9847a3a..0c40ea541a 100644
--- a/qpid/java/lib/poms/je-5.0.48.xml
+++ b/qpid/java/lib/poms/je-5.0.55.xml
@@ -18,5 +18,5 @@
<dep>
<groupId>com.sleepycat</groupId>
<artifactId>je</artifactId>
- <version>5.0.48</version>
+ <version>5.0.55</version>
</dep>