diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-20 16:42:29 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-20 16:42:29 +0000 |
| commit | f02d9fa05619959e3d92686f7daf3a5eb997862f (patch) | |
| tree | d71b762a497d3980dde7a4dc18dbc95e7a30996b /qpid/java/bdbstore/src | |
| parent | d9653e77fa93275cb9782581796d0a3bcf39b569 (diff) | |
| download | qpid-python-f02d9fa05619959e3d92686f7daf3a5eb997862f.tar.gz | |
QPID-5907 : [Java Broker] Add ability for broker to startup while persistent queues are still being recovered
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1612118 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
| -rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java | 101 |
1 files changed, 97 insertions, 4 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 4072c483b2..cf187fe1e9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -18,10 +18,9 @@ */ package org.apache.qpid.server.store.berkeleydb; -import static org.apache.qpid.server.store.berkeleydb.BDBUtils.*; +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG; import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely; import static org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely; -import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -122,7 +121,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) { - long newMessageId = getNextMessageSequenceNumber(); + long newMessageId = getNextMessageId(); if (metaData.isPersistent()) { @@ -134,7 +133,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private long getNextMessageSequenceNumber() + public long getNextMessageId() { long newMessageId; try @@ -182,6 +181,73 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override + public StoredMessage<?> getMessage(final long messageId) + { + checkMessageStoreOpen(); + return getMessageInternal(messageId, getEnvironmentFacade()); + } + + @Override + public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key); + + if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + else + { + break; + } + } + } + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(queueId, messageId)) + { + break; + } + } + + } + + + + @Override public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -542,6 +608,33 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } + + private StoredBDBMessage<?> getMessageInternal(long messageId, EnvironmentFacade environmentFacade) + { + try + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); + LongBinding.longToEntry(messageId, key); + if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS) + { + StorableMessageMetaData metaData = valueBinding.entryToObject(value); + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); + return message; + } + else + { + return null; + } + + } + catch (DatabaseException e) + { + throw environmentFacade.handleDatabaseException("Cannot visit messages", e); + } + } + /** * Stores a chunk of message data. * |
