diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-21 19:58:14 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-21 19:58:14 +0000 |
| commit | 8368bc980e2989299f032749d62f7ae20ceec4da (patch) | |
| tree | 23c329c34d600dbe7714e195e080a32e456a991c | |
| parent | 8bce3305776bbfbe5b80fdf1a6120c30a89b1552 (diff) | |
| download | qpid-python-8368bc980e2989299f032749d62f7ae20ceec4da.tar.gz | |
QPID-3774 : Work around Java BDB issue with cursors and flushLog
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1234410 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java | 88 |
1 files changed, 25 insertions, 63 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 045fe3b1f2..8884a99923 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.bind.tuple.StringBinding; import com.sleepycat.je.*; @@ -899,7 +900,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore boolean complete = false; com.sleepycat.je.Transaction tx = null; - Cursor cursor = null; Random rand = null; int attempts = 0; try @@ -907,7 +907,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore do { tx = null; - cursor = null; try { tx = _environment.beginTransaction(null, null); @@ -936,76 +935,43 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore //now remove the content data from the store if there is any. - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); - - TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); - contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); - //Use a partial record for the value to prevent retrieving the - //data itself as we only need the key to identify what to remove. - DatabaseEntry value = new DatabaseEntry(); - value.setPartial(0, 0, true); - cursor = _messageContentDb.openCursor(tx, null); - - status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); - while (status == OperationStatus.SUCCESS) + int offset = 0; + do { - mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); - - if(mck.getMessageId() != messageId) + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + MessageContentKey_5 mck = new MessageContentKey_5(messageId,offset); + TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); + contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); + //Use a partial record for the value to prevent retrieving the + //data itself as we only need the key to identify what to remove. + DatabaseEntry value = new DatabaseEntry(); + value.setPartial(0, 4, true); + + status = _messageContentDb.get(null,contentKeyEntry, value, LockMode.READ_COMMITTED); + + if(status == OperationStatus.SUCCESS) { - //we have exhausted all chunks for this message id, break - break; - } - else - { - status = cursor.delete(); - - if(status == OperationStatus.NOTFOUND) - { - cursor.close(); - cursor = null; - - tx.abort(); - throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); - } + offset += IntegerBinding.entryToInt(value); + _messageContentDb.delete(tx, contentKeyEntry); if (_log.isDebugEnabled()) { _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); } } - - status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); } - - cursor.close(); - - cursor = null; + while (status == OperationStatus.SUCCESS); commit(tx, sync); complete = true; + tx = null; } catch (LockConflictException e) { try { - if(cursor != null) - { - cursor.close(); - } - } - catch(DatabaseException e1) - { - _log.warn("Unable to close cursor after LockConflictException", e1); - // rethrow the original log conflict exception, the secondary exception should already have - // been logged. - throw e; - } - try - { if(tx != null) { tx.abort(); @@ -1056,13 +1022,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { try { - if(cursor != null) - { - cursor.close(); - cursor = null; - } - tx.abort(); + tx = null; } catch (DatabaseException e1) { @@ -1074,15 +1035,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } finally { - if(cursor != null) + if (tx != null) { try { - cursor.close(); + tx.abort(); + tx = null; } - catch (DatabaseException e) + catch (DatabaseException e1) { - throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e); + throw new AMQStoreException("Error aborting transaction " + e1, e1); } } } |
