summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py8
1 files changed, 7 insertions, 1 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index db86bab..f16b526 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import
from collections import namedtuple
from copy import deepcopy
import logging
+import random
import sys
import time
@@ -568,7 +569,12 @@ class KafkaConsumer(object):
self._topics.append((topic, partition))
def _refresh_metadata_on_error(self):
- sleep_ms = self._config['refresh_leader_backoff_ms']
+ refresh_ms = self._config['refresh_leader_backoff_ms']
+ jitter_pct = 0.20
+ sleep_ms = random.randint(
+ int((1.0 - 0.5 * jitter_pct) * refresh_ms),
+ int((1.0 + 0.5 * jitter_pct) * refresh_ms)
+ )
while True:
logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
time.sleep(sleep_ms / 1000.0)