diff options
-rw-r--r-- | kafka/consumer/kafka.py | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index db86bab..f16b526 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -3,6 +3,7 @@ from __future__ import absolute_import from collections import namedtuple from copy import deepcopy import logging +import random import sys import time @@ -568,7 +569,12 @@ class KafkaConsumer(object): self._topics.append((topic, partition)) def _refresh_metadata_on_error(self): - sleep_ms = self._config['refresh_leader_backoff_ms'] + refresh_ms = self._config['refresh_leader_backoff_ms'] + jitter_pct = 0.20 + sleep_ms = random.randint( + int((1.0 - 0.5 * jitter_pct) * refresh_ms), + int((1.0 + 0.5 * jitter_pct) * refresh_ms) + ) while True: logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms) time.sleep(sleep_ms / 1000.0) |