summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-20 20:27:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-20 20:27:59 +0000
commite57c95e7bb3b66961fedec5b6021d3326c468cf5 (patch)
tree4e0ade8c90ce88f17a21c8b8244604df2d371634 /qpid/java/bdbstore
parent159147fae8cf122ede397d5c83f16a9c6c3125e7 (diff)
downloadqpid-python-e57c95e7bb3b66961fedec5b6021d3326c468cf5.tar.gz
QPID-3774 : allow out of order completion of persistent enqueues / dequeues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1234111 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java187
1 files changed, 130 insertions, 57 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 2e2d2f0b11..045fe3b1f2 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -30,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -94,6 +95,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
+ private static final int LOCK_RETRY_ATTEMPTS = 5;
+
static final int DATABASE_FORMAT_VERSION = 5;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
@@ -893,91 +896,161 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
// _log.debug("public void removeMessage(Long messageId = " + messageId): called");
-
+ boolean complete = false;
com.sleepycat.je.Transaction tx = null;
Cursor cursor = null;
+ Random rand = null;
+ int attempts = 0;
try
{
- tx = _environment.beginTransaction(null, null);
+ do
+ {
+ tx = null;
+ cursor = null;
+ try
+ {
+ tx = _environment.beginTransaction(null, null);
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
- if (_log.isDebugEnabled())
- {
- _log.debug("Removing message id " + messageId);
- }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Removing message id " + messageId);
+ }
- OperationStatus status = _messageMetaDataDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
- messageId);
- }
+ OperationStatus status = _messageMetaDataDb.delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
+ }
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted metadata for message " + messageId);
- }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted metadata for message " + messageId);
+ }
- //now remove the content data from the store if there is any.
+ //now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- //Use a partial record for the value to prevent retrieving the
- //data itself as we only need the key to identify what to remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 0, true);
+ //Use a partial record for the value to prevent retrieving the
+ //data itself as we only need the key to identify what to remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 0, true);
- cursor = _messageContentDb.openCursor(tx, null);
+ cursor = _messageContentDb.openCursor(tx, null);
- status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
- while (status == OperationStatus.SUCCESS)
- {
- mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+ status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+ while (status == OperationStatus.SUCCESS)
+ {
+ mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
- if(mck.getMessageId() != messageId)
- {
- //we have exhausted all chunks for this message id, break
- break;
+ if(mck.getMessageId() != messageId)
+ {
+ //we have exhausted all chunks for this message id, break
+ break;
+ }
+ else
+ {
+ status = cursor.delete();
+
+ if(status == OperationStatus.NOTFOUND)
+ {
+ cursor.close();
+ cursor = null;
+
+ tx.abort();
+ throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+ }
+ }
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
+ }
+
+ cursor.close();
+
+ cursor = null;
+
+ commit(tx, sync);
+ complete = true;
}
- else
+ catch (LockConflictException e)
{
- status = cursor.delete();
+ try
+ {
+ if(cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ catch(DatabaseException e1)
+ {
+ _log.warn("Unable to close cursor after LockConflictException", e1);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw e;
+ }
+ try
+ {
+ if(tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch(DatabaseException e2)
+ {
+ _log.warn("Unable to abort transaction after LockConflictExcption", e2);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw e;
+ }
- if(status == OperationStatus.NOTFOUND)
+
+ _log.warn("Lock timeout exception. Retrying (attempt "
+ + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
+
+ if(++attempts < LOCK_RETRY_ATTEMPTS)
{
- cursor.close();
- cursor = null;
+ if(rand == null)
+ {
+ rand = new Random();
+ }
- tx.abort();
- throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
- }
+ try
+ {
+ Thread.sleep(500l + (long)(500l * rand.nextDouble()));
+ }
+ catch (InterruptedException e1)
+ {
- if (_log.isDebugEnabled())
+ }
+ }
+ else
{
- _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+ // rethrow the lock conflict exception since we could not solve by retrying
+ throw e;
}
}
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
-
- cursor.close();
- cursor = null;
-
- commit(tx, sync);
+ while(!complete);
}
catch (DatabaseException e)
{
- e.printStackTrace();
+ _log.error("Unexpected BDB exception", e);
if (tx != null)
{
@@ -1009,7 +1082,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
catch (DatabaseException e)
{
- throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e);
+ throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e);
}
}
}
@@ -2073,7 +2146,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
// RHM-7 Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the broker hard.
- _lock.wait(250);
+ _lock.wait(1000);
}
catch (InterruptedException e)
{