summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-12-28 13:02:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-12-28 13:02:41 +0000
commit55ccbf149980b06c7b7effa36871ffbdf50550fa (patch)
treef5fc6181438968f82af0528c751af32ea8fef64e /qpid/java/bdbstore
parentf085f3b0ce89af428e75bf2ae3b8c65ecdd16ad6 (diff)
downloadqpid-python-55ccbf149980b06c7b7effa36871ffbdf50550fa.tar.gz
QPID-3714 : [Java] Performance Improvements
Persistence: Store message in same transaction as enqueue if possible Memory: Remove unnecessary (un)boxing Reduce unnecessary copying of message data Cache short strings Cache queues for a given routing key on an Exchange (0-9) Use a fixed size buffer for preparing frames to write out Other: Reduce calls to System.currentTimeMillis (0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point (0-10) Special case delivery properties and message properties in headers (0-9) send commit-ok as soon as data committed to store Cache publishing access control queries (0-9) Optimised long and int typed values for FieldTables (0-9) Retain FieldTable encoded form (0-9) Cache queue and topic destinations git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1225178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java307
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java107
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java32
4 files changed, 301 insertions, 147 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index f900159808..1d8187401d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -32,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.*;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
@@ -43,6 +46,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -70,17 +74,6 @@ import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -91,7 +84,7 @@ import com.sleepycat.je.TransactionConfig;
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
@SuppressWarnings({"unchecked"})
-public class BDBMessageStore implements MessageStore
+public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
@@ -205,18 +198,15 @@ public class BDBMessageStore implements MessageStore
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- _logSubject = logSubject;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
- if(_configured)
+ if(!_configured)
{
- throw new Exception("ConfigStore already configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
-
- configure(name,storeConfiguration);
-
- _configured = true;
- stateTransition(State.CONFIGURING, State.CONFIGURED);
recover(recoveryHandler);
stateTransition(State.RECOVERING, State.STARTED);
@@ -227,24 +217,31 @@ public class BDBMessageStore implements MessageStore
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
if(!_configured)
{
- throw new Exception("ConfigStore not configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
-
+
recoverMessages(recoveryHandler);
}
public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+
if(!_configured)
{
- throw new Exception("ConfigStore not configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
recoverQueueEntries(recoveryHandler);
@@ -252,7 +249,7 @@ public class BDBMessageStore implements MessageStore
}
- public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
+ public org.apache.qpid.server.store.MessageStore.Transaction newTransaction()
{
return new BDBTransaction();
}
@@ -686,8 +683,6 @@ public class BDBMessageStore implements MessageStore
{
cursor = _messageMetaDataDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
-
DatabaseEntry value = new DatabaseEntry();
EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
@@ -695,7 +690,7 @@ public class BDBMessageStore implements MessageStore
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- long messageId = (Long) keyBinding.entryToObject(key);
+ long messageId = LongBinding.entryToLong(key);
StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
@@ -781,10 +776,15 @@ public class BDBMessageStore implements MessageStore
*
* @param messageId Identifies the message to remove.
*
- * @throws AMQInternalException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- public void removeMessage(Long messageId) throws AMQStoreException
+ public void removeMessage(long messageId) throws AMQStoreException
+ {
+ removeMessage(messageId, true);
+ }
+ public void removeMessage(long messageId, boolean sync) throws AMQStoreException
{
+
// _log.debug("public void removeMessage(Long messageId = " + messageId): called");
com.sleepycat.je.Transaction tx = null;
@@ -796,8 +796,7 @@ public class BDBMessageStore implements MessageStore
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
- EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
- metaKeyBindingTuple.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
if (_log.isDebugEnabled())
{
@@ -808,9 +807,8 @@ public class BDBMessageStore implements MessageStore
OperationStatus status = _messageMetaDataDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- tx.abort();
-
- throw new AMQStoreException("Message metadata not found for message id " + messageId);
+ _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
}
if (_log.isDebugEnabled())
@@ -868,7 +866,7 @@ public class BDBMessageStore implements MessageStore
cursor.close();
cursor = null;
- commit(tx, true);
+ commit(tx, sync);
}
catch (DatabaseException e)
{
@@ -1174,11 +1172,12 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws AMQStoreException
{
// _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
- AMQShortString name = new AMQShortString(queue.getResourceName());
+ AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
@@ -1212,7 +1211,8 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws AMQStoreException
{
AMQShortString name = new AMQShortString(queue.getResourceName());
@@ -1383,7 +1383,7 @@ public class BDBMessageStore implements MessageStore
*
* @return A fresh message id.
*/
- public Long getNewMessageId()
+ public long getNewMessageId()
{
return _messageId.incrementAndGet();
}
@@ -1398,7 +1398,7 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset,
+ protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
ByteBuffer contentBody) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
@@ -1436,7 +1436,8 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
+ StorableMessageMetaData messageMetaData)
throws AMQStoreException
{
if (_log.isDebugEnabled())
@@ -1446,8 +1447,7 @@ public class BDBMessageStore implements MessageStore
}
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
@@ -1475,7 +1475,7 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException
+ public StorableMessageMetaData getMessageMetaData(long messageId) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
@@ -1484,8 +1484,7 @@ public class BDBMessageStore implements MessageStore
}
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
@@ -1519,7 +1518,7 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException
+ public int getContent(long messageId, int offset, ByteBuffer dst) throws AMQStoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
@@ -1778,7 +1777,6 @@ public class BDBMessageStore implements MessageStore
{
_log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
}
-
_complete = true;
notifyAll();
@@ -1799,7 +1797,7 @@ public class BDBMessageStore implements MessageStore
{
//_log.debug("public void commit(): called");
- _commitThread.addJob(this);
+ _commitThread.addJob(this, _syncCommit);
if(!_syncCommit)
{
@@ -1807,28 +1805,14 @@ public class BDBMessageStore implements MessageStore
return;
}
- synchronized (BDBCommitFuture.this)
- {
- while (!_complete)
- {
- try
- {
- wait(250);
- }
- catch (InterruptedException e)
- {
- // _log.error("Unexpected thread interruption: " + e, e);
- throw new RuntimeException(e);
- }
- }
+ waitForCompletion();
+ // _log.debug("Commit completed, _databaseException = " + _databaseException);
- // _log.debug("Commit completed, _databaseException = " + _databaseException);
-
- if (_databaseException != null)
- {
- throw _databaseException;
- }
+ if (_databaseException != null)
+ {
+ throw _databaseException;
}
+
}
public synchronized boolean isComplete()
@@ -1836,10 +1820,11 @@ public class BDBMessageStore implements MessageStore
return _complete;
}
- public void waitForCompletion()
+ public synchronized void waitForCompletion()
{
while (!isComplete())
{
+ _commitThread.explicitNotify();
try
{
wait(250);
@@ -1866,7 +1851,7 @@ public class BDBMessageStore implements MessageStore
// private final Logger _log = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>());
+ private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
@@ -1877,6 +1862,14 @@ public class BDBMessageStore implements MessageStore
}
+ public void explicitNotify()
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+
public void run()
{
while (!_stopped.get())
@@ -1905,24 +1898,25 @@ public class BDBMessageStore implements MessageStore
{
// _log.debug("private void processJobs(): called");
- // we replace the old queue atomically with a new one and this avoids any need to
- // copy elements out of the queue
- Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>());
+ int size = _jobQueue.size();
try
{
- // _environment.checkpoint(_config);
+ //TODO - upgrade to BDB 5.0, then use: _environment.flushLog(true);
_environment.sync();
- for (BDBCommitFuture commit : jobs)
+ for(int i = 0; i < size; i++)
{
+ BDBCommitFuture commit = _jobQueue.poll();
commit.complete();
}
+
}
catch (DatabaseException e)
{
- for (BDBCommitFuture commit : jobs)
+ for(int i = 0; i < size; i++)
{
+ BDBCommitFuture commit = _jobQueue.poll();
commit.abort(e);
}
}
@@ -1931,15 +1925,19 @@ public class BDBMessageStore implements MessageStore
private boolean hasJobs()
{
- return !_jobQueue.get().isEmpty();
+ return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit)
+ public void addJob(BDBCommitFuture commit, final boolean sync)
{
- synchronized (_lock)
+
+ _jobQueue.add(commit);
+ if(sync)
{
- _jobQueue.get().add(commit);
- _lock.notifyAll();
+ synchronized (_lock)
+ {
+ _lock.notifyAll();
+ }
}
}
@@ -1959,7 +1957,10 @@ public class BDBMessageStore implements MessageStore
private final long _messageId;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private com.sleepycat.je.Transaction _txn;
+
+ private StorableMessageMetaData _metaData;
+ private volatile SoftReference<byte[]> _dataRef;
+ private byte[] _data;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
{
@@ -1973,22 +1974,15 @@ public class BDBMessageStore implements MessageStore
try
{
_messageId = messageId;
+ _metaData = metaData;
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _txn = _environment.beginTransaction(null, null);
- storeMetaData(_txn, messageId, metaData);
- }
+
}
catch (DatabaseException e)
{
throw new RuntimeException(e);
}
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
}
@@ -2018,58 +2012,114 @@ public class BDBMessageStore implements MessageStore
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
- try
+ src = src.slice();
+
+ if(_data == null)
{
- BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
}
+
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
- try
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
{
- return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ try
+ {
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
- public StoreFuture flushToStore()
+ public ByteBuffer getContent(int offsetInMessage, int size)
{
- try
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
{
- if(_txn != null)
- {
- //if(_log.isDebugEnabled())
- //{
- // _log.debug("Flushing message " + _messageId + " to store");
- //}
- BDBMessageStore.this.commitTranImpl(_txn, true);
- }
+ return ByteBuffer.wrap(data,offsetInMessage,size);
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(offsetInMessage, buf);
+ buf.position(0);
+ return buf;
}
- finally
+ }
+
+ synchronized void store(com.sleepycat.je.Transaction txn)
+ {
+
+ if(_metaData != null)
{
- _txn = null;
+ try
+ {
+ _dataRef = new SoftReference<byte[]>(_data);
+ BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
+ BDBMessageStore.this.addContent(txn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ catch(DatabaseException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+ }
+
+ public synchronized StoreFuture flushToStore()
+ {
+ if(_metaData != null)
+ {
+ com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+ store(txn);
+ BDBMessageStore.this.commit(txn,true);
+
}
return IMMEDIATE_FUTURE;
}
public void remove()
{
- flushToStore();
try
{
- BDBMessageStore.this.removeMessage(_messageId);
+ BDBMessageStore.this.removeMessage(_messageId, false);
}
catch (AMQStoreException e)
{
@@ -2094,12 +2144,27 @@ public class BDBMessageStore implements MessageStore
}
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ if(message.getStoredMessage() instanceof StoredBDBMessage)
+ {
+ ((StoredBDBMessage)message.getStoredMessage()).store(_txn);
+ }
+
+ BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
{
BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
{
BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
index 975e558874..68f1e7ce6f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
@@ -33,7 +33,7 @@ public class QueueEntryTB extends TupleBinding<QueueEntryKey>
public QueueEntryKey entryToObject(TupleInput tupleInput)
{
AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- Long messageId = tupleInput.readLong();
+ long messageId = tupleInput.readLong();
return new QueueEntryKey(queueName, messageId);
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index ef31b78cfe..6c890daaca 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -32,13 +32,11 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.*;
import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -100,7 +98,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
*/
MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
- Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
+ Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
@@ -162,7 +160,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
- DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
+ DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties();
assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
@@ -170,7 +168,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
- MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
+ MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties();
assertNotNull("MessageProperties were not returned", returnedMsgProps);
assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
@@ -352,7 +350,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
*/
public void testTranCommit() throws Exception
{
- TransactionLog log = getVirtualHost().getTransactionLog();
+ MessageStore log = getVirtualHost().getMessageStore();
BDBMessageStore bdbStore = assertBDBStore(log);
@@ -366,10 +364,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
}
};
- TransactionLog.Transaction txn = log.newTransaction();
+ MessageStore.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 1L);
- txn.enqueueMessage(mockQueue, 5L);
+ txn.enqueueMessage(mockQueue, new MockMessage(1L));
+ txn.enqueueMessage(mockQueue, new MockMessage(5L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -390,7 +388,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
*/
public void testTranRollbackBeforeCommit() throws Exception
{
- TransactionLog log = getVirtualHost().getTransactionLog();
+ MessageStore log = getVirtualHost().getMessageStore();
BDBMessageStore bdbStore = assertBDBStore(log);
@@ -404,14 +402,14 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
}
};
- TransactionLog.Transaction txn = log.newTransaction();
+ MessageStore.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 21L);
+ txn.enqueueMessage(mockQueue, new MockMessage(21L));
txn.abortTran();
txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 22L);
- txn.enqueueMessage(mockQueue, 23L);
+ txn.enqueueMessage(mockQueue, new MockMessage(22L));
+ txn.enqueueMessage(mockQueue, new MockMessage(23L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -431,7 +429,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
*/
public void testTranRollbackAfterCommit() throws Exception
{
- TransactionLog log = getVirtualHost().getTransactionLog();
+ MessageStore log = getVirtualHost().getMessageStore();
BDBMessageStore bdbStore = assertBDBStore(log);
@@ -445,17 +443,17 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
}
};
- TransactionLog.Transaction txn = log.newTransaction();
+ MessageStore.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 30L);
+ txn.enqueueMessage(mockQueue, new MockMessage(30L));
txn.commitTran();
txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 31L);
+ txn.enqueueMessage(mockQueue, new MockMessage(31L));
txn.abortTran();
txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 32L);
+ txn.enqueueMessage(mockQueue, new MockMessage(32L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -467,4 +465,73 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
assertEquals("Second Message is incorrect", 32L, val.longValue());
}
+ private static class MockMessage implements ServerMessage, EnqueableMessage
+ {
+ private long _messageId;
+
+ public MockMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public String getRoutingKey()
+ {
+ return null;
+ }
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public long getSize()
+ {
+ return 0;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ public MessageReference newReference()
+ {
+ return null;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public long getArrivalTime()
+ {
+ return 0;
+ }
+
+ public int getContent(ByteBuffer buf, int offset)
+ {
+ return 0;
+ }
+
+ public ByteBuffer getContent(int offset, int length)
+ {
+ return null;
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 8e55e79e01..6d7cca59cf 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -52,7 +52,9 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
@@ -415,7 +417,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
// add content entry to database
- long messageId = store.getNewMessageId();
+ final long messageId = store.getNewMessageId();
TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
MessageContentKey contentKey = null;
if (storeVersion == VERSION_4)
@@ -451,9 +453,29 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
return queueName.asString();
}
};
- TransactionLog log = (TransactionLog) store;
- TransactionLog.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, messageId);
+
+ EnqueableMessage mockMessage = new EnqueableMessage()
+ {
+
+ public long getMessageNumber()
+ {
+ return messageId;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ };
+
+ MessageStore log = (MessageStore) store;
+ MessageStore.Transaction txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, mockMessage);
txn.commitTran();
}
finally