From 8bcfb7bb278644a547bddf4719265d806ea69d72 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 3 Apr 2015 22:21:05 +0000 Subject: QPID-6476 : [Java Broker] Refactor MessageStore to allow more efficient implementations git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671184 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 453 +++++++++++++-------- .../berkeleydb/entry/PreparedTransaction.java | 10 +- .../tuple/PreparedTransactionBinding.java | 67 ++- .../store/berkeleydb/BDBMessageStoreTest.java | 27 +- 4 files changed, 359 insertions(+), 198 deletions(-) (limited to 'qpid/java/bdbstore/src') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index c668cc8595..b030b6c091 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -26,6 +26,7 @@ import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.UUID; @@ -48,6 +49,8 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; +import org.apache.qpid.server.store.MessageEnqueueRecord; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; @@ -114,7 +117,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public StoredMessage addMessage(T metaData) + public MessageHandle addMessage(T metaData) { long newMessageId = getNextMessageId(); @@ -163,154 +166,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void visitMessages(final MessageHandler handler) throws StoreException + public MessageStoreReader newMessageStoreReader() { - checkMessageStoreOpen(); - visitMessagesInternal(handler, getEnvironmentFacade()); - } - - @Override - public StoredMessage getMessage(final long messageId) - { - checkMessageStoreOpen(); - return getMessageInternal(messageId, getEnvironmentFacade()); - } - - @Override - public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - List entries = new ArrayList(); - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key); - - if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - QueueEntryKey entry = keyBinding.entryToObject(key); - if(entry.getQueueId().equals(queue.getId())) - { - entries.add(entry); - } - while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - entry = keyBinding.entryToObject(key); - if(entry.getQueueId().equals(queue.getId())) - { - entries.add(entry); - } - else - { - break; - } - } - } - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - if (!handler.handle(queueId, messageId)) - { - break; - } - } - - } - - - - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - List entries = new ArrayList(); - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - QueueEntryKey entry = keyBinding.entryToObject(key); - entries.add(entry); - } - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - if (!handler.handle(queueId, messageId)) - { - break; - } - } - - } - - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - try - { - cursor = getXidDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) - { - break; - } - } - - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } + return new BDBMessageStoreReader(); } /** @@ -793,24 +651,23 @@ public abstract class AbstractBDBMessageStore implements MessageStore * Extracts a message from a specified queue, in a given transaction. * * @param tx The transaction for the operation. - * @param queue The queue to take the message from. + * @param queueId The id of the queue to take the message from. * @param messageId The message to dequeue. * * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. */ - private void dequeueMessage(final Transaction tx, final TransactionLogResource queue, + private void dequeueMessage(final Transaction tx, final UUID queueId, long messageId) throws StoreException { DatabaseEntry key = new DatabaseEntry(); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); - UUID id = queue.getId(); + QueueEntryKey queueEntryKey = new QueueEntryKey(queueId, messageId); + UUID id = queueId; keyBinding.objectToEntry(queueEntryKey, key); if (getLogger().isDebugEnabled()) { - getLogger().debug("Dequeue message id " + messageId + " from queue " - + queue.getName() + " with id " + id); + getLogger().debug("Dequeue message id " + messageId + " from queue with id " + id); } try @@ -819,19 +676,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore OperationStatus status = getDeliveryDb().delete(tx, key); if (status == OperationStatus.NOTFOUND) { - throw new StoreException("Unable to find message with id " + messageId + " on queue " - + queue.getName() + " with id " + id); + throw new StoreException("Unable to find message with id " + messageId + " on queue with id " + id); } else if (status != OperationStatus.SUCCESS) { - throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + queue.getName() + " with id " + id); + throw new StoreException("Unable to remove message with id " + messageId + " on queue with id " + id); } if (getLogger().isDebugEnabled()) { - getLogger().debug("Removed message " + messageId + " on queue " - + queue.getName() + " with id " + id); + getLogger().debug("Removed message " + messageId + " on queue with id " + id); } } @@ -849,8 +703,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore long format, byte[] globalId, byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException + org.apache.qpid.server.store.Transaction.EnqueueRecord[] enqueues, + org.apache.qpid.server.store.Transaction.DequeueRecord[] dequeues) throws StoreException { DatabaseEntry key = new DatabaseEntry(); Xid xid = new Xid(format, globalId, branchId); @@ -862,7 +716,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); valueBinding.objectToEntry(preparedTransaction, value); List postActions = new ArrayList<>(); - for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + for(org.apache.qpid.server.store.Transaction.EnqueueRecord enqueue : enqueues) { StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); if(storedMessage instanceof StoredBDBMessage) @@ -1187,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - final class StoredBDBMessage implements StoredMessage + final class StoredBDBMessage implements StoredMessage, MessageHandle { private final long _messageId; @@ -1234,7 +1088,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void addContent(int offsetInMessage, ByteBuffer src) + public void addContent(ByteBuffer src) { src = src.slice(); byte[] data = _messageDataRef.getData(); @@ -1258,6 +1112,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore } + @Override + public StoredMessage allContentAdded() + { + return this; + } + @Override public int getContent(int offsetInMessage, ByteBuffer dst) { @@ -1466,7 +1326,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException { checkMessageStoreOpen(); @@ -1487,14 +1347,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); + return new BDBEnqueueRecord(queue.getId(), message.getMessageNumber()); } @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + public void dequeueMessage(final MessageEnqueueRecord enqueueRecord) { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); + AbstractBDBMessageStore.this.dequeueMessage(_txn, enqueueRecord.getQueueId(), + enqueueRecord.getMessageNumber()); } @Override @@ -1546,21 +1408,268 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException + public void removeXid(final StoredXidRecord record) { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId); + AbstractBDBMessageStore.this.removeXid(_txn, record.getFormat(), record.getGlobalId(), record.getBranchId()); } @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, - Record[] dequeues) throws StoreException + public StoredXidRecord recordXid(final long format, final byte[] globalId, final byte[] branchId, final EnqueueRecord[] enqueues, + final DequeueRecord[] dequeues) throws StoreException { checkMessageStoreOpen(); _postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues)); + return new BDBStoredXidRecord(format, globalId, branchId); } + } + private static class BDBStoredXidRecord implements org.apache.qpid.server.store.Transaction.StoredXidRecord + { + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public BDBStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + @Override + public long getFormat() + { + return _format; + } + + @Override + public byte[] getGlobalId() + { + return _globalId; + } + + @Override + public byte[] getBranchId() + { + return _branchId; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final BDBStoredXidRecord that = (BDBStoredXidRecord) o; + + return _format == that._format + && Arrays.equals(_globalId, that._globalId) + && Arrays.equals(_branchId, that._branchId); + + } + + @Override + public int hashCode() + { + int result = (int) (_format ^ (_format >>> 32)); + result = 31 * result + Arrays.hashCode(_globalId); + result = 31 * result + Arrays.hashCode(_branchId); + return result; + } + } + public static class BDBEnqueueRecord implements MessageEnqueueRecord + { + private final UUID _queueId; + + private final long _messageNumber; + + public BDBEnqueueRecord(final UUID queueid, final long messageNumber) + { + _queueId = queueid; + _messageNumber = messageNumber; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public UUID getQueueId() + { + return _queueId; + } + + } + + private class BDBMessageStoreReader implements MessageStoreReader + { + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + checkMessageStoreOpen(); + visitMessagesInternal(handler, getEnvironmentFacade()); + } + + @Override + public StoredMessage getMessage(final long messageId) + { + checkMessageStoreOpen(); + return getMessageInternal(messageId, getEnvironmentFacade()); + } + + @Override + public void close() + { + + } + + @Override + public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List entries = new ArrayList(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key); + + if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + else + { + break; + } + } + } + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(new BDBEnqueueRecord(queueId, messageId))) + { + break; + } + } + + } + + + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List entries = new ArrayList(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + entries.add(entry); + } + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(new BDBEnqueueRecord(queueId, messageId))) + { + break; + } + } + + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + try + { + cursor = getXidDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidBinding keyBinding = XidBinding.getInstance(); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + if (!handler.handle(new BDBStoredXidRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId()), + preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) + { + break; + } + } + + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + } + + + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java index eb5c4677ff..9ecc5b3283 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java @@ -25,21 +25,21 @@ import org.apache.qpid.server.store.Transaction; public class PreparedTransaction { - private final Transaction.Record[] _enqueues; - private final Transaction.Record[] _dequeues; + private final Transaction.EnqueueRecord[] _enqueues; + private final Transaction.DequeueRecord[] _dequeues; - public PreparedTransaction(Transaction.Record[] enqueues, Transaction.Record[] dequeues) + public PreparedTransaction(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues) { _enqueues = enqueues; _dequeues = dequeues; } - public Transaction.Record[] getEnqueues() + public Transaction.EnqueueRecord[] getEnqueues() { return _enqueues; } - public Transaction.Record[] getDequeues() + public Transaction.DequeueRecord[] getDequeues() { return _dequeues; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 1f4cf45ce1..4d111e5c4f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -28,9 +28,11 @@ import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; public class PreparedTransactionBinding extends TupleBinding @@ -38,23 +40,34 @@ public class PreparedTransactionBinding extends TupleBinding storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); + MessageHandle messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8); long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - storedMessage_0_8.addContent(0, firstContentBytes_0_8); - storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); + messageHandle_0_8.addContent(firstContentBytes_0_8); + messageHandle_0_8.addContent(secondContentBytes_0_8); + final StoredMessage storedMessage_0_8 = messageHandle_0_8.allContentAdded(); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore(); /* * Create and insert a 0-10 message (metadata and content) @@ -124,13 +126,14 @@ public class BDBMessageStoreTest extends MessageStoreTestCase MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - StoredMessage storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); + MessageHandle messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10); long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - storedMessage_0_10.addContent(0, completeContentBody_0_10); - ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore(); + messageHandle_0_10.addContent(completeContentBody_0_10); + final StoredMessage storedMessage_0_10 = messageHandle_0_10.allContentAdded(); + long messageid_0_10 = storedMessage_0_10.getMessageNumber(); + ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore(); /* * reload the store only (read-only) @@ -352,12 +355,12 @@ public class BDBMessageStoreTest extends MessageStoreTestCase ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage storedMessage_0_8 = store.addMessage(messageMetaData_0_8); + MessageHandle storedMessage_0_8 = store.addMessage(messageMetaData_0_8); - storedMessage_0_8.addContent(0, chunk1); + storedMessage_0_8.addContent(chunk1); ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); - return storedMessage_0_8; + return storedMessage_0_8.allContentAdded(); } public void testOnDelete() throws Exception -- cgit v1.2.1