summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-12-02 13:33:30 -0800
committerDana Powers <dana.powers@gmail.com>2015-12-02 13:33:30 -0800
commit149d43cf6cfe121347c15b39600299b29b583dc1 (patch)
treeced113ef0ab7a747f5952cf4888b55b44aead6c1
parent0ba9d3d78250c91203147c97246fa166465b183d (diff)
parent1f857351391f1d50dd57b404cec313e61cf312f3 (diff)
downloadkafka-python-149d43cf6cfe121347c15b39600299b29b583dc1.tar.gz
Merge pull request #435 from docker-hub/fix-producer-cleanup-logic
Reworked the if statement logic to only call stop() on not-stopped producer objects
-rw-r--r--kafka/producer/base.py2
-rw-r--r--test/test_producer.py14
2 files changed, 15 insertions, 1 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index d5c013a..e572ab9 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -322,7 +322,7 @@ class Producer(object):
self.thread.start()
def cleanup(obj):
- if obj.stopped:
+ if not obj.stopped:
obj.stop()
self._cleanup_func = cleanup
atexit.register(cleanup, self)
diff --git a/test/test_producer.py b/test/test_producer.py
index e681e43..3c026e8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -111,6 +111,20 @@ class TestKafkaProducer(unittest.TestCase):
with self.assertRaises(FailedPayloadsError):
producer.send_messages('foobar', b'test message')
+ def test_cleanup_stop_is_called_on_not_stopped_object(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = True
+ with patch('kafka.producer.base.Producer.stop') as base_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(base_stop.call_count, 0)
+
+ def test_cleanup_stop_is_not_called_on_stopped_object(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = False
+ with patch('kafka.producer.base.Producer.stop') as base_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(base_stop.call_count, 1)
+
class TestKafkaProducerSendUpstream(unittest.TestCase):