From 55ccbf149980b06c7b7effa36871ffbdf50550fa Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 28 Dec 2011 13:02:41 +0000 Subject: QPID-3714 : [Java] Performance Improvements Persistence: Store message in same transaction as enqueue if possible Memory: Remove unnecessary (un)boxing Reduce unnecessary copying of message data Cache short strings Cache queues for a given routing key on an Exchange (0-9) Use a fixed size buffer for preparing frames to write out Other: Reduce calls to System.currentTimeMillis (0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point (0-10) Special case delivery properties and message properties in headers (0-9) send commit-ok as soon as data committed to store Cache publishing access control queries (0-9) Optimised long and int typed values for FieldTables (0-9) Retain FieldTable encoded form (0-9) Cache queue and topic destinations git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1225178 13f79535-47bb-0310-9956-ffa450edef68 --- .../exchange/AbstractHeadersExchangeTestBase.java | 60 +++++----- .../qpid/server/queue/AMQPriorityQueueTest.java | 18 +-- .../org/apache/qpid/server/queue/MockAMQQueue.java | 124 ++++++++++----------- .../qpid/server/queue/MockStoredMessage.java | 13 ++- .../qpid/server/queue/QueueEntryListTestBase.java | 2 +- .../qpid/server/queue/SimpleAMQQueueTest.java | 4 +- .../server/queue/SimpleQueueEntryListTest.java | 4 +- .../server/queue/SortedQueueEntryListTest.java | 2 +- .../apache/qpid/server/store/MessageStoreTest.java | 8 +- .../qpid/server/store/SkeletonMessageStore.java | 88 +++------------ .../qpid/server/store/TestMemoryMessageStore.java | 6 + .../server/store/TestableMemoryMessageStore.java | 16 ++- .../qpid/server/txn/AutoCommitTransactionTest.java | 12 +- .../qpid/server/txn/LocalTransactionTest.java | 12 +- .../apache/qpid/server/txn/MockServerMessage.java | 14 ++- .../qpid/server/txn/MockStoreTransaction.java | 41 +++++-- .../qpid/server/virtualhost/MockVirtualHost.java | 5 - 17 files changed, 204 insertions(+), 225 deletions(-) (limited to 'qpid/java/broker/src/test') diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index ea2fe90da6..24c790d799 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -72,7 +72,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase /** * Not used in this test, just there to stub out the routing calls */ - private MessageStore _store = new MemoryMessageStore(); + private MemoryMessageStore _store = new MemoryMessageStore(); BindingFactory bindingFactory = new BindingFactory(new DurableConfigurationStore.Source() @@ -310,7 +310,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase * @throws AMQException */ @Override - public void enqueue(ServerMessage msg, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException { messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); final QueueEntry queueEntry = new QueueEntry() @@ -318,47 +318,47 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase public AMQQueue getQueue() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public AMQMessage getMessage() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public long getSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public boolean getDeliveredToConsumer() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean expired() throws AMQException { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isAvailable() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isAcquired() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean acquire() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean acquire(Subscription sub) { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean delete() @@ -373,17 +373,17 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase public boolean acquiredBySubscription() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isAcquiredBy(Subscription subscription) { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void release() { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean releaseButRetain() @@ -393,82 +393,82 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase public boolean immediateAndNotDelivered() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void setRedelivered() { - //To change body of implemented methods use File | Settings | File Templates. + } public AMQMessageHeader getMessageHeader() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isPersistent() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isRedelivered() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public Subscription getDeliveredSubscription() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void reject() { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean isRejectedBy(long subscriptionId) { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void dequeue() { - //To change body of implemented methods use File | Settings | File Templates. + } public void dispose() { - //To change body of implemented methods use File | Settings | File Templates. + } public void discard() { - //To change body of implemented methods use File | Settings | File Templates. + } public void routeToAlternate() { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean isQueueDeleted() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void addStateChangeListener(StateChangeListener listener) { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean removeStateChangeListener(StateChangeListener listener) { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public int compareTo(final QueueEntry o) { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public boolean isDequeued() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 2ce43052d9..d5f8ef3d54 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -64,17 +64,17 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest ArrayList msgs = _subscription.getMessages(); try { - assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber()); - assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber()); - assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber()); + assertEquals(1L, msgs.get(0).getMessage().getMessageNumber()); + assertEquals(6L, msgs.get(1).getMessage().getMessageNumber()); + assertEquals(8L, msgs.get(2).getMessage().getMessageNumber()); - assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber()); - assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber()); - assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber()); + assertEquals(2L, msgs.get(3).getMessage().getMessageNumber()); + assertEquals(5L, msgs.get(4).getMessage().getMessageNumber()); + assertEquals(7L, msgs.get(5).getMessage().getMessageNumber()); - assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber()); - assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber()); - assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber()); + assertEquals(3L, msgs.get(6).getMessage().getMessageNumber()); + assertEquals(4L, msgs.get(7).getMessage().getMessageNumber()); + assertEquals(9L, msgs.get(8).getMessage().getMessageNumber()); } catch (AssertionFailedError afe) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 0daf79122c..f43af447ff 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -168,22 +168,22 @@ public class MockAMQQueue implements AMQQueue public UUID getId() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public QueueConfigType getConfigType() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public ConfiguredObject getParent() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isDurable() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isAutoDelete() @@ -199,7 +199,7 @@ public class MockAMQQueue implements AMQQueue public AMQShortString getOwner() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void setVirtualHost(VirtualHost virtualhost) @@ -219,22 +219,22 @@ public class MockAMQQueue implements AMQQueue public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + } public void unregisterSubscription(Subscription subscription) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + } public int getConsumerCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public int getActiveConsumerCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public boolean hasExclusiveSubscriber() @@ -244,37 +244,37 @@ public class MockAMQQueue implements AMQQueue public boolean isUnused() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isEmpty() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public int getMessageCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public int getUndeliveredMessageCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public long getQueueDepth() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public long getReceivedMessageCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public long getOldestMessageArrivalTime() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public boolean isDeleted() @@ -297,59 +297,58 @@ public class MockAMQQueue implements AMQQueue } + public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException + { + } + public void requeue(QueueEntry entry) { - //To change body of implemented methods use File | Settings | File Templates. } public void requeue(QueueEntryImpl storeContext, Subscription subscription) { - //To change body of implemented methods use File | Settings | File Templates. } public void dequeue(QueueEntry entry, Subscription sub) { - //To change body of implemented methods use File | Settings | File Templates. } public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public void addQueueDeleteTask(Task task) { - //To change body of implemented methods use File | Settings | File Templates. } public void removeQueueDeleteTask(final Task task) { - //To change body of implemented methods use File | Settings | File Templates. } public List getMessagesOnTheQueue() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public List getMessagesOnTheQueue(long fromMessageId, long toMessageId) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public List getMessagesOnTheQueue(int num) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public List getMessagesOnTheQueue(int num, int offest) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public QueueEntry getMessageOnTheQueue(long messageId) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public List getMessagesRangeOnTheQueue(long fromPosition, long toPosition) @@ -359,132 +358,123 @@ public class MockAMQQueue implements AMQQueue public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext) { - //To change body of implemented methods use File | Settings | File Templates. + } public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext) { - //To change body of implemented methods use File | Settings | File Templates. + } public void removeMessagesFromQueue(long fromMessageId, long toMessageId) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getMaximumMessageSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMaximumMessageSize(long value) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getMaximumMessageCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMaximumMessageCount(long value) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getMaximumQueueDepth() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMaximumQueueDepth(long value) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getMaximumMessageAge() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMaximumMessageAge(long maximumMessageAge) { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean getBlockOnQueueFull() - { - return false; - } - - public void setBlockOnQueueFull(boolean block) - { + } public long getMinimumAlertRepeatGap() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void deleteMessageFromTop() { - //To change body of implemented methods use File | Settings | File Templates. + } public long clearQueue() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void checkMessageStatus() throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + } public Set getNotificationChecks() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void flushSubscription(Subscription sub) throws AMQException { - //To change body of implemented methods use File | Settings | File Templates. + } public void deliverAsync(Subscription sub) { - //To change body of implemented methods use File | Settings | File Templates. + } public void deliverAsync() { - //To change body of implemented methods use File | Settings | File Templates. + } public void stop() { - //To change body of implemented methods use File | Settings | File Templates. + } public boolean isExclusive() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public Exchange getAlternateExchange() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void setAlternateExchange(Exchange exchange) { - //To change body of implemented methods use File | Settings | File Templates. + } public Map getArguments() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void checkCapacity(AMQChannel channel) @@ -493,12 +483,12 @@ public class MockAMQQueue implements AMQQueue public ManagedObject getManagedObject() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public int compareTo(AMQQueue o) { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setMinimumAlertRepeatGap(long value) @@ -508,22 +498,22 @@ public class MockAMQQueue implements AMQQueue public long getCapacity() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setCapacity(long capacity) { - //To change body of implemented methods use File | Settings | File Templates. + } public long getFlowResumeCapacity() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; } public void setFlowResumeCapacity(long flowResumeCapacity) { - //To change body of implemented methods use File | Settings | File Templates. + } public void configure(ConfigurationPlugin config) @@ -533,7 +523,7 @@ public class MockAMQQueue implements AMQQueue public ConfigurationPlugin getConfiguration() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public AuthorizationHolder getAuthorizationHolder() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java index 78ed3e9f34..75f633f2af 100755 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.framing.ContentHeaderBody; @@ -97,7 +96,17 @@ public class MockStoredMessage implements StoredMessage return src.limit(); } - public TransactionLog.StoreFuture flushToStore() + + + public ByteBuffer getContent(int offsetInMessage, int size) + { + ByteBuffer buf = ByteBuffer.allocate(size); + getContent(offsetInMessage, buf); + buf.position(0); + return buf; + } + + public MessageStore.StoreFuture flushToStore() { return MessageStore.IMMEDIATE_FUTURE; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index 7a3f6f701c..cf910208e7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -164,7 +164,7 @@ public abstract class QueueEntryListTestBase extends TestCase final QueueEntry head = getTestList().getHead(); assertNull("Head entry should not contain an actual message", head.getMessage()); assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head) - .getMessage().getMessageNumber().longValue()); + .getMessage().getMessageNumber()); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 6c7094cac0..28d52f4fd1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -649,9 +649,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void onRollback() { } - }); - - + }, 0L); // Check that it is enqueued AMQQueue data = _store.getMessages().get(1L); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index f3ba6a5495..a873739ca7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -162,8 +162,8 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase while (entry != null) { assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted()); - assertNotNull("QueueEntry was not found in the list of remaining entries", - remainingMessages.get(entry.getMessage().getMessageNumber().intValue())); + assertNotNull("QueueEntry "+entry.getMessage().getMessageNumber()+" was not found in the list of remaining entries " + remainingMessages, + remainingMessages.get((int)(entry.getMessage().getMessageNumber()))); count++; entry = entry.getNextNode(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index eca845644e..34ad0e5668 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -317,7 +317,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase assertEquals("Sorted queue entry value is not as expected", expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY")); assertEquals("Sorted queue entry id is not as expected", - Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber()); + expectedMessageId, entry.getMessage().getMessageNumber()); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 1d0a9d6316..90adaa1319 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -558,7 +558,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase /** * Delete the Store Environment path * - * @param configuration The configuration that contains the store environment path. + * @param environmentPath The configuration that contains the store environment path. */ private void cleanup(File environmentPath) { @@ -636,7 +636,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase { //To change body of implemented methods use File | Settings | File Templates. } - }); + }, 0L); } } @@ -710,7 +710,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (queue.isDurable() && !queue.isAutoDelete()) { - getVirtualHost().getMessageStore().createQueue(queue, queueArguments); + getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments); } } catch (AMQException e) @@ -754,7 +754,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase getVirtualHost().getExchangeRegistry().registerExchange(exchange); if (durable) { - getVirtualHost().getMessageStore().createExchange(exchange); + getVirtualHost().getDurableConfigurationStore().createExchange(exchange); } } catch (AMQException e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 5ff84557d8..44006df517 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.exchange.Exchange; @@ -42,18 +43,11 @@ import java.nio.ByteBuffer; */ public class SkeletonMessageStore implements MessageStore { - private final AtomicLong _messageId = new AtomicLong(1); - - public void configure(String base, Configuration config) throws Exception - { - } - public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config, LogSubject logSubject) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. } public void configureMessageStore(String name, @@ -61,7 +55,6 @@ public class SkeletonMessageStore implements MessageStore Configuration config, LogSubject logSubject) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. } public void close() throws Exception @@ -70,31 +63,28 @@ public class SkeletonMessageStore implements MessageStore public StoredMessage addMessage(M metaData) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } - public void removeMessage(Long messageId) - { - } public void createExchange(Exchange exchange) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void removeExchange(Exchange exchange) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void createQueue(AMQQueue queue) throws AMQStoreException @@ -105,63 +95,11 @@ public class SkeletonMessageStore implements MessageStore { } - - - - public List createQueues() throws AMQException - { - return null; - } - - public Long getNewMessageId() - { - return _messageId.getAndIncrement(); - } - - public void storeContentBodyChunk( - Long messageId, - int index, - ContentChunk contentBody, - boolean lastContentBody) throws AMQException - { - - } - - public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException - { - - } - - public MessageMetaData getMessageMetaData(Long messageId) throws AMQException - { - return null; - } - - public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException - { - return null; - } - public boolean isPersistent() { return false; } - public void storeMessageHeader(Long messageNumber, ServerMessage message) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void storeContent(Long messageNumber, long offset, ByteBuffer body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public ServerMessage getMessage(Long messageNumber) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - public void removeQueue(final AMQQueue queue) throws AMQStoreException { @@ -172,7 +110,7 @@ public class SkeletonMessageStore implements MessageStore Configuration storeConfiguration, LogSubject logSubject) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. + } public Transaction newTransaction() @@ -180,19 +118,19 @@ public class SkeletonMessageStore implements MessageStore return new Transaction() { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public void commitTran() throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } public StoreFuture commitTranAsync() throws AMQStoreException @@ -213,7 +151,7 @@ public class SkeletonMessageStore implements MessageStore public void abortTran() throws AMQStoreException { - //To change body of implemented methods use File | Settings | File Templates. + } }; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index 4dea13d391..fa698f4cf8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -82,6 +82,12 @@ public class TestMemoryMessageStore extends MemoryMessageStore return _storedMessage.getContent(offsetInMessage, dst); } + + public ByteBuffer getContent(int offsetInMessage, int size) + { + return _storedMessage.getContent(offsetInMessage, size); + } + public StoreFuture flushToStore() { return _storedMessage.flushToStore(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 3593297a05..3804d0dc8e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; /** @@ -66,14 +68,14 @@ public class TestableMemoryMessageStore extends MemoryMessageStore private class TestableTransaction implements Transaction { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - getMessages().put(messageId, (AMQQueue)queue); + getMessages().put(message.getMessageNumber(), (AMQQueue)queue); } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - getMessages().remove(messageId); + getMessages().remove(message.getMessageNumber()); } public void commitTran() throws AMQStoreException @@ -143,6 +145,12 @@ public class TestableMemoryMessageStore extends MemoryMessageStore return _storedMessage.getContent(offsetInMessage, dst); } + + public ByteBuffer getContent(int offsetInMessage, int size) + { + return _storedMessage.getContent(offsetInMessage, size); + } + public StoreFuture flushToStore() { return _storedMessage.flushToStore(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 9afed49922..98484db264 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockQueueEntry; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; @@ -44,7 +44,7 @@ public class AutoCommitTransactionTest extends QpidTestCase { private ServerTransaction _transaction = null; // Class under test - private TransactionLog _transactionLog; + private MessageStore _transactionLog; private AMQQueue _queue; private List _queues; private Collection _queueEntries; @@ -137,7 +137,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(false); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action); + _transaction.enqueue(_queues, _message, _action, 0L); assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -157,7 +157,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action); + _transaction.enqueue(_queues, _message, _action, 0L); assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -175,7 +175,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); - _transaction.enqueue(_queues, _message, _action); + _transaction.enqueue(_queues, _message, _action, 0L); assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); @@ -198,7 +198,7 @@ public class AutoCommitTransactionTest extends QpidTestCase try { - _transaction.enqueue(_queues, _message, _action); + _transaction.enqueue(_queues, _message, _action, 0L); fail("Exception not thrown"); } catch (RuntimeException re) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index e81fd8e3f1..484beb8fb4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockQueueEntry; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; @@ -51,7 +51,7 @@ public class LocalTransactionTest extends QpidTestCase private MockAction _action1; private MockAction _action2; private MockStoreTransaction _storeTransaction; - private TransactionLog _transactionLog; + private MessageStore _transactionLog; @Override @@ -140,7 +140,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(false); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action1); + _transaction.enqueue(_queues, _message, _action1, 0L); assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -156,7 +156,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action1); + _transaction.enqueue(_queues, _message, _action1, 0L); assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -173,7 +173,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); - _transaction.enqueue(_queues, _message, _action1); + _transaction.enqueue(_queues, _message, _action1, 0L); assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); @@ -196,7 +196,7 @@ public class LocalTransactionTest extends QpidTestCase try { - _transaction.enqueue(_queues, _message, _action1); + _transaction.enqueue(_queues, _message, _action1, 0L); fail("Exception not thrown"); } catch (RuntimeException re) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 422105e410..f3d71c6dea 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.StoredMessage; /** * Mock Server Message allowing its persistent flag to be controlled from test. @@ -81,6 +82,11 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } + public StoredMessage getStoredMessage() + { + throw new NotImplementedException(); + } + public long getExpiration() { throw new NotImplementedException(); @@ -91,12 +97,18 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } + + public ByteBuffer getContent(int offset, int size) + { + throw new NotImplementedException(); + } + public long getArrivalTime() { throw new NotImplementedException(); } - public Long getMessageNumber() + public long getMessageNumber() { return 0L; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index ff372532ac..bf8fda307a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -24,11 +24,11 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.TransactionLog.StoreFuture; -import org.apache.qpid.server.store.TransactionLog.Transaction; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.*; +import org.apache.qpid.server.store.MessageStore.StoreFuture; +import org.apache.qpid.server.store.MessageStore.Transaction; /** * Mock implementation of a (Store) Transaction allow its state to be observed. @@ -61,7 +61,7 @@ class MockStoreTransaction implements Transaction return _state; } - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { if (_throwExceptionOnQueueOp) { @@ -82,7 +82,7 @@ class MockStoreTransaction implements Transaction return _numberOfEnqueuedMessages; } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { if (_throwExceptionOnQueueOp) { @@ -107,10 +107,33 @@ class MockStoreTransaction implements Transaction _state = TransactionState.ABORTED; } - public static TransactionLog createTestTransactionLog(final MockStoreTransaction storeTransaction) + public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction) { - return new TransactionLog() + return new MessageStore() { + public void configureMessageStore(final String name, + final MessageStoreRecoveryHandler recoveryHandler, + final Configuration config, + final LogSubject logSubject) throws Exception + { + //TODO. + } + + public void close() throws Exception + { + //TODO. + } + + public StoredMessage addMessage(final T metaData) + { + return null; //TODO. + } + + public boolean isPersistent() + { + return false; //TODO. + } + public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, Configuration storeConfiguration, LogSubject logSubject) throws Exception { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 7aa314bf22..153371c8d9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -41,7 +41,6 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; public class MockVirtualHost implements VirtualHost { @@ -159,10 +158,6 @@ public class MockVirtualHost implements VirtualHost return null; } - public TransactionLog getTransactionLog() - { - return null; - } public void removeBrokerConnection(BrokerLink brokerLink) { -- cgit v1.2.1