summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-20 16:42:29 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-20 16:42:29 +0000
commitf02d9fa05619959e3d92686f7daf3a5eb997862f (patch)
treed71b762a497d3980dde7a4dc18dbc95e7a30996b /qpid/java/bdbstore/src
parentd9653e77fa93275cb9782581796d0a3bcf39b569 (diff)
downloadqpid-python-f02d9fa05619959e3d92686f7daf3a5eb997862f.tar.gz
QPID-5907 : [Java Broker] Add ability for broker to startup while persistent queues are still being recovered
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1612118 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java101
1 files changed, 97 insertions, 4 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 4072c483b2..cf187fe1e9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -18,10 +18,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import static org.apache.qpid.server.store.berkeleydb.BDBUtils.*;
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely;
import static org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely;
-import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -122,7 +121,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
- long newMessageId = getNextMessageSequenceNumber();
+ long newMessageId = getNextMessageId();
if (metaData.isPersistent())
{
@@ -134,7 +133,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- private long getNextMessageSequenceNumber()
+ public long getNextMessageId()
{
long newMessageId;
try
@@ -182,6 +181,73 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
+ public StoredMessage<?> getMessage(final long messageId)
+ {
+ checkMessageStoreOpen();
+ return getMessageInternal(messageId, getEnvironmentFacade());
+ }
+
+ @Override
+ public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key);
+
+ if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(queueId, messageId))
+ {
+ break;
+ }
+ }
+
+ }
+
+
+
+ @Override
public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -542,6 +608,33 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
+
+ private StoredBDBMessage<?> getMessageInternal(long messageId, EnvironmentFacade environmentFacade)
+ {
+ try
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+ LongBinding.longToEntry(messageId, key);
+ if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
+ {
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+ return message;
+ }
+ else
+ {
+ return null;
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot visit messages", e);
+ }
+ }
+
/**
* Stores a chunk of message data.
*