author     Viktor Shlapakov <vshlapakov@gmail.com>    2015-03-24 22:21:13 +0300
committer  Viktor Shlapakov <vshlapakov@gmail.com>    2015-06-03 11:22:47 +0300
commit     5e8dc6dcf55890a4e3a214a943ecc655faed3ecc (patch)
tree       f5ec10c67ec6c0e48971e2f973f4768807693ca7
parent     b0a04595c6aee7f6fcaa8927fcdfcd9a04a9b7d3 (diff)
download   kafka-python-5e8dc6dcf55890a4e3a214a943ecc655faed3ecc.tar.gz
Fixed tests and other issues after rebase
-rw-r--r--   kafka/common.py        |  1
-rw-r--r--   kafka/producer/base.py |  2
-rw-r--r--   test/test_producer.py  | 49
3 files changed, 20 insertions, 32 deletions
diff --git a/kafka/common.py b/kafka/common.py
index b3380d7..5c2b788 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -6,7 +6,6 @@ from collections import namedtuple
 #   Structs   #
 ###############
 
-<<<<<<< HEAD
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
 MetadataRequest = namedtuple("MetadataRequest",
     ["topics"])
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 44ffdf4..9bfe98b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -86,7 +86,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
                                         acks=req_acks,
                                         timeout=ack_timeout)
         except FailedPayloadsError as ex:
-            failed_reqs = ex.args[0]
+            failed_reqs = ex.failed_payloads
             log.exception("Failed payloads count %s" % len(failed_reqs))
 
             # if no limit, retry all failed messages until success
diff --git a/test/test_producer.py b/test/test_producer.py
index 51a74b5..cc58fe4 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -12,11 +12,10 @@ from kafka.producer.base import _send_upstream
 from kafka.protocol import CODEC_NONE
 
 import threading
-import multiprocessing as mp
 try:
-    from queue import Empty
+    from queue import Empty, Queue
 except ImportError:
-    from Queue import Empty
+    from Queue import Empty, Queue
 
 
 class TestKafkaProducer(unittest.TestCase):
@@ -56,21 +55,13 @@ class TestKafkaProducer(unittest.TestCase):
 class TestKafkaProducerSendUpstream(unittest.TestCase):
 
     def setUp(self):
-
-        # create a multiprocessing Value to store call counter
-        # (magicmock counters don't work with other processes)
-        self.send_calls_count = mp.Value('i', 0)
-
-        def send_side_effect(*args, **kwargs):
-            self.send_calls_count.value += 1
-
         self.client = MagicMock()
-        self.client.send_produce_request.side_effect = send_side_effect
-        self.queue = mp.Queue()
+        self.queue = Queue()
 
     def _run_process(self, retries_limit=3, sleep_timeout=1):
         # run _send_upstream process with the queue
-        self.process = mp.Process(
+        stop_event = threading.Event()
+        self.thread = threading.Thread(
             target=_send_upstream,
             args=(self.queue, self.client, CODEC_NONE,
                   0.3, # batch time (seconds)
@@ -78,11 +69,12 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
                   Producer.ACK_AFTER_LOCAL_WRITE,
                   Producer.DEFAULT_ACK_TIMEOUT,
                   50, # retry backoff (ms)
-                  retries_limit))
-        self.process.daemon = True
-        self.process.start()
+                  retries_limit,
+                  stop_event))
+        self.thread.daemon = True
+        self.thread.start()
         time.sleep(sleep_timeout)
-        self.process.terminate()
+        stop_event.set()
 
     def test_wo_retries(self):
 
@@ -97,7 +89,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
 
         # there should be 4 non-void cals:
         # 3 batches of 3 msgs each + 1 batch of 1 message
-        self.assertEqual(self.send_calls_count.value, 4)
+        self.assertEqual(self.client.send_produce_request.call_count, 4)
+
 
     def test_first_send_failed(self):
 
@@ -106,11 +99,10 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
         for i in range(10):
             self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
-        is_first_time = mp.Value('b', True)
+        self.client.is_first_time = True
 
         def send_side_effect(reqs, *args, **kwargs):
-            self.send_calls_count.value += 1
-            if is_first_time.value:
-                is_first_time.value = False
+            if self.client.is_first_time:
+                self.client.is_first_time = False
                 raise FailedPayloadsError(reqs)
 
         self.client.send_produce_request.side_effect = send_side_effect
@@ -122,7 +114,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
 
         # there should be 5 non-void cals: 1st failed batch of 3 msgs
         # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
-        self.assertEqual(self.send_calls_count.value, 5)
+        self.assertEqual(self.client.send_produce_request.call_count, 5)
 
     def test_with_limited_retries(self):
 
@@ -132,7 +124,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
             self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
 
         def send_side_effect(reqs, *args, **kwargs):
-            self.send_calls_count.value += 1
             raise FailedPayloadsError(reqs)
 
         self.client.send_produce_request.side_effect = send_side_effect
@@ -145,8 +136,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
 
         # there should be 16 non-void cals:
         # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
        # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
-        self.assertEqual(self.send_calls_count.value, 16)
-
+        self.assertEqual(self.client.send_produce_request.call_count, 16)
 
     def test_with_unlimited_retries(self):
 
@@ -156,7 +146,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
             self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
 
         def send_side_effect(reqs, *args, **kwargs):
-            self.send_calls_count.value += 1
             raise FailedPayloadsError(reqs)
 
         self.client.send_produce_request.side_effect = send_side_effect
@@ -174,5 +163,5 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
         self.assertEqual(self.queue.empty(), True)
 
         # 1s / 50ms of backoff = 20 times max
-        self.assertTrue(self.send_calls_count.value > 10)
-        self.assertTrue(self.send_calls_count.value <= 20)
+        calls = self.client.send_produce_request.call_count
+        self.assertTrue(calls > 10 & calls <= 20)
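The test changes above swap multiprocessing for threading: the worker now runs in a daemon threading.Thread that is stopped through a threading.Event, so a plain MagicMock call_count can replace the shared mp.Value counter and stop_event.set() replaces process.terminate(). The sketch below is a minimal, self-contained illustration of that pattern, not the project's code: send_loop is a hypothetical stand-in for _send_upstream (which also takes codec, batching, ack and retry arguments), and the message contents and timings are arbitrary.

    import threading
    import time
    from queue import Empty, Queue
    from unittest.mock import MagicMock

    # Hypothetical stand-in for kafka.producer.base._send_upstream: it only
    # drains the queue and calls the client until the stop event is set; the
    # real function also handles codecs, batching, acks and retries.
    def send_loop(queue, client, stop_event):
        while not stop_event.is_set():
            try:
                msg = queue.get(timeout=0.1)
            except Empty:
                continue
            client.send_produce_request([msg])

    client = MagicMock()        # in-process mock, so call_count is usable directly
    queue = Queue()
    stop_event = threading.Event()

    for i in range(4):
        queue.put("msg %d" % i)

    thread = threading.Thread(target=send_loop, args=(queue, client, stop_event))
    thread.daemon = True        # do not block interpreter shutdown
    thread.start()

    time.sleep(0.5)
    stop_event.set()            # cooperative stop instead of process.terminate()
    thread.join()

    assert client.send_produce_request.call_count == 4

Because everything stays in one process, the mock's counters are visible to the test directly, and the event gives the send loop a cooperative shutdown path instead of killing a child process mid-batch.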