summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-04-03 22:21:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-04-03 22:21:05 +0000
commit8bcfb7bb278644a547bddf4719265d806ea69d72 (patch)
tree6aa69e2381cb2c7d71cbe59cbc8ed8005667937b /qpid/java/bdbstore/src
parent47f4f5148f7a6f4fa3c214cc2efd4e4a3f44641c (diff)
downloadqpid-python-8bcfb7bb278644a547bddf4719265d806ea69d72.tar.gz
QPID-6476 : [Java Broker] Refactor MessageStore to allow more efficient implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671184 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java453
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java10
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java67
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java27
4 files changed, 359 insertions, 198 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index c668cc8595..b030b6c091 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -26,6 +26,7 @@ import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -48,6 +49,8 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
@@ -114,7 +117,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData)
{
long newMessageId = getNextMessageId();
@@ -163,154 +166,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void visitMessages(final MessageHandler handler) throws StoreException
+ public MessageStoreReader newMessageStoreReader()
{
- checkMessageStoreOpen();
- visitMessagesInternal(handler, getEnvironmentFacade());
- }
-
- @Override
- public StoredMessage<?> getMessage(final long messageId)
- {
- checkMessageStoreOpen();
- return getMessageInternal(messageId, getEnvironmentFacade());
- }
-
- @Override
- public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key);
-
- if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS)
- {
- QueueEntryKey entry = keyBinding.entryToObject(key);
- if(entry.getQueueId().equals(queue.getId()))
- {
- entries.add(entry);
- }
- while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
- {
- entry = keyBinding.entryToObject(key);
- if(entry.getQueueId().equals(queue.getId()))
- {
- entries.add(entry);
- }
- else
- {
- break;
- }
- }
- }
- }
- catch (RuntimeException e)
- {
- throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
- }
- finally
- {
- closeCursorSafely(cursor, getEnvironmentFacade());
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- if (!handler.handle(queueId, messageId))
- {
- break;
- }
- }
-
- }
-
-
-
- @Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
- {
- QueueEntryKey entry = keyBinding.entryToObject(key);
- entries.add(entry);
- }
- }
- catch (RuntimeException e)
- {
- throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
- }
- finally
- {
- closeCursorSafely(cursor, getEnvironmentFacade());
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- if (!handler.handle(queueId, messageId))
- {
- break;
- }
- }
-
- }
-
- @Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- try
- {
- cursor = getXidDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- preparedTransaction.getEnqueues(), preparedTransaction.getDequeues()))
- {
- break;
- }
- }
-
- }
- catch (RuntimeException e)
- {
- throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e);
- }
- finally
- {
- closeCursorSafely(cursor, getEnvironmentFacade());
- }
+ return new BDBMessageStoreReader();
}
/**
@@ -793,24 +651,23 @@ public abstract class AbstractBDBMessageStore implements MessageStore
* Extracts a message from a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
- * @param queue The queue to take the message from.
+ * @param queueId The id of the queue to take the message from.
* @param messageId The message to dequeue.
*
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- private void dequeueMessage(final Transaction tx, final TransactionLogResource queue,
+ private void dequeueMessage(final Transaction tx, final UUID queueId,
long messageId) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
- UUID id = queue.getId();
+ QueueEntryKey queueEntryKey = new QueueEntryKey(queueId, messageId);
+ UUID id = queueId;
keyBinding.objectToEntry(queueEntryKey, key);
if (getLogger().isDebugEnabled())
{
- getLogger().debug("Dequeue message id " + messageId + " from queue "
- + queue.getName() + " with id " + id);
+ getLogger().debug("Dequeue message id " + messageId + " from queue with id " + id);
}
try
@@ -819,19 +676,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
OperationStatus status = getDeliveryDb().delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- throw new StoreException("Unable to find message with id " + messageId + " on queue "
- + queue.getName() + " with id " + id);
+ throw new StoreException("Unable to find message with id " + messageId + " on queue with id " + id);
}
else if (status != OperationStatus.SUCCESS)
{
- throw new StoreException("Unable to remove message with id " + messageId + " on queue"
- + queue.getName() + " with id " + id);
+ throw new StoreException("Unable to remove message with id " + messageId + " on queue with id " + id);
}
if (getLogger().isDebugEnabled())
{
- getLogger().debug("Removed message " + messageId + " on queue "
- + queue.getName() + " with id " + id);
+ getLogger().debug("Removed message " + messageId + " on queue with id " + id);
}
}
@@ -849,8 +703,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long format,
byte[] globalId,
byte[] branchId,
- org.apache.qpid.server.store.Transaction.Record[] enqueues,
- org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
+ org.apache.qpid.server.store.Transaction.EnqueueRecord[] enqueues,
+ org.apache.qpid.server.store.Transaction.DequeueRecord[] dequeues) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
@@ -862,7 +716,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
valueBinding.objectToEntry(preparedTransaction, value);
List<Runnable> postActions = new ArrayList<>();
- for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues)
+ for(org.apache.qpid.server.store.Transaction.EnqueueRecord enqueue : enqueues)
{
StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
if(storedMessage instanceof StoredBDBMessage)
@@ -1187,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
{
private final long _messageId;
@@ -1234,7 +1088,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
+ public void addContent(ByteBuffer src)
{
src = src.slice();
byte[] data = _messageDataRef.getData();
@@ -1259,6 +1113,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
+ public StoredMessage<T> allContentAdded()
+ {
+ return this;
+ }
+
+ @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
byte[] data = _messageDataRef.getData();
@@ -1466,7 +1326,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
{
checkMessageStoreOpen();
@@ -1487,14 +1347,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ return new BDBEnqueueRecord(queue.getId(), message.getMessageNumber());
}
@Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ public void dequeueMessage(final MessageEnqueueRecord enqueueRecord)
{
checkMessageStoreOpen();
- AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ AbstractBDBMessageStore.this.dequeueMessage(_txn, enqueueRecord.getQueueId(),
+ enqueueRecord.getMessageNumber());
}
@Override
@@ -1546,21 +1408,268 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
+ public void removeXid(final StoredXidRecord record)
{
checkMessageStoreOpen();
- AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
+ AbstractBDBMessageStore.this.removeXid(_txn, record.getFormat(), record.getGlobalId(), record.getBranchId());
}
@Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
- Record[] dequeues) throws StoreException
+ public StoredXidRecord recordXid(final long format, final byte[] globalId, final byte[] branchId, final EnqueueRecord[] enqueues,
+ final DequeueRecord[] dequeues) throws StoreException
{
checkMessageStoreOpen();
_postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues));
+ return new BDBStoredXidRecord(format, globalId, branchId);
}
+
}
+ private static class BDBStoredXidRecord implements org.apache.qpid.server.store.Transaction.StoredXidRecord
+ {
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public BDBStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ @Override
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ @Override
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ @Override
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final BDBStoredXidRecord that = (BDBStoredXidRecord) o;
+
+ return _format == that._format
+ && Arrays.equals(_globalId, that._globalId)
+ && Arrays.equals(_branchId, that._branchId);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = (int) (_format ^ (_format >>> 32));
+ result = 31 * result + Arrays.hashCode(_globalId);
+ result = 31 * result + Arrays.hashCode(_branchId);
+ return result;
+ }
+ }
+ public static class BDBEnqueueRecord implements MessageEnqueueRecord
+ {
+ private final UUID _queueId;
+
+ private final long _messageNumber;
+
+ public BDBEnqueueRecord(final UUID queueid, final long messageNumber)
+ {
+ _queueId = queueid;
+ _messageNumber = messageNumber;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ public UUID getQueueId()
+ {
+ return _queueId;
+ }
+
+ }
+
+ private class BDBMessageStoreReader implements MessageStoreReader
+ {
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+ visitMessagesInternal(handler, getEnvironmentFacade());
+ }
+
+ @Override
+ public StoredMessage<?> getMessage(final long messageId)
+ {
+ checkMessageStoreOpen();
+ return getMessageInternal(messageId, getEnvironmentFacade());
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key);
+
+ if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+ catch (RuntimeException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(new BDBEnqueueRecord(queueId, messageId)))
+ {
+ break;
+ }
+ }
+
+ }
+
+
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ entries.add(entry);
+ }
+ }
+ catch (RuntimeException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(new BDBEnqueueRecord(queueId, messageId)))
+ {
+ break;
+ }
+ }
+
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getXidDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ if (!handler.handle(new BDBStoredXidRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId()),
+ preparedTransaction.getEnqueues(), preparedTransaction.getDequeues()))
+ {
+ break;
+ }
+ }
+
+ }
+ catch (RuntimeException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+ }
+
+
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
index eb5c4677ff..9ecc5b3283 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
@@ -25,21 +25,21 @@ import org.apache.qpid.server.store.Transaction;
public class PreparedTransaction
{
- private final Transaction.Record[] _enqueues;
- private final Transaction.Record[] _dequeues;
+ private final Transaction.EnqueueRecord[] _enqueues;
+ private final Transaction.DequeueRecord[] _dequeues;
- public PreparedTransaction(Transaction.Record[] enqueues, Transaction.Record[] dequeues)
+ public PreparedTransaction(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues)
{
_enqueues = enqueues;
_dequeues = dequeues;
}
- public Transaction.Record[] getEnqueues()
+ public Transaction.EnqueueRecord[] getEnqueues()
{
return _enqueues;
}
- public Transaction.Record[] getDequeues()
+ public Transaction.DequeueRecord[] getDequeues()
{
return _dequeues;
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index 1f4cf45ce1..4d111e5c4f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -28,9 +28,11 @@ import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction>
@@ -38,23 +40,34 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
@Override
public PreparedTransaction entryToObject(TupleInput input)
{
- Transaction.Record[] enqueues = readRecords(input);
+ Transaction.EnqueueRecord[] enqueues = readEnqueueRecords(input);
- Transaction.Record[] dequeues = readRecords(input);
+ Transaction.DequeueRecord[] dequeues = readDequeueRecords(input);
return new PreparedTransaction(enqueues, dequeues);
}
- private Transaction.Record[] readRecords(TupleInput input)
+ private Transaction.EnqueueRecord[] readEnqueueRecords(TupleInput input)
{
- Transaction.Record[] records = new Transaction.Record[input.readInt()];
+ Transaction.EnqueueRecord[] records = new Transaction.EnqueueRecord[input.readInt()];
for(int i = 0; i < records.length; i++)
{
- records[i] = new RecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong());
+ records[i] = new EnqueueRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong());
}
return records;
}
+ private Transaction.DequeueRecord[] readDequeueRecords(TupleInput input)
+ {
+ Transaction.DequeueRecord[] records = new Transaction.DequeueRecord[input.readInt()];
+ for(int i = 0; i < records.length; i++)
+ {
+ records[i] = new DequeueRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong());
+ }
+ return records;
+ }
+
+
@Override
public void objectToEntry(PreparedTransaction preparedTransaction, TupleOutput output)
{
@@ -63,7 +76,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
- private void writeRecords(Transaction.Record[] records, TupleOutput output)
+ private void writeRecords(Transaction.EnqueueRecord[] records, TupleOutput output)
{
if(records == null)
{
@@ -72,7 +85,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
else
{
output.writeInt(records.length);
- for(Transaction.Record record : records)
+ for(Transaction.EnqueueRecord record : records)
{
UUID id = record.getResource().getId();
output.writeLong(id.getMostSignificantBits());
@@ -82,13 +95,32 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
}
- private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
+ private void writeRecords(Transaction.DequeueRecord[] records, TupleOutput output)
+ {
+ if(records == null)
+ {
+ output.writeInt(0);
+ }
+ else
+ {
+ output.writeInt(records.length);
+ for(Transaction.DequeueRecord record : records)
+ {
+ UUID id = record.getEnqueueRecord().getQueueId();
+ output.writeLong(id.getMostSignificantBits());
+ output.writeLong(id.getLeastSignificantBits());
+ output.writeLong(record.getEnqueueRecord().getMessageNumber());
+ }
+ }
+ }
+
+ private static class EnqueueRecordImpl implements Transaction.EnqueueRecord, TransactionLogResource, EnqueueableMessage
{
private long _messageNumber;
private UUID _queueId;
- public RecordImpl(UUID queueId, long messageNumber)
+ public EnqueueRecordImpl(UUID queueId, long messageNumber)
{
_messageNumber = messageNumber;
_queueId = queueId;
@@ -137,4 +169,21 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
return MessageDurability.DEFAULT;
}
}
+
+ private static class DequeueRecordImpl implements Transaction.DequeueRecord
+ {
+
+ private final AbstractBDBMessageStore.BDBEnqueueRecord _record;
+
+ public DequeueRecordImpl(final UUID queueId, final long messageNumber)
+ {
+ _record = new AbstractBDBMessageStore.BDBEnqueueRecord(queueId, messageNumber);
+ }
+
+ @Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return _record;
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index fb0c11f6e5..3f8c1a7a99 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -104,14 +105,15 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
- StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
+ MessageHandle<MessageMetaData> messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8);
long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
- storedMessage_0_8.addContent(0, firstContentBytes_0_8);
- storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
- ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
+ messageHandle_0_8.addContent(firstContentBytes_0_8);
+ messageHandle_0_8.addContent(secondContentBytes_0_8);
+ final StoredMessage<MessageMetaData> storedMessage_0_8 = messageHandle_0_8.allContentAdded();
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+ ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore();
/*
* Create and insert a 0-10 message (metadata and content)
@@ -124,13 +126,14 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
- StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
+ MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10);
long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
- long messageid_0_10 = storedMessage_0_10.getMessageNumber();
- storedMessage_0_10.addContent(0, completeContentBody_0_10);
- ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore();
+ messageHandle_0_10.addContent(completeContentBody_0_10);
+ final StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = messageHandle_0_10.allContentAdded();
+ long messageid_0_10 = storedMessage_0_10.getMessageNumber();
+ ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore();
/*
* reload the store only (read-only)
@@ -352,12 +355,12 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
- StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
+ MessageHandle<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
- storedMessage_0_8.addContent(0, chunk1);
+ storedMessage_0_8.addContent(chunk1);
((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
- return storedMessage_0_8;
+ return storedMessage_0_8.allContentAdded();
}
public void testOnDelete() throws Exception