diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-04-03 22:21:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-04-03 22:21:05 +0000 |
| commit | 8bcfb7bb278644a547bddf4719265d806ea69d72 (patch) | |
| tree | 6aa69e2381cb2c7d71cbe59cbc8ed8005667937b /qpid/java/bdbstore/src | |
| parent | 47f4f5148f7a6f4fa3c214cc2efd4e4a3f44641c (diff) | |
| download | qpid-python-8bcfb7bb278644a547bddf4719265d806ea69d72.tar.gz | |
QPID-6476 : [Java Broker] Refactor MessageStore to allow more efficient implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671184 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
4 files changed, 359 insertions, 198 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index c668cc8595..b030b6c091 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -26,6 +26,7 @@ import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.UUID; @@ -48,6 +49,8 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; +import org.apache.qpid.server.store.MessageEnqueueRecord; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; @@ -114,7 +117,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData) { long newMessageId = getNextMessageId(); @@ -163,154 +166,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void visitMessages(final MessageHandler handler) throws StoreException + public MessageStoreReader newMessageStoreReader() { - checkMessageStoreOpen(); - visitMessagesInternal(handler, getEnvironmentFacade()); - } - - @Override - public StoredMessage<?> getMessage(final long messageId) - { - checkMessageStoreOpen(); - return getMessageInternal(messageId, getEnvironmentFacade()); - } - - @Override - public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key); - - if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - QueueEntryKey entry = keyBinding.entryToObject(key); - if(entry.getQueueId().equals(queue.getId())) - { - entries.add(entry); - } - while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - entry = keyBinding.entryToObject(key); - if(entry.getQueueId().equals(queue.getId())) - { - entries.add(entry); - } - else - { - break; - } - } - } - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - if (!handler.handle(queueId, messageId)) - { - break; - } - } - - } - - - - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - QueueEntryKey entry = keyBinding.entryToObject(key); - entries.add(entry); - } - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - if (!handler.handle(queueId, messageId)) - { - break; - } - } - - } - - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - try - { - cursor = getXidDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) - { - break; - } - } - - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } + return new BDBMessageStoreReader(); } /** @@ -793,24 +651,23 @@ public abstract class AbstractBDBMessageStore implements MessageStore * Extracts a message from a specified queue, in a given transaction. * * @param tx The transaction for the operation. - * @param queue The queue to take the message from. + * @param queueId The id of the queue to take the message from. * @param messageId The message to dequeue. * * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. */ - private void dequeueMessage(final Transaction tx, final TransactionLogResource queue, + private void dequeueMessage(final Transaction tx, final UUID queueId, long messageId) throws StoreException { DatabaseEntry key = new DatabaseEntry(); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); - UUID id = queue.getId(); + QueueEntryKey queueEntryKey = new QueueEntryKey(queueId, messageId); + UUID id = queueId; keyBinding.objectToEntry(queueEntryKey, key); if (getLogger().isDebugEnabled()) { - getLogger().debug("Dequeue message id " + messageId + " from queue " - + queue.getName() + " with id " + id); + getLogger().debug("Dequeue message id " + messageId + " from queue with id " + id); } try @@ -819,19 +676,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore OperationStatus status = getDeliveryDb().delete(tx, key); if (status == OperationStatus.NOTFOUND) { - throw new StoreException("Unable to find message with id " + messageId + " on queue " - + queue.getName() + " with id " + id); + throw new StoreException("Unable to find message with id " + messageId + " on queue with id " + id); } else if (status != OperationStatus.SUCCESS) { - throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + queue.getName() + " with id " + id); + throw new StoreException("Unable to remove message with id " + messageId + " on queue with id " + id); } if (getLogger().isDebugEnabled()) { - getLogger().debug("Removed message " + messageId + " on queue " - + queue.getName() + " with id " + id); + getLogger().debug("Removed message " + messageId + " on queue with id " + id); } } @@ -849,8 +703,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore long format, byte[] globalId, byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException + org.apache.qpid.server.store.Transaction.EnqueueRecord[] enqueues, + org.apache.qpid.server.store.Transaction.DequeueRecord[] dequeues) throws StoreException { DatabaseEntry key = new DatabaseEntry(); Xid xid = new Xid(format, globalId, branchId); @@ -862,7 +716,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); valueBinding.objectToEntry(preparedTransaction, value); List<Runnable> postActions = new ArrayList<>(); - for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + for(org.apache.qpid.server.store.Transaction.EnqueueRecord enqueue : enqueues) { StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); if(storedMessage instanceof StoredBDBMessage) @@ -1187,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T> { private final long _messageId; @@ -1234,7 +1088,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void addContent(int offsetInMessage, ByteBuffer src) + public void addContent(ByteBuffer src) { src = src.slice(); byte[] data = _messageDataRef.getData(); @@ -1259,6 +1113,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override + public StoredMessage<T> allContentAdded() + { + return this; + } + + @Override public int getContent(int offsetInMessage, ByteBuffer dst) { byte[] data = _messageDataRef.getData(); @@ -1466,7 +1326,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException { checkMessageStoreOpen(); @@ -1487,14 +1347,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); + return new BDBEnqueueRecord(queue.getId(), message.getMessageNumber()); } @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + public void dequeueMessage(final MessageEnqueueRecord enqueueRecord) { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); + AbstractBDBMessageStore.this.dequeueMessage(_txn, enqueueRecord.getQueueId(), + enqueueRecord.getMessageNumber()); } @Override @@ -1546,21 +1408,268 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException + public void removeXid(final StoredXidRecord record) { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId); + AbstractBDBMessageStore.this.removeXid(_txn, record.getFormat(), record.getGlobalId(), record.getBranchId()); } @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, - Record[] dequeues) throws StoreException + public StoredXidRecord recordXid(final long format, final byte[] globalId, final byte[] branchId, final EnqueueRecord[] enqueues, + final DequeueRecord[] dequeues) throws StoreException { checkMessageStoreOpen(); _postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues)); + return new BDBStoredXidRecord(format, globalId, branchId); } + } + private static class BDBStoredXidRecord implements org.apache.qpid.server.store.Transaction.StoredXidRecord + { + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public BDBStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + @Override + public long getFormat() + { + return _format; + } + + @Override + public byte[] getGlobalId() + { + return _globalId; + } + + @Override + public byte[] getBranchId() + { + return _branchId; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final BDBStoredXidRecord that = (BDBStoredXidRecord) o; + + return _format == that._format + && Arrays.equals(_globalId, that._globalId) + && Arrays.equals(_branchId, that._branchId); + + } + + @Override + public int hashCode() + { + int result = (int) (_format ^ (_format >>> 32)); + result = 31 * result + Arrays.hashCode(_globalId); + result = 31 * result + Arrays.hashCode(_branchId); + return result; + } + } + public static class BDBEnqueueRecord implements MessageEnqueueRecord + { + private final UUID _queueId; + + private final long _messageNumber; + + public BDBEnqueueRecord(final UUID queueid, final long messageNumber) + { + _queueId = queueid; + _messageNumber = messageNumber; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public UUID getQueueId() + { + return _queueId; + } + + } + + private class BDBMessageStoreReader implements MessageStoreReader + { + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + checkMessageStoreOpen(); + visitMessagesInternal(handler, getEnvironmentFacade()); + } + + @Override + public StoredMessage<?> getMessage(final long messageId) + { + checkMessageStoreOpen(); + return getMessageInternal(messageId, getEnvironmentFacade()); + } + + @Override + public void close() + { + + } + + @Override + public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key); + + if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + else + { + break; + } + } + } + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(new BDBEnqueueRecord(queueId, messageId))) + { + break; + } + } + + } + + + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + entries.add(entry); + } + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(new BDBEnqueueRecord(queueId, messageId))) + { + break; + } + } + + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + try + { + cursor = getXidDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidBinding keyBinding = XidBinding.getInstance(); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + if (!handler.handle(new BDBStoredXidRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId()), + preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) + { + break; + } + } + + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + } + + + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java index eb5c4677ff..9ecc5b3283 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java @@ -25,21 +25,21 @@ import org.apache.qpid.server.store.Transaction; public class PreparedTransaction { - private final Transaction.Record[] _enqueues; - private final Transaction.Record[] _dequeues; + private final Transaction.EnqueueRecord[] _enqueues; + private final Transaction.DequeueRecord[] _dequeues; - public PreparedTransaction(Transaction.Record[] enqueues, Transaction.Record[] dequeues) + public PreparedTransaction(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues) { _enqueues = enqueues; _dequeues = dequeues; } - public Transaction.Record[] getEnqueues() + public Transaction.EnqueueRecord[] getEnqueues() { return _enqueues; } - public Transaction.Record[] getDequeues() + public Transaction.DequeueRecord[] getDequeues() { return _dequeues; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 1f4cf45ce1..4d111e5c4f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -28,9 +28,11 @@ import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction> @@ -38,23 +40,34 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction @Override public PreparedTransaction entryToObject(TupleInput input) { - Transaction.Record[] enqueues = readRecords(input); + Transaction.EnqueueRecord[] enqueues = readEnqueueRecords(input); - Transaction.Record[] dequeues = readRecords(input); + Transaction.DequeueRecord[] dequeues = readDequeueRecords(input); return new PreparedTransaction(enqueues, dequeues); } - private Transaction.Record[] readRecords(TupleInput input) + private Transaction.EnqueueRecord[] readEnqueueRecords(TupleInput input) { - Transaction.Record[] records = new Transaction.Record[input.readInt()]; + Transaction.EnqueueRecord[] records = new Transaction.EnqueueRecord[input.readInt()]; for(int i = 0; i < records.length; i++) { - records[i] = new RecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); + records[i] = new EnqueueRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); } return records; } + private Transaction.DequeueRecord[] readDequeueRecords(TupleInput input) + { + Transaction.DequeueRecord[] records = new Transaction.DequeueRecord[input.readInt()]; + for(int i = 0; i < records.length; i++) + { + records[i] = new DequeueRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); + } + return records; + } + + @Override public void objectToEntry(PreparedTransaction preparedTransaction, TupleOutput output) { @@ -63,7 +76,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } - private void writeRecords(Transaction.Record[] records, TupleOutput output) + private void writeRecords(Transaction.EnqueueRecord[] records, TupleOutput output) { if(records == null) { @@ -72,7 +85,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction else { output.writeInt(records.length); - for(Transaction.Record record : records) + for(Transaction.EnqueueRecord record : records) { UUID id = record.getResource().getId(); output.writeLong(id.getMostSignificantBits()); @@ -82,13 +95,32 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } } - private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage + private void writeRecords(Transaction.DequeueRecord[] records, TupleOutput output) + { + if(records == null) + { + output.writeInt(0); + } + else + { + output.writeInt(records.length); + for(Transaction.DequeueRecord record : records) + { + UUID id = record.getEnqueueRecord().getQueueId(); + output.writeLong(id.getMostSignificantBits()); + output.writeLong(id.getLeastSignificantBits()); + output.writeLong(record.getEnqueueRecord().getMessageNumber()); + } + } + } + + private static class EnqueueRecordImpl implements Transaction.EnqueueRecord, TransactionLogResource, EnqueueableMessage { private long _messageNumber; private UUID _queueId; - public RecordImpl(UUID queueId, long messageNumber) + public EnqueueRecordImpl(UUID queueId, long messageNumber) { _messageNumber = messageNumber; _queueId = queueId; @@ -137,4 +169,21 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction return MessageDurability.DEFAULT; } } + + private static class DequeueRecordImpl implements Transaction.DequeueRecord + { + + private final AbstractBDBMessageStore.BDBEnqueueRecord _record; + + public DequeueRecordImpl(final UUID queueId, final long messageNumber) + { + _record = new AbstractBDBMessageStore.BDBEnqueueRecord(queueId, messageNumber); + } + + @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return _record; + } + } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index fb0c11f6e5..3f8c1a7a99 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreTestCase; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -104,14 +105,15 @@ public class BDBMessageStoreTest extends MessageStoreTestCase ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); + MessageHandle<MessageMetaData> messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8); long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - storedMessage_0_8.addContent(0, firstContentBytes_0_8); - storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); + messageHandle_0_8.addContent(firstContentBytes_0_8); + messageHandle_0_8.addContent(secondContentBytes_0_8); + final StoredMessage<MessageMetaData> storedMessage_0_8 = messageHandle_0_8.allContentAdded(); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore(); /* * Create and insert a 0-10 message (metadata and content) @@ -124,13 +126,14 @@ public class BDBMessageStoreTest extends MessageStoreTestCase MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); + MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10); long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - storedMessage_0_10.addContent(0, completeContentBody_0_10); - ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore(); + messageHandle_0_10.addContent(completeContentBody_0_10); + final StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = messageHandle_0_10.allContentAdded(); + long messageid_0_10 = storedMessage_0_10.getMessageNumber(); + ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore(); /* * reload the store only (read-only) @@ -352,12 +355,12 @@ public class BDBMessageStoreTest extends MessageStoreTestCase ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); + MessageHandle<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); - storedMessage_0_8.addContent(0, chunk1); + storedMessage_0_8.addContent(chunk1); ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); - return storedMessage_0_8; + return storedMessage_0_8.allContentAdded(); } public void testOnDelete() throws Exception |
