diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-01-21 13:33:00 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-01-21 13:33:00 +0000 |
| commit | 403b5a871b67f2939bbca2568c340c97f616edfe (patch) | |
| tree | b8a798e94e0acf910fdde03d5906b64934bbe0e1 /java/perftests/src | |
| parent | e64a022647f07937144a400f0ef1185b084a892b (diff) | |
| download | qpid-python-403b5a871b67f2939bbca2568c340c97f616edfe.tar.gz | |
Imporved topic perf tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@613883 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src')
3 files changed, 100 insertions, 14 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java b/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java index a9bdc61e0e..4a726c19ea 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java @@ -28,6 +28,7 @@ import java.util.Properties; import java.util.Random; import java.util.List; import java.util.ArrayList; +import java.io.FileWriter; public class Client { @@ -74,6 +75,9 @@ public class Client Properties properties=new Properties(); properties.load(this.getClass().getResourceAsStream("topic.properties")); + String logFilePath = System.getProperty("logFilePath", "./"); + FileWriter file = new FileWriter(logFilePath + "client-" + System.currentTimeMillis() + ".cvs",true); + //Create the initial context Context ctx=new InitialContext(properties); @@ -99,6 +103,14 @@ public class Client // Create a session on the connection // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queueCompleted = session.createQueue("completed"); + Queue queueStarted = session.createQueue("started"); + MessageProducer prod = session.createProducer(queueCompleted); + MessageConsumer cons = session.createConsumer(queueStarted); + cons.receive(); + _logger.info("Starting producing messages"); + _message=TestMessageFactory.newBytesMessage(session, 1024); Random random=new Random(); @@ -108,6 +120,7 @@ public class Client long intervalThroughput; long totalThroughput; long numProducers=1; + String info; startNewProducer(session, random); while (testDuration < duration) { @@ -121,17 +134,21 @@ public class Client intervalThroughput=(totalMessagesProduced - messagesProducedLastInterval) / 5; totalThroughput=totalMessagesProduced / testDuration; messagesProducedLastInterval=totalMessagesProduced; - _logger.info("Number of producers " + numProducers + " | This interval throughput = " + - intervalThroughput + " | Total throughput = " + totalThroughput); + info = "Number of producers " + numProducers + " | This interval throughput = " + + intervalThroughput + " | Total throughput = " + totalThroughput; + _logger.info(info); + file.write(info + "\n"); startNewProducer(session, random); numProducers++; } + file.close(); // stop all the producers for (Runner runner : _runners) { runner.stop(); } - + _logger.info("Stopping server"); + prod.send(session.createTextMessage("stop")); } catch (Exception e) { @@ -144,11 +161,13 @@ public class Client { // select a random topic int topicNumber=random.nextInt(50); + _logger.info("creating producer for topic: topic- " + topicNumber); Topic topic=session.createTopic("topic-" + topicNumber); MessageProducer prod=session.createProducer(topic); Runner runner=new Runner(prod); _runners.add(runner); Thread thread=new Thread(runner); + thread.setDaemon(true); thread.start(); } @@ -156,7 +175,6 @@ public class Client { MessageProducer _prod; boolean _produce=true; - private Runner(MessageProducer prod) { _prod=prod; diff --git a/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java b/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java index aeebe5976e..883a7465a1 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java @@ -26,6 +26,7 @@ import javax.naming.InitialContext; import javax.jms.*; import java.util.Properties; +import java.io.FileWriter; public class Server @@ -33,9 +34,14 @@ public class Server /** * This class logger */ - private static final Logger _logger =LoggerFactory.getLogger(Server.class); + private static final Logger _logger=LoggerFactory.getLogger(Server.class); + private final Object _lock=new Object(); + private long _numMessages=0; + public FileWriter _file; + public boolean _running=true; + public static void main(String[] args) { (new Server()).runServer(); @@ -49,6 +55,9 @@ public class Server Properties properties=new Properties(); properties.load(this.getClass().getResourceAsStream("topic.properties")); + String logFilePath=System.getProperty("logFilePath", "./"); + _file=new FileWriter(logFilePath + "server-" + System.currentTimeMillis() + ".cvs", true); + //Create the initial context Context ctx=new InitialContext(properties); @@ -70,10 +79,11 @@ public class Server // Create a session on the connection // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. - Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + // Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); for (int i=0; i < 50; i++) { + Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic=session.createTopic("topic-" + i); TopicSubscriber dursub=session.createDurableSubscriber(topic, "durable-" + i); dursub.setMessageListener(new MyListener()); @@ -81,11 +91,31 @@ public class Server // Now the messageConsumer is set up we can start the connection connection.start(); - synchronized (connection) - { - connection.wait(); - } - + _logger.info("Ready to consume messages"); + // listen for the termination message + Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queueCompleted=session.createQueue("completed"); + Queue queueStarted=session.createQueue("started"); + MessageProducer prod=session.createProducer(queueStarted); + + Thread logger=new Thread(new MyLogger()); + logger.setDaemon(true); + logger.start(); + + prod.send(session.createTextMessage("start")); + long startTime=System.currentTimeMillis(); + MessageConsumer cons=session.createConsumer(queueCompleted); + cons.receive(); + + _running=false; + + long endTime=System.currentTimeMillis(); + session.close(); + _logger.info("Received " + _numMessages); + _file.write("Received " + _numMessages + "\n"); + _logger.info("Throughput " + _numMessages / (endTime - startTime) * 1000 + "msg/s"); + _file.write("Throughput " + _numMessages / (endTime - startTime) * 1000 + "msg/s"); + _file.close(); } catch (Exception e) { @@ -97,7 +127,45 @@ public class Server { public void onMessage(Message message) { - _logger.debug("Received a message"); + synchronized (_lock) + { + _numMessages++; + /*if(_numMessages % 1000 == 0) + { + _logger.info("received: " + _numMessages); + } */ + } + } + } + + private class MyLogger implements Runnable + { + public void run() + { + long endTime=0; + while (_running) + { + synchronized (_lock) + { + try + { + _lock.wait(5000); + if (_running) + { + endTime=endTime + 5; + String s="Throughput " + _numMessages / endTime + " msg/s"; + _logger.info(s); + _file.write(s + "\n"); + } + + } + catch (Exception e) + { + e.printStackTrace(); + } + + } + } } } } diff --git a/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties b/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties index 070054cf5b..cff5275e36 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties +++ b/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties @@ -18,7 +18,7 @@ # java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory -#connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' +connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' # A 0.10 connection factory -connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 +#connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 |
