diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-12-28 13:02:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-12-28 13:02:41 +0000 |
| commit | 55ccbf149980b06c7b7effa36871ffbdf50550fa (patch) | |
| tree | f5fc6181438968f82af0528c751af32ea8fef64e /qpid/java/bdbstore | |
| parent | f085f3b0ce89af428e75bf2ae3b8c65ecdd16ad6 (diff) | |
| download | qpid-python-55ccbf149980b06c7b7effa36871ffbdf50550fa.tar.gz | |
QPID-3714 : [Java] Performance Improvements
Persistence:
Store message in same transaction as enqueue if possible
Memory:
Remove unnecessary (un)boxing
Reduce unnecessary copying of message data
Cache short strings
Cache queues for a given routing key on an Exchange
(0-9) Use a fixed size buffer for preparing frames to write out
Other:
Reduce calls to System.currentTimeMillis
(0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point
(0-10) Special case delivery properties and message properties in headers
(0-9) send commit-ok as soon as data committed to store
Cache publishing access control queries
(0-9) Optimised long and int typed values for FieldTables
(0-9) Retain FieldTable encoded form
(0-9) Cache queue and topic destinations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1225178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
4 files changed, 301 insertions, 147 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index f900159808..1d8187401d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.lang.ref.SoftReference; +import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedList; @@ -32,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.*; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; @@ -43,6 +46,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -70,17 +74,6 @@ import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory; import com.sleepycat.bind.EntryBinding; import com.sleepycat.bind.tuple.ByteBinding; import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.je.CheckpointConfig; -import com.sleepycat.je.Cursor; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.TransactionConfig; /** * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. @@ -91,7 +84,7 @@ import com.sleepycat.je.TransactionConfig; * dequeue messages to queues. <tr><td> Generate message identifiers. </table> */ @SuppressWarnings({"unchecked"}) -public class BDBMessageStore implements MessageStore +public class BDBMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _log = Logger.getLogger(BDBMessageStore.class); @@ -205,18 +198,15 @@ public class BDBMessageStore implements MessageStore Configuration storeConfiguration, LogSubject logSubject) throws Exception { - _logSubject = logSubject; - CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName())); + CurrentActor.get().message(logSubject, ConfigStoreMessages.CREATED(this.getClass().getName())); - if(_configured) + if(!_configured) { - throw new Exception("ConfigStore already configured"); + _logSubject = logSubject; + configure(name,storeConfiguration); + _configured = true; + stateTransition(State.CONFIGURING, State.CONFIGURED); } - - configure(name,storeConfiguration); - - _configured = true; - stateTransition(State.CONFIGURING, State.CONFIGURED); recover(recoveryHandler); stateTransition(State.RECOVERING, State.STARTED); @@ -227,24 +217,31 @@ public class BDBMessageStore implements MessageStore Configuration storeConfiguration, LogSubject logSubject) throws Exception { - CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); + CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); if(!_configured) { - throw new Exception("ConfigStore not configured"); + _logSubject = logSubject; + configure(name,storeConfiguration); + _configured = true; + stateTransition(State.CONFIGURING, State.CONFIGURED); } - + recoverMessages(recoveryHandler); } public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, Configuration storeConfiguration, LogSubject logSubject) throws Exception { - CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); + CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); + if(!_configured) { - throw new Exception("ConfigStore not configured"); + _logSubject = logSubject; + configure(name,storeConfiguration); + _configured = true; + stateTransition(State.CONFIGURING, State.CONFIGURED); } recoverQueueEntries(recoveryHandler); @@ -252,7 +249,7 @@ public class BDBMessageStore implements MessageStore } - public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction() + public org.apache.qpid.server.store.MessageStore.Transaction newTransaction() { return new BDBTransaction(); } @@ -686,8 +683,6 @@ public class BDBMessageStore implements MessageStore { cursor = _messageMetaDataDb.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);; - DatabaseEntry value = new DatabaseEntry(); EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance(); @@ -695,7 +690,7 @@ public class BDBMessageStore implements MessageStore while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { - long messageId = (Long) keyBinding.entryToObject(key); + long messageId = LongBinding.entryToLong(key); StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value); StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false); @@ -781,10 +776,15 @@ public class BDBMessageStore implements MessageStore * * @param messageId Identifies the message to remove. * - * @throws AMQInternalException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - public void removeMessage(Long messageId) throws AMQStoreException + public void removeMessage(long messageId) throws AMQStoreException + { + removeMessage(messageId, true); + } + public void removeMessage(long messageId, boolean sync) throws AMQStoreException { + // _log.debug("public void removeMessage(Long messageId = " + messageId): called"); com.sleepycat.je.Transaction tx = null; @@ -796,8 +796,7 @@ public class BDBMessageStore implements MessageStore //remove the message meta data from the store DatabaseEntry key = new DatabaseEntry(); - EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class); - metaKeyBindingTuple.objectToEntry(messageId, key); + LongBinding.longToEntry(messageId, key); if (_log.isDebugEnabled()) { @@ -808,9 +807,8 @@ public class BDBMessageStore implements MessageStore OperationStatus status = _messageMetaDataDb.delete(tx, key); if (status == OperationStatus.NOTFOUND) { - tx.abort(); - - throw new AMQStoreException("Message metadata not found for message id " + messageId); + _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " + + messageId); } if (_log.isDebugEnabled()) @@ -868,7 +866,7 @@ public class BDBMessageStore implements MessageStore cursor.close(); cursor = null; - commit(tx, true); + commit(tx, sync); } catch (DatabaseException e) { @@ -1174,11 +1172,12 @@ public class BDBMessageStore implements MessageStore * * @throws AMQStoreException If the operation fails for any reason. */ - public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + long messageId) throws AMQStoreException { // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called"); - AMQShortString name = new AMQShortString(queue.getResourceName()); + AMQShortString name = AMQShortString.valueOf(queue.getResourceName()); DatabaseEntry key = new DatabaseEntry(); EntryBinding keyBinding = new QueueEntryTB(); @@ -1212,7 +1211,8 @@ public class BDBMessageStore implements MessageStore * * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ - public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + long messageId) throws AMQStoreException { AMQShortString name = new AMQShortString(queue.getResourceName()); @@ -1383,7 +1383,7 @@ public class BDBMessageStore implements MessageStore * * @return A fresh message id. */ - public Long getNewMessageId() + public long getNewMessageId() { return _messageId.incrementAndGet(); } @@ -1398,7 +1398,7 @@ public class BDBMessageStore implements MessageStore * * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ - protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset, + protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, ByteBuffer contentBody) throws AMQStoreException { DatabaseEntry key = new DatabaseEntry(); @@ -1436,7 +1436,8 @@ public class BDBMessageStore implements MessageStore * * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ - private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData) + private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, + StorableMessageMetaData messageMetaData) throws AMQStoreException { if (_log.isDebugEnabled()) @@ -1446,8 +1447,7 @@ public class BDBMessageStore implements MessageStore } DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class); - keyBinding.objectToEntry(messageId, key); + LongBinding.longToEntry(messageId, key); DatabaseEntry value = new DatabaseEntry(); TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance(); @@ -1475,7 +1475,7 @@ public class BDBMessageStore implements MessageStore * * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ - public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException + public StorableMessageMetaData getMessageMetaData(long messageId) throws AMQStoreException { if (_log.isDebugEnabled()) { @@ -1484,8 +1484,7 @@ public class BDBMessageStore implements MessageStore } DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class); - keyBinding.objectToEntry(messageId, key); + LongBinding.longToEntry(messageId, key); DatabaseEntry value = new DatabaseEntry(); TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance(); @@ -1519,7 +1518,7 @@ public class BDBMessageStore implements MessageStore * * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ - public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException + public int getContent(long messageId, int offset, ByteBuffer dst) throws AMQStoreException { DatabaseEntry contentKeyEntry = new DatabaseEntry(); @@ -1778,7 +1777,6 @@ public class BDBMessageStore implements MessageStore { _log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); } - _complete = true; notifyAll(); @@ -1799,7 +1797,7 @@ public class BDBMessageStore implements MessageStore { //_log.debug("public void commit(): called"); - _commitThread.addJob(this); + _commitThread.addJob(this, _syncCommit); if(!_syncCommit) { @@ -1807,28 +1805,14 @@ public class BDBMessageStore implements MessageStore return; } - synchronized (BDBCommitFuture.this) - { - while (!_complete) - { - try - { - wait(250); - } - catch (InterruptedException e) - { - // _log.error("Unexpected thread interruption: " + e, e); - throw new RuntimeException(e); - } - } + waitForCompletion(); + // _log.debug("Commit completed, _databaseException = " + _databaseException); - // _log.debug("Commit completed, _databaseException = " + _databaseException); - - if (_databaseException != null) - { - throw _databaseException; - } + if (_databaseException != null) + { + throw _databaseException; } + } public synchronized boolean isComplete() @@ -1836,10 +1820,11 @@ public class BDBMessageStore implements MessageStore return _complete; } - public void waitForCompletion() + public synchronized void waitForCompletion() { while (!isComplete()) { + _commitThread.explicitNotify(); try { wait(250); @@ -1866,7 +1851,7 @@ public class BDBMessageStore implements MessageStore // private final Logger _log = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>()); + private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); private final CheckpointConfig _config = new CheckpointConfig(); private final Object _lock = new Object(); @@ -1877,6 +1862,14 @@ public class BDBMessageStore implements MessageStore } + public void explicitNotify() + { + synchronized (_lock) + { + _lock.notify(); + } + } + public void run() { while (!_stopped.get()) @@ -1905,24 +1898,25 @@ public class BDBMessageStore implements MessageStore { // _log.debug("private void processJobs(): called"); - // we replace the old queue atomically with a new one and this avoids any need to - // copy elements out of the queue - Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>()); + int size = _jobQueue.size(); try { - // _environment.checkpoint(_config); + //TODO - upgrade to BDB 5.0, then use: _environment.flushLog(true); _environment.sync(); - for (BDBCommitFuture commit : jobs) + for(int i = 0; i < size; i++) { + BDBCommitFuture commit = _jobQueue.poll(); commit.complete(); } + } catch (DatabaseException e) { - for (BDBCommitFuture commit : jobs) + for(int i = 0; i < size; i++) { + BDBCommitFuture commit = _jobQueue.poll(); commit.abort(e); } } @@ -1931,15 +1925,19 @@ public class BDBMessageStore implements MessageStore private boolean hasJobs() { - return !_jobQueue.get().isEmpty(); + return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit) + public void addJob(BDBCommitFuture commit, final boolean sync) { - synchronized (_lock) + + _jobQueue.add(commit); + if(sync) { - _jobQueue.get().add(commit); - _lock.notifyAll(); + synchronized (_lock) + { + _lock.notifyAll(); + } } } @@ -1959,7 +1957,10 @@ public class BDBMessageStore implements MessageStore private final long _messageId; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; - private com.sleepycat.je.Transaction _txn; + + private StorableMessageMetaData _metaData; + private volatile SoftReference<byte[]> _dataRef; + private byte[] _data; StoredBDBMessage(long messageId, StorableMessageMetaData metaData) { @@ -1973,22 +1974,15 @@ public class BDBMessageStore implements MessageStore try { _messageId = messageId; + _metaData = metaData; _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - if(persist) - { - _txn = _environment.beginTransaction(null, null); - storeMetaData(_txn, messageId, metaData); - } + } catch (DatabaseException e) { throw new RuntimeException(e); } - catch (AMQStoreException e) - { - throw new RuntimeException(e); - } } @@ -2018,58 +2012,114 @@ public class BDBMessageStore implements MessageStore public void addContent(int offsetInMessage, java.nio.ByteBuffer src) { - try + src = src.slice(); + + if(_data == null) { - BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src); + _data = new byte[src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + src.duplicate().get(_data); } - catch (AMQStoreException e) + else { - throw new RuntimeException(e); + byte[] oldData = _data; + _data = new byte[oldData.length + src.remaining()]; + _dataRef = new SoftReference<byte[]>(_data); + + System.arraycopy(oldData,0,_data,0,oldData.length); + src.duplicate().get(_data, oldData.length, src.remaining()); } + } public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) { - try + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) { - return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; } - catch (AMQStoreException e) + else { - throw new RuntimeException(e); + try + { + return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } } } - public StoreFuture flushToStore() + public ByteBuffer getContent(int offsetInMessage, int size) { - try + byte[] data = _dataRef == null ? null : _dataRef.get(); + if(data != null) { - if(_txn != null) - { - //if(_log.isDebugEnabled()) - //{ - // _log.debug("Flushing message " + _messageId + " to store"); - //} - BDBMessageStore.this.commitTranImpl(_txn, true); - } + return ByteBuffer.wrap(data,offsetInMessage,size); } - catch (AMQStoreException e) + else { - throw new RuntimeException(e); + ByteBuffer buf = ByteBuffer.allocate(size); + getContent(offsetInMessage, buf); + buf.position(0); + return buf; } - finally + } + + synchronized void store(com.sleepycat.je.Transaction txn) + { + + if(_metaData != null) { - _txn = null; + try + { + _dataRef = new SoftReference<byte[]>(_data); + BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); + BDBMessageStore.this.addContent(txn, _messageId, 0, + _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + } + catch(DatabaseException e) + { + throw new RuntimeException(e); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + catch (RuntimeException e) + { + e.printStackTrace(); + throw e; + } + finally + { + _metaData = null; + _data = null; + } + } + } + + public synchronized StoreFuture flushToStore() + { + if(_metaData != null) + { + com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); + store(txn); + BDBMessageStore.this.commit(txn,true); + } return IMMEDIATE_FUTURE; } public void remove() { - flushToStore(); try { - BDBMessageStore.this.removeMessage(_messageId); + BDBMessageStore.this.removeMessage(_messageId, false); } catch (AMQStoreException e) { @@ -2094,12 +2144,27 @@ public class BDBMessageStore implements MessageStore } } - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + if(message.getStoredMessage() instanceof StoredBDBMessage) + { + ((StoredBDBMessage)message.getStoredMessage()).store(_txn); + } + + BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); + } + + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); + } + + public void enqueueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException { BDBMessageStore.this.enqueueMessage(_txn, queue, messageId); } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException { BDBMessageStore.this.dequeueMessage(_txn, queue, messageId); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java index 975e558874..68f1e7ce6f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java @@ -33,7 +33,7 @@ public class QueueEntryTB extends TupleBinding<QueueEntryKey> public QueueEntryKey entryToObject(TupleInput tupleInput) { AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); - Long messageId = tupleInput.readLong(); + long messageId = tupleInput.readLong(); return new QueueEntryKey(queueName, messageId); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index ef31b78cfe..6c890daaca 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -32,13 +32,11 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.MessageMetaData_0_10; +import org.apache.qpid.server.message.*; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -100,7 +98,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto */ MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); - Header header_0_10 = new Header(msgProps_0_10, delProps_0_10); + Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); @@ -162,7 +160,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); - DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class); + DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); @@ -170,7 +168,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); - MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class); + MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); assertNotNull("MessageProperties were not returned", returnedMsgProps); assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); @@ -352,7 +350,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto */ public void testTranCommit() throws Exception { - TransactionLog log = getVirtualHost().getTransactionLog(); + MessageStore log = getVirtualHost().getMessageStore(); BDBMessageStore bdbStore = assertBDBStore(log); @@ -366,10 +364,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto } }; - TransactionLog.Transaction txn = log.newTransaction(); + MessageStore.Transaction txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 1L); - txn.enqueueMessage(mockQueue, 5L); + txn.enqueueMessage(mockQueue, new MockMessage(1L)); + txn.enqueueMessage(mockQueue, new MockMessage(5L)); txn.commitTran(); List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); @@ -390,7 +388,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto */ public void testTranRollbackBeforeCommit() throws Exception { - TransactionLog log = getVirtualHost().getTransactionLog(); + MessageStore log = getVirtualHost().getMessageStore(); BDBMessageStore bdbStore = assertBDBStore(log); @@ -404,14 +402,14 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto } }; - TransactionLog.Transaction txn = log.newTransaction(); + MessageStore.Transaction txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 21L); + txn.enqueueMessage(mockQueue, new MockMessage(21L)); txn.abortTran(); txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 22L); - txn.enqueueMessage(mockQueue, 23L); + txn.enqueueMessage(mockQueue, new MockMessage(22L)); + txn.enqueueMessage(mockQueue, new MockMessage(23L)); txn.commitTran(); List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); @@ -431,7 +429,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto */ public void testTranRollbackAfterCommit() throws Exception { - TransactionLog log = getVirtualHost().getTransactionLog(); + MessageStore log = getVirtualHost().getMessageStore(); BDBMessageStore bdbStore = assertBDBStore(log); @@ -445,17 +443,17 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto } }; - TransactionLog.Transaction txn = log.newTransaction(); + MessageStore.Transaction txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 30L); + txn.enqueueMessage(mockQueue, new MockMessage(30L)); txn.commitTran(); txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 31L); + txn.enqueueMessage(mockQueue, new MockMessage(31L)); txn.abortTran(); txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 32L); + txn.enqueueMessage(mockQueue, new MockMessage(32L)); txn.commitTran(); List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); @@ -467,4 +465,73 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto assertEquals("Second Message is incorrect", 32L, val.longValue()); } + private static class MockMessage implements ServerMessage, EnqueableMessage + { + private long _messageId; + + public MockMessage(long messageId) + { + _messageId = messageId; + } + + public String getRoutingKey() + { + return null; + } + + public AMQMessageHeader getMessageHeader() + { + return null; + } + + public StoredMessage getStoredMessage() + { + return null; + } + + public boolean isPersistent() + { + return true; + } + + public long getSize() + { + return 0; + } + + public boolean isImmediate() + { + return false; + } + + public long getExpiration() + { + return 0; + } + + public MessageReference newReference() + { + return null; + } + + public long getMessageNumber() + { + return _messageId; + } + + public long getArrivalTime() + { + return 0; + } + + public int getContent(ByteBuffer buf, int offset) + { + return 0; + } + + public ByteBuffer getContent(int offset, int length) + { + return null; + } + } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java index 8e55e79e01..6d7cca59cf 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -52,7 +52,9 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory; @@ -415,7 +417,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize); // add content entry to database - long messageId = store.getNewMessageId(); + final long messageId = store.getNewMessageId(); TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance(); MessageContentKey contentKey = null; if (storeVersion == VERSION_4) @@ -451,9 +453,29 @@ public class BDBUpgradeTest extends QpidBrokerTestCase return queueName.asString(); } }; - TransactionLog log = (TransactionLog) store; - TransactionLog.Transaction txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, messageId); + + EnqueableMessage mockMessage = new EnqueableMessage() + { + + public long getMessageNumber() + { + return messageId; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage getStoredMessage() + { + return null; + } + }; + + MessageStore log = (MessageStore) store; + MessageStore.Transaction txn = log.newTransaction(); + txn.enqueueMessage(mockQueue, mockMessage); txn.commitTran(); } finally |
