diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 12:41:00 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 14:05:50 -0700 |
commit | 7a2ec3332b0a83dcaaab4a402db13ed9d56d89e8 (patch) | |
tree | 2dfcb14558f5fbdbe242fc9e593bb7575ab65e86 /benchmarks/producer_performance.py | |
parent | bea1a2adacc662abe2b041bc38bfc452bb12caab (diff) | |
download | kafka-python-benchmarks.tar.gz |
Adapt benchmark scripts from https://github.com/mrafayaleem/kafka-jython
Diffstat (limited to 'benchmarks/producer_performance.py')
-rwxr-xr-x | benchmarks/producer_performance.py | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/benchmarks/producer_performance.py b/benchmarks/producer_performance.py new file mode 100755 index 0000000..e958735 --- /dev/null +++ b/benchmarks/producer_performance.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python +# Adapted from https://github.com/mrafayaleem/kafka-jython + +from __future__ import absolute_import, print_function + +import argparse +import pprint +import sys +import threading +import traceback + +from kafka import KafkaProducer +from test.fixtures import KafkaFixture, ZookeeperFixture + + +def start_brokers(n): + print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n)) + print('-> 1 Zookeeper') + zk = ZookeeperFixture.instance() + print('---> {0}:{1}'.format(zk.host, zk.port)) + print() + + partitions = min(n, 3) + replicas = min(n, 3) + print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas)) + brokers = [ + KafkaFixture.instance(i, zk.host, zk.port, zk_chroot='', + partitions=partitions, replicas=replicas) + for i in range(n) + ] + for broker in brokers: + print('---> {0}:{1}'.format(broker.host, broker.port)) + print() + return brokers + + +class ProducerPerformance(object): + + @staticmethod + def run(args): + try: + props = {} + for prop in args.producer_config: + k, v = prop.split('=') + try: + v = int(v) + except ValueError: + pass + if v == 'None': + v = None + props[k] = v + + if args.brokers: + brokers = start_brokers(args.brokers) + props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port) + for broker in brokers] + print("---> bootstrap_servers={0}".format(props['bootstrap_servers'])) + print() + print('-> OK!') + print() + + print('Initializing producer...') + record = bytes(bytearray(args.record_size)) + props['metrics_sample_window_ms'] = args.stats_interval * 1000 + + producer = KafkaProducer(**props) + for k, v in props.items(): + print('---> {0}={1}'.format(k, v)) + print('---> send {0} byte records'.format(args.record_size)) + print('---> report stats 
every {0} secs'.format(args.stats_interval)) + print('---> raw metrics? {0}'.format(args.raw_metrics)) + timer_stop = threading.Event() + timer = StatsReporter(args.stats_interval, producer, + event=timer_stop, + raw_metrics=args.raw_metrics) + timer.start() + print('-> OK!') + print() + + for i in xrange(args.num_records): + producer.send(topic=args.topic, value=record) + producer.flush() + + timer_stop.set() + + except Exception: + exc_info = sys.exc_info() + traceback.print_exception(*exc_info) + sys.exit(1) + + +class StatsReporter(threading.Thread): + def __init__(self, interval, producer, event=None, raw_metrics=False): + super(StatsReporter, self).__init__() + self.interval = interval + self.producer = producer + self.event = event + self.raw_metrics = raw_metrics + + def print_stats(self): + metrics = self.producer.metrics() + if self.raw_metrics: + pprint.pprint(metrics) + else: + print('{record-send-rate} records/sec ({byte-rate} B/sec),' + ' {request-latency-avg} latency,' + ' {record-size-avg} record size,' + ' {batch-size-avg} batch size,' + ' {records-per-request-avg} records/req' + .format(**metrics['producer-metrics'])) + + def print_final(self): + self.print_stats() + + def run(self): + while self.event and not self.event.wait(self.interval): + self.print_stats() + else: + self.print_final() + + +def get_args_parser(): + parser = argparse.ArgumentParser( + description='This tool is used to verify the producer performance.') + + parser.add_argument( + '--topic', type=str, + help='Topic name for test', + default='kafka-python-benchmark-test') + parser.add_argument( + '--num-records', type=long, + help='number of messages to produce', + default=1000000) + parser.add_argument( + '--record-size', type=int, + help='message size in bytes', + default=100) + parser.add_argument( + '--producer-config', type=str, nargs='+', default=(), + help='kafka producer related configuaration properties like ' + 'bootstrap_servers,client_id etc..') + parser.add_argument( 
+ '--brokers', type=int, + help='Number of kafka brokers to start', + default=0) + parser.add_argument( + '--stats-interval', type=int, + help='Interval in seconds for stats reporting to console', + default=5) + parser.add_argument( + '--raw-metrics', action='store_true', + help='Enable this flag to print full metrics dict on each interval') + return parser + + +if __name__ == '__main__': + args = get_args_parser().parse_args() + ProducerPerformance.run(args) |