summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-12-15 13:54:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-12-15 13:54:36 +0000
commit45066f17e52eb44dbb9c8f0250200d4ef974b54c (patch)
tree54489ee4f3c48f8e055304f905651484f5a45926 /qpid/java/broker/src/test
parent759e158c316c0ace2e166c0aa9cf0ab76352cfd5 (diff)
downloadqpid-python-45066f17e52eb44dbb9c8f0250200d4ef974b54c.tar.gz
QPID-3687 : Improve Java Broker performance
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1214760 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java45
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java27
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java60
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java14
4 files changed, 126 insertions, 20 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 5a411c6807..ab41158548 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -43,6 +43,8 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TestNetworkConnection;
@@ -120,6 +122,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
}
+ public ClientDeliveryMethod createDeliveryMethod(int channelId)
+ {
+ return new InternalWriteDeliverMethod(channelId);
+ }
+
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
}
@@ -213,4 +220,42 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
((AMQChannel)session).getProtocolSession().closeSession();
}
+
+ private class InternalWriteDeliverMethod implements ClientDeliveryMethod
+ {
+ private int _channelId;
+
+ public InternalWriteDeliverMethod(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+
+ @Override
+ public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException
+ {
+ _deliveryCount.incrementAndGet();
+
+ synchronized (_channelDelivers)
+ {
+ Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+
+ if (consumers == null)
+ {
+ consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ _channelDelivers.put(_channelId, consumers);
+ }
+
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getConsumerTag());
+
+ if (consumerDelivers == null)
+ {
+ consumerDelivers = new LinkedList<DeliveryPair>();
+ consumers.put(sub.getConsumerTag(), consumerDelivers);
+ }
+
+ consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 0f5374b3e5..5d559c9d0d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
@@ -143,16 +144,19 @@ public class AckTest extends InternalBrokerBaseCase
qs.add(_queue);
msg.enqueue(qs);
MessageMetaData mmd = msg.headersReceived();
- msg.setStoredMessage(_messageStore.addMessage(mmd));
+ final StoredMessage storedMessage = _messageStore.addMessage(mmd);
+ msg.setStoredMessage(storedMessage);
+ final AMQMessage message = new AMQMessage(storedMessage);
if(msg.allContentReceived())
{
ServerTransaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, msg, new ServerTransaction.Action() {
+ txn.enqueue(_queue, message, new ServerTransaction.Action() {
public void postCommit()
{
try
{
- _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
+
+ _queue.enqueue(message);
}
catch (AMQException e)
{
@@ -170,6 +174,15 @@ public class AckTest extends InternalBrokerBaseCase
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
}
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
}
/**
@@ -181,9 +194,8 @@ public class AckTest extends InternalBrokerBaseCase
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount, true);
-
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertEquals("",msgCount,map.size());
+ assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -206,7 +218,6 @@ public class AckTest extends InternalBrokerBaseCase
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
-
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
assertTrue(_messageStore.getMessageCount() == 0);
@@ -243,7 +254,7 @@ public class AckTest extends InternalBrokerBaseCase
_channel.acknowledgeMessage(5, false);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount - 1);
+ assertEquals("Map not expected size",msgCount - 1,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -270,6 +281,8 @@ public class AckTest extends InternalBrokerBaseCase
final int msgCount = 10;
publishMessages(msgCount);
+
+
_channel.acknowledgeMessage(5, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index e4ed232f13..6c7094cac0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -44,7 +44,6 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
@@ -189,6 +188,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch(InterruptedException e)
+ {
+ }
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry);
@@ -430,6 +436,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ }
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
@@ -723,7 +736,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
// call processQueue to deliver the messages
- testQueue.processQueue(new QueueRunner(testQueue, 1)
+ testQueue.processQueue(new QueueRunner(testQueue)
{
@Override
public void run()
@@ -826,7 +839,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message is not copied as part of invocation of
- * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)}
+ * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)}
*/
public void testCopyMessagesWithDequeuedEntry()
{
@@ -844,7 +857,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
SimpleAMQQueue queue = createQueue(anotherQueueName);
// create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
// copy messages into another queue
_queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
@@ -876,7 +889,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message is not moved as part of invocation of
- * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)}
+ * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)}
*/
public void testMovedMessagesWithDequeuedEntry()
{
@@ -894,7 +907,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
SimpleAMQQueue queue = createQueue(anotherQueueName);
// create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
// move messages into another queue
_queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
@@ -927,7 +940,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that messages in given range including dequeued one are deleted
* from the queue on invocation of
- * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)}
+ * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)}
*/
public void testRemoveMessagesFromQueueWithDequeuedEntry()
{
@@ -954,7 +967,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message on the top is not accounted and next message
* is deleted from the queue on invocation of
- * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)}
+ * {@link SimpleAMQQueue#deleteMessageFromTop()}
*/
public void testDeleteMessageFromTopWithDequeuedEntryOnTop()
{
@@ -983,7 +996,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that all messages including dequeued one are deleted from the queue
- * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)}
+ * on invocation of {@link SimpleAMQQueue#clearQueue()}
*/
public void testClearQueueWithDequeuedEntry()
{
@@ -1049,10 +1062,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
/**
* Send a message and decrement latch
+ * @param entry
+ * @param batch
*/
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
- super.send(msg);
+ super.send(entry, batch);
latch.countDown();
}
};
@@ -1063,7 +1078,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
testQueue.registerSubscription(subscription, false);
// process queue
- testQueue.processQueue(new QueueRunner(testQueue, 1)
+ testQueue.processQueue(new QueueRunner(testQueue)
{
public void run()
{
@@ -1127,6 +1142,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0);
}
+
+ @Override
+ public boolean acquire(Subscription sub)
+ {
+ if(((AMQMessage) message).getMessageId().longValue() % 2 == 0)
+ {
+ return false;
+ }
+ else
+ {
+ return super.acquire(sub);
+ }
+ }
};
}
};
@@ -1243,6 +1271,14 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
fail("Failure to put message on queue:" + e.getMessage());
}
}
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
}
/**
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 1efe1028db..c5a7ddd691 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -125,6 +125,12 @@ public class MockSubscription implements Subscription
return queue;
}
+ public boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+
public void getSendLock()
{
_stateChangeLock.lock();
@@ -216,7 +222,7 @@ public class MockSubscription implements Subscription
{
}
- public void send(QueueEntry entry) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
if (messages.contains(entry))
{
@@ -225,6 +231,12 @@ public class MockSubscription implements Subscription
messages.add(entry);
}
+ @Override
+ public void flushBatched()
+ {
+
+ }
+
public void setQueueContext(AMQQueue.Context queueContext)
{
_queueContext = queueContext;