diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-12-15 13:54:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-12-15 13:54:36 +0000 |
| commit | 45066f17e52eb44dbb9c8f0250200d4ef974b54c (patch) | |
| tree | 54489ee4f3c48f8e055304f905651484f5a45926 /qpid/java/broker/src/test | |
| parent | 759e158c316c0ace2e166c0aa9cf0ab76352cfd5 (diff) | |
| download | qpid-python-45066f17e52eb44dbb9c8f0250200d4ef974b54c.tar.gz | |
QPID-3687 : Improve Java Broker performance
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1214760 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
4 files changed, 126 insertions, 20 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 5a411c6807..ab41158548 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -43,6 +43,8 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TestNetworkConnection; @@ -120,6 +122,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { } + public ClientDeliveryMethod createDeliveryMethod(int channelId) + { + return new InternalWriteDeliverMethod(channelId); + } + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) { } @@ -213,4 +220,42 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr ((AMQChannel)session).getProtocolSession().closeSession(); } + + private class InternalWriteDeliverMethod implements ClientDeliveryMethod + { + private int _channelId; + + public InternalWriteDeliverMethod(int channelId) + { + _channelId = channelId; + } + + + @Override + public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException + { + _deliveryCount.incrementAndGet(); + + synchronized (_channelDelivers) + { + Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); + + if (consumers == null) + { + consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + _channelDelivers.put(_channelId, consumers); + } + + LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getConsumerTag()); + + if (consumerDelivers == null) + { + consumerDelivers = new LinkedList<DeliveryPair>(); + consumers.put(sub.getConsumerTag(), consumerDelivers); + } + + consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); + } + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 0f5374b3e5..5d559c9d0d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; @@ -143,16 +144,19 @@ public class AckTest extends InternalBrokerBaseCase qs.add(_queue); msg.enqueue(qs); MessageMetaData mmd = msg.headersReceived(); - msg.setStoredMessage(_messageStore.addMessage(mmd)); + final StoredMessage storedMessage = _messageStore.addMessage(mmd); + msg.setStoredMessage(storedMessage); + final AMQMessage message = new AMQMessage(storedMessage); if(msg.allContentReceived()) { ServerTransaction txn = new AutoCommitTransaction(_messageStore); - txn.enqueue(_queue, msg, new ServerTransaction.Action() { + txn.enqueue(_queue, message, new ServerTransaction.Action() { public void postCommit() { try { - _queue.enqueue(new AMQMessage(msg.getStoredMessage())); + + _queue.enqueue(message); } catch (AMQException e) { @@ -170,6 +174,15 @@ public class AckTest extends InternalBrokerBaseCase // we manually send the message to the subscription //_subscription.send(new QueueEntry(_queue,msg), _queue); } + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + e.printStackTrace(); //TODO. + } + } /** @@ -181,9 +194,8 @@ public class AckTest extends InternalBrokerBaseCase _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount, true); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertEquals("",msgCount,map.size()); + assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size()); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -206,7 +218,6 @@ public class AckTest extends InternalBrokerBaseCase _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); assertTrue(_messageStore.getMessageCount() == 0); @@ -243,7 +254,7 @@ public class AckTest extends InternalBrokerBaseCase _channel.acknowledgeMessage(5, false); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == msgCount - 1); + assertEquals("Map not expected size",msgCount - 1,map.size()); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -270,6 +281,8 @@ public class AckTest extends InternalBrokerBaseCase final int msgCount = 10; publishMessages(msgCount); + + _channel.acknowledgeMessage(5, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index e4ed232f13..6c7094cac0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -44,7 +44,6 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; @@ -189,6 +188,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); + try + { + Thread.sleep(2000L); + } + catch(InterruptedException e) + { + } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry); @@ -430,6 +436,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue @@ -723,7 +736,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); // call processQueue to deliver the messages - testQueue.processQueue(new QueueRunner(testQueue, 1) + testQueue.processQueue(new QueueRunner(testQueue) { @Override public void run() @@ -826,7 +839,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not copied as part of invocation of - * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)} + * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)} */ public void testCopyMessagesWithDequeuedEntry() { @@ -844,7 +857,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase SimpleAMQQueue queue = createQueue(anotherQueueName); // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); // copy messages into another queue _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); @@ -876,7 +889,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not moved as part of invocation of - * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)} + * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)} */ public void testMovedMessagesWithDequeuedEntry() { @@ -894,7 +907,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase SimpleAMQQueue queue = createQueue(anotherQueueName); // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); // move messages into another queue _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); @@ -927,7 +940,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that messages in given range including dequeued one are deleted * from the queue on invocation of - * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)} + * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)} */ public void testRemoveMessagesFromQueueWithDequeuedEntry() { @@ -954,7 +967,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message on the top is not accounted and next message * is deleted from the queue on invocation of - * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)} + * {@link SimpleAMQQueue#deleteMessageFromTop()} */ public void testDeleteMessageFromTopWithDequeuedEntryOnTop() { @@ -983,7 +996,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that all messages including dequeued one are deleted from the queue - * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)} + * on invocation of {@link SimpleAMQQueue#clearQueue()} */ public void testClearQueueWithDequeuedEntry() { @@ -1049,10 +1062,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase { /** * Send a message and decrement latch + * @param entry + * @param batch */ - public void send(QueueEntry msg) throws AMQException + public void send(QueueEntry entry, boolean batch) throws AMQException { - super.send(msg); + super.send(entry, batch); latch.countDown(); } }; @@ -1063,7 +1078,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase testQueue.registerSubscription(subscription, false); // process queue - testQueue.processQueue(new QueueRunner(testQueue, 1) + testQueue.processQueue(new QueueRunner(testQueue) { public void run() { @@ -1127,6 +1142,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase { return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0); } + + @Override + public boolean acquire(Subscription sub) + { + if(((AMQMessage) message).getMessageId().longValue() % 2 == 0) + { + return false; + } + else + { + return super.acquire(sub); + } + } }; } }; @@ -1243,6 +1271,14 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase fail("Failure to put message on queue:" + e.getMessage()); } } + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 1efe1028db..c5a7ddd691 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -125,6 +125,12 @@ public class MockSubscription implements Subscription return queue; } + public boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + + public void getSendLock() { _stateChangeLock.lock(); @@ -216,7 +222,7 @@ public class MockSubscription implements Subscription { } - public void send(QueueEntry entry) throws AMQException + public void send(QueueEntry entry, boolean batch) throws AMQException { if (messages.contains(entry)) { @@ -225,6 +231,12 @@ public class MockSubscription implements Subscription messages.add(entry); } + @Override + public void flushBatched() + { + + } + public void setQueueContext(AMQQueue.Context queueContext) { _queueContext = queueContext; |
