From 0425f1a831e6f7c56ee7936a95d3327328b4a74b Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Tue, 2 Oct 2007 12:28:37 +0000 Subject: QPID-616. Corrected pending message count and pending data size calculations for pubsub testing. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@581207 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/requestreply/PingPongProducer.java | 99 +++++++++++++--------- 1 file changed, 58 insertions(+), 41 deletions(-) (limited to 'java/perftests') diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 6bb531e24f..ba962e1135 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -408,7 +408,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti protected Object _sendPauseMonitor = new Object(); /** Keeps a count of the number of message currently sent but not received. */ - protected AtomicInteger _unreceived = new AtomicInteger(0); + protected static AtomicInteger _unreceived = new AtomicInteger(0); /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); @@ -486,7 +486,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public PingPongProducer(Properties overrides) throws Exception { - // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); + log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); // Create a set of parsed properties from the defaults overriden by the passed in values. ParsedProperties properties = new ParsedProperties(defaults); @@ -694,12 +694,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public void createProducer() throws JMSException { - // log.debug("public void createProducer(): called"); + log.debug("public void createProducer(): called"); _producer = (MessageProducer) _producerSession.createProducer(null); _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); + log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); } /** @@ -717,14 +717,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, boolean durable) throws JMSException, AMQException { - /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called");*/ + + durable + "): called"); _pingDestinations = new ArrayList(); // Create the desired number of ping destinations and consumers for them. - // log.debug("Creating " + noOfDestinations + " destinations to ping."); + log.debug("Creating " + noOfDestinations + " destinations to ping."); for (int i = 0; i < noOfDestinations; i++) { @@ -735,12 +735,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. if (unique) { - // log.debug("Creating unique destinations."); + log.debug("Creating unique destinations."); id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); } else { - // log.debug("Creating shared destinations."); + log.debug("Creating shared destinations."); id = "_" + _queueSharedID.incrementAndGet(); } @@ -750,14 +750,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti if (!durable) { destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id); - // log.debug("Created non-durable topic " + destination); + log.debug("Created non-durable topic " + destination); } else { destination = AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), _clientID, (AMQConnection) _connection); - // log.debug("Created durable topic " + destination); + log.debug("Created durable topic " + destination); } } // Otherwise this is a p2p pinger, in which case create queues. @@ -771,7 +771,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null, ExchangeDefaults.DIRECT_EXCHANGE_NAME); - // log.debug("Created queue " + destination); + log.debug("Created queue " + destination); } // Keep the destination. @@ -831,24 +831,24 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public void onMessageWithConsumerNo(Message message, int consumerNo) { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); + log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); try { long now = System.nanoTime(); long timestamp = getTimestamp(message); long pingTime = now - timestamp; - // NDC.push("cons" + consumerNo); + NDC.push("cons" + consumerNo); // Extract the messages correlation id. String correlationID = message.getJMSCorrelationID(); - // log.debug("correlationID = " + correlationID); + log.debug("correlationID = " + correlationID); int num = message.getIntProperty("MSG_NUM"); - // log.info("Message " + num + " received."); + log.info("Message " + num + " received."); boolean isRedelivered = message.getJMSRedelivered(); - // log.debug("isRedelivered = " + isRedelivered); + log.debug("isRedelivered = " + isRedelivered); if (!isRedelivered) { @@ -862,7 +862,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Restart the timeout timer on every message. perCorrelationId.timeOutStart = System.nanoTime(); - // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); // Decrement the countdown latch. Before this point, it is possible that two threads might enter this // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block @@ -879,7 +879,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Decrement the count of sent but not yet received messages. int unreceived = _unreceived.decrementAndGet(); - int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) + / (_isPubSub ? getConsumersPerDestination() : 1); + + log.debug("unreceived = " + unreceived); + log.debug("unreceivedSize = " + unreceivedSize); // Release a waiting sender if there is one. synchronized (_sendPauseMonitor) @@ -890,22 +895,23 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } } - // NDC.push("/rem" + remainingCount); + NDC.push("/rem" + remainingCount); - // log.debug("remainingCount = " + remainingCount); - // log.debug("trueCount = " + trueCount); + log.debug("remainingCount = " + remainingCount); + log.debug("trueCount = " + trueCount); // Commit on transaction batch size boundaries. At this point in time the waiting producer remains // blocked, even on the last message. // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on // each batch boundary. For pub/sub each consumer gets every message so no division is done. long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); - // log.debug("commitCount = " + commitCount); + log.debug("commitCount = " + commitCount); if ((commitCount % _txBatchSize) == 0) { // log.debug("Trying commit for consumer " + consumerNo + "."); commitTx(_consumerSession[consumerNo]); + log.info("Tx committed on consumer " + consumerNo); } // Forward the message and remaining count to any interested chained message listener. @@ -950,8 +956,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } finally { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); - // NDC.clear(); + log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); + NDC.clear(); } } @@ -1122,8 +1128,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ protected boolean sendMessage(int i, Message message) throws JMSException { - // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); - // log.debug("_txBatchSize = " + _txBatchSize); + log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); + log.debug("_txBatchSize = " + _txBatchSize); // Round robin the destinations as the messages are sent. Destination destination = _pingDestinations.get(i % _pingDestinations.size()); @@ -1154,15 +1160,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti { // Get the size estimate of sent but not yet received messages. int unreceived = _unreceived.get(); - int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / (_isPubSub ? getConsumersPerDestination() : 1); - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - // log.debug("_maxPendingSize = " + _maxPendingSize); + log.debug("unreceived = " + unreceived); + log.debug("unreceivedSize = " + unreceivedSize); + log.debug("_maxPendingSize = " + _maxPendingSize); if (unreceivedSize > _maxPendingSize) { - // log.debug("unreceived size estimate over limit = " + unreceivedSize); + log.debug("unreceived size estimate over limit = " + unreceivedSize); // Wait on the send pause barrier for the limit to be re-established. try @@ -1202,7 +1209,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti message.setIntProperty("MSG_NUM", num); setTimestamp(message); _producer.send(message); - // log.info("Message " + num + " sent."); + log.info("Message " + num + " sent."); } else { @@ -1210,11 +1217,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti message.setIntProperty("MSG_NUM", num); setTimestamp(message); _producer.send(destination, message); - // log.info("Message " + num + " sent."); + log.info("Message " + num + " sent."); } - // Increase the unreceived size, this may actually happen aftern the message is received. - _unreceived.getAndIncrement(); + // Increase the unreceived size, this may actually happen after the message is received. + // The unreceived size is incremented by the number of consumers that will get a copy of the message, + // in pub/sub mode. + // _unreceived.getAndIncrement(); + int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); + log.debug("newUnreceivedCount = " + newUnreceivedCount); // Apply message rate throttling if a rate limit has been set up. if (_rateLimiter != null) @@ -1340,11 +1351,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti public void start() throws JMSException { + log.debug("public void start(): called"); + _connection.start(); + log.debug("Producer started."); for (int i = 0; i < _noOfConsumers; i++) { _consumerConnection[i].start(); + log.debug("Consumer " + i + " started."); } } @@ -1394,22 +1409,24 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public void close() throws JMSException { - // log.debug("public void close(): called"); + log.debug("public void close(): called"); try { if (_connection != null) { + log.debug("Before close producer connection."); _connection.close(); - // log.debug("Close connection."); + log.debug("Closed producer connection."); } for (int i = 0; i < _noOfConsumers; i++) { if (_consumerConnection[i] != null) { + log.debug("Before close consumer connection " + i + "."); _consumerConnection[i].close(); - // log.debug("Closed consumer connection."); + log.debug("Closed consumer connection " + i + "."); } } } @@ -1449,7 +1466,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ protected boolean commitTx(Session session) throws JMSException { - // log.debug("protected void commitTx(Session session): called"); + log.debug("protected void commitTx(Session session): called"); boolean committed = false; @@ -1486,7 +1503,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti long start = System.nanoTime(); session.commit(); committed = true; - // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); + log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); if (_failAfterCommit) { -- cgit v1.2.1