diff options
| author | Benn Roth <TheAtomicOption@users.noreply.github.com> | 2017-10-21 14:21:55 -0700 | 
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2017-10-21 14:21:55 -0700 | 
| commit | faf1749f3866a52b6d659a39dd04d0b635dd6a3d (patch) | |
| tree | bf794aefc70d9b0960ae639ac5418e4e8b9b9bd5 /example.py | |
| parent | 146b893e0fbac21150f74a8ba2f17cc64e1714ad (diff) | |
| download | kafka-python-faf1749f3866a52b6d659a39dd04d0b635dd6a3d.tar.gz | |
Added controlled thread shutdown to example.py (#1268)
Diffstat (limited to 'example.py')
| -rwxr-xr-x | example.py | 40 | 
1 files changed, 32 insertions, 8 deletions
| @@ -6,29 +6,46 @@ from kafka import KafkaConsumer, KafkaProducer  class Producer(threading.Thread): -    daemon = True +    def __init__(self): +        threading.Thread.__init__(self) +        self.stop_event = threading.Event() +         +    def stop(self): +        self.stop_event.set()      def run(self):          producer = KafkaProducer(bootstrap_servers='localhost:9092') -        while True: +        while not self.stop_event.is_set():              producer.send('my-topic', b"test")              producer.send('my-topic', b"\xc2Hola, mundo!")              time.sleep(1) +        producer.close()  class Consumer(multiprocessing.Process): -    daemon = True - +    def __init__(self): +        multiprocessing.Process.__init__(self) +        self.stop_event = multiprocessing.Event() +         +    def stop(self): +        self.stop_event.set() +              def run(self):          consumer = KafkaConsumer(bootstrap_servers='localhost:9092', -                                 auto_offset_reset='earliest') +                                 auto_offset_reset='earliest', +                                 consumer_timeout_ms=1000)          consumer.subscribe(['my-topic']) -        for message in consumer: -            print (message) - +        while not self.stop_event.is_set(): +            for message in consumer: +                print(message) +                if self.stop_event.is_set(): +                    break +        consumer.close() +         +          def main():      tasks = [          Producer(), @@ -39,7 +56,14 @@ def main():          t.start()      time.sleep(10) +     +    for task in tasks: +        task.stop() +    for task in tasks: +        task.join() +         +          if __name__ == "__main__":      logging.basicConfig(          format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', | 
