summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-12-11 16:37:25 -0800
committerDana Powers <dana.powers@rd.io>2014-12-15 12:43:45 -0800
commit29cae3e40b1d89d1a21525864794de7de3700461 (patch)
tree31ee3fc34ea04eae4bff73506b82f7de81039d9d
parente3fd29cb37a9d661afcf913dfa2c4552638bb4fd (diff)
downloadkafka-python-29cae3e40b1d89d1a21525864794de7de3700461.tar.gz
Add some jitter to refresh_leader_backoff_ms, per wizzat review
-rw-r--r--kafka/consumer/kafka.py8
1 files changed, 7 insertions, 1 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index db86bab..f16b526 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import
from collections import namedtuple
from copy import deepcopy
import logging
+import random
import sys
import time
@@ -568,7 +569,12 @@ class KafkaConsumer(object):
self._topics.append((topic, partition))
def _refresh_metadata_on_error(self):
- sleep_ms = self._config['refresh_leader_backoff_ms']
+ refresh_ms = self._config['refresh_leader_backoff_ms']
+ jitter_pct = 0.20
+ sleep_ms = random.randint(
+ int((1.0 - 0.5 * jitter_pct) * refresh_ms),
+ int((1.0 + 0.5 * jitter_pct) * refresh_ms)
+ )
while True:
logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
time.sleep(sleep_ms / 1000.0)