summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-21 19:58:14 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-21 19:58:14 +0000
commit8368bc980e2989299f032749d62f7ae20ceec4da (patch)
tree23c329c34d600dbe7714e195e080a32e456a991c
parent8bce3305776bbfbe5b80fdf1a6120c30a89b1552 (diff)
downloadqpid-python-8368bc980e2989299f032749d62f7ae20ceec4da.tar.gz
QPID-3774 : Work around Java BDB issue with cursors and flushLog
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1234410 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java88
1 files changed, 25 insertions, 63 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 045fe3b1f2..8884a99923 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.je.*;
@@ -899,7 +900,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
boolean complete = false;
com.sleepycat.je.Transaction tx = null;
- Cursor cursor = null;
Random rand = null;
int attempts = 0;
try
@@ -907,7 +907,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
do
{
tx = null;
- cursor = null;
try
{
tx = _environment.beginTransaction(null, null);
@@ -936,76 +935,43 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
//now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
-
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- //Use a partial record for the value to prevent retrieving the
- //data itself as we only need the key to identify what to remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 0, true);
- cursor = _messageContentDb.openCursor(tx, null);
-
- status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
- while (status == OperationStatus.SUCCESS)
+ int offset = 0;
+ do
{
- mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
-
- if(mck.getMessageId() != messageId)
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_5 mck = new MessageContentKey_5(messageId,offset);
+ TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ //Use a partial record for the value to prevent retrieving the
+ //data itself as we only need the key to identify what to remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 4, true);
+
+ status = _messageContentDb.get(null,contentKeyEntry, value, LockMode.READ_COMMITTED);
+
+ if(status == OperationStatus.SUCCESS)
{
- //we have exhausted all chunks for this message id, break
- break;
- }
- else
- {
- status = cursor.delete();
-
- if(status == OperationStatus.NOTFOUND)
- {
- cursor.close();
- cursor = null;
-
- tx.abort();
- throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
- }
+ offset += IntegerBinding.entryToInt(value);
+ _messageContentDb.delete(tx, contentKeyEntry);
if (_log.isDebugEnabled())
{
_log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
}
}
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
-
- cursor.close();
-
- cursor = null;
+ while (status == OperationStatus.SUCCESS);
commit(tx, sync);
complete = true;
+ tx = null;
}
catch (LockConflictException e)
{
try
{
- if(cursor != null)
- {
- cursor.close();
- }
- }
- catch(DatabaseException e1)
- {
- _log.warn("Unable to close cursor after LockConflictException", e1);
- // rethrow the original log conflict exception, the secondary exception should already have
- // been logged.
- throw e;
- }
- try
- {
if(tx != null)
{
tx.abort();
@@ -1056,13 +1022,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
try
{
- if(cursor != null)
- {
- cursor.close();
- cursor = null;
- }
-
tx.abort();
+ tx = null;
}
catch (DatabaseException e1)
{
@@ -1074,15 +1035,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
finally
{
- if(cursor != null)
+ if (tx != null)
{
try
{
- cursor.close();
+ tx.abort();
+ tx = null;
}
- catch (DatabaseException e)
+ catch (DatabaseException e1)
{
- throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e);
+ throw new AMQStoreException("Error aborting transaction " + e1, e1);
}
}
}