diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-12-02 13:33:30 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-12-02 13:33:30 -0800 |
commit | 149d43cf6cfe121347c15b39600299b29b583dc1 (patch) | |
tree | ced113ef0ab7a747f5952cf4888b55b44aead6c1 | |
parent | 0ba9d3d78250c91203147c97246fa166465b183d (diff) | |
parent | 1f857351391f1d50dd57b404cec313e61cf312f3 (diff) | |
download | kafka-python-149d43cf6cfe121347c15b39600299b29b583dc1.tar.gz |
Merge pull request #435 from docker-hub/fix-producer-cleanup-logic
Reworked the if statement logic to only call stop() on not-stopped producer objects
-rw-r--r-- | kafka/producer/base.py | 2 | ||||
-rw-r--r-- | test/test_producer.py | 14 |
2 files changed, 15 insertions, 1 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index d5c013a..e572ab9 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -322,7 +322,7 @@ class Producer(object): self.thread.start() def cleanup(obj): - if obj.stopped: + if not obj.stopped: obj.stop() self._cleanup_func = cleanup atexit.register(cleanup, self) diff --git a/test/test_producer.py b/test/test_producer.py index e681e43..3c026e8 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -111,6 +111,20 @@ class TestKafkaProducer(unittest.TestCase): with self.assertRaises(FailedPayloadsError): producer.send_messages('foobar', b'test message') + def test_cleanup_stop_is_called_on_not_stopped_object(self): + producer = Producer(MagicMock(), async=True) + producer.stopped = True + with patch('kafka.producer.base.Producer.stop') as base_stop: + producer._cleanup_func(producer) + self.assertEqual(base_stop.call_count, 0) + + def test_cleanup_stop_is_not_called_on_stopped_object(self): + producer = Producer(MagicMock(), async=True) + producer.stopped = False + with patch('kafka.producer.base.Producer.stop') as base_stop: + producer._cleanup_func(producer) + self.assertEqual(base_stop.call_count, 1) + class TestKafkaProducerSendUpstream(unittest.TestCase): |