diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-20 20:27:59 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-20 20:27:59 +0000 |
| commit | e57c95e7bb3b66961fedec5b6021d3326c468cf5 (patch) | |
| tree | 4e0ade8c90ce88f17a21c8b8244604df2d371634 /qpid/java/bdbstore | |
| parent | 159147fae8cf122ede397d5c83f16a9c6c3125e7 (diff) | |
| download | qpid-python-e57c95e7bb3b66961fedec5b6021d3326c468cf5.tar.gz | |
QPID-3774 : allow out of order completion of persistent enqueues / dequeues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1234111 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
| -rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java | 187 |
1 files changed, 130 insertions, 57 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 2e2d2f0b11..045fe3b1f2 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -94,6 +95,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _log = Logger.getLogger(BDBMessageStore.class); + private static final int LOCK_RETRY_ATTEMPTS = 5; + static final int DATABASE_FORMAT_VERSION = 5; private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version"; public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; @@ -893,91 +896,161 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { // _log.debug("public void removeMessage(Long messageId = " + messageId): called"); - + boolean complete = false; com.sleepycat.je.Transaction tx = null; Cursor cursor = null; + Random rand = null; + int attempts = 0; try { - tx = _environment.beginTransaction(null, null); + do + { + tx = null; + cursor = null; + try + { + tx = _environment.beginTransaction(null, null); - //remove the message meta data from the store - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); + //remove the message meta data from the store + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); - if (_log.isDebugEnabled()) - { - _log.debug("Removing message id " + messageId); - } + if (_log.isDebugEnabled()) + { + _log.debug("Removing message id " + messageId); + } - OperationStatus status = _messageMetaDataDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " + - messageId); - } + OperationStatus status = _messageMetaDataDb.delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " + + messageId); + } - if (_log.isDebugEnabled()) - { - _log.debug("Deleted metadata for message " + messageId); - } + if (_log.isDebugEnabled()) + { + _log.debug("Deleted metadata for message " + messageId); + } - //now remove the content data from the store if there is any. + //now remove the content data from the store if there is any. - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); - TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); - contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); + TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); + contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); - //Use a partial record for the value to prevent retrieving the - //data itself as we only need the key to identify what to remove. - DatabaseEntry value = new DatabaseEntry(); - value.setPartial(0, 0, true); + //Use a partial record for the value to prevent retrieving the + //data itself as we only need the key to identify what to remove. + DatabaseEntry value = new DatabaseEntry(); + value.setPartial(0, 0, true); - cursor = _messageContentDb.openCursor(tx, null); + cursor = _messageContentDb.openCursor(tx, null); - status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); - while (status == OperationStatus.SUCCESS) - { - mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); + status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); + while (status == OperationStatus.SUCCESS) + { + mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); - if(mck.getMessageId() != messageId) - { - //we have exhausted all chunks for this message id, break - break; + if(mck.getMessageId() != messageId) + { + //we have exhausted all chunks for this message id, break + break; + } + else + { + status = cursor.delete(); + + if(status == OperationStatus.NOTFOUND) + { + cursor.close(); + cursor = null; + + tx.abort(); + throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); + } + + if (_log.isDebugEnabled()) + { + _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); + } + } + + status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); + } + + cursor.close(); + + cursor = null; + + commit(tx, sync); + complete = true; } - else + catch (LockConflictException e) { - status = cursor.delete(); + try + { + if(cursor != null) + { + cursor.close(); + } + } + catch(DatabaseException e1) + { + _log.warn("Unable to close cursor after LockConflictException", e1); + // rethrow the original log conflict exception, the secondary exception should already have + // been logged. + throw e; + } + try + { + if(tx != null) + { + tx.abort(); + } + } + catch(DatabaseException e2) + { + _log.warn("Unable to abort transaction after LockConflictExcption", e2); + // rethrow the original log conflict exception, the secondary exception should already have + // been logged. + throw e; + } - if(status == OperationStatus.NOTFOUND) + + _log.warn("Lock timeout exception. Retrying (attempt " + + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); + + if(++attempts < LOCK_RETRY_ATTEMPTS) { - cursor.close(); - cursor = null; + if(rand == null) + { + rand = new Random(); + } - tx.abort(); - throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); - } + try + { + Thread.sleep(500l + (long)(500l * rand.nextDouble())); + } + catch (InterruptedException e1) + { - if (_log.isDebugEnabled()) + } + } + else { - _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); + // rethrow the lock conflict exception since we could not solve by retrying + throw e; } } - - status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); } - - cursor.close(); - cursor = null; - - commit(tx, sync); + while(!complete); } catch (DatabaseException e) { - e.printStackTrace(); + _log.error("Unexpected BDB exception", e); if (tx != null) { @@ -1009,7 +1082,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } catch (DatabaseException e) { - throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e); + throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e); } } } @@ -2073,7 +2146,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { // RHM-7 Periodically wake up and check, just in case we // missed a notification. Don't want to lock the broker hard. - _lock.wait(250); + _lock.wait(1000); } catch (InterruptedException e) { |
