diff options
author | Dana Powers <dana.powers@rd.io> | 2014-12-11 16:37:25 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:43:45 -0800 |
commit | 29cae3e40b1d89d1a21525864794de7de3700461 (patch) | |
tree | 31ee3fc34ea04eae4bff73506b82f7de81039d9d | |
parent | e3fd29cb37a9d661afcf913dfa2c4552638bb4fd (diff) | |
download | kafka-python-29cae3e40b1d89d1a21525864794de7de3700461.tar.gz |
Add some jitter to refresh_leader_backoff_ms, per wizzat review
-rw-r--r-- | kafka/consumer/kafka.py | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index db86bab..f16b526 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -3,6 +3,7 @@ from __future__ import absolute_import from collections import namedtuple from copy import deepcopy import logging +import random import sys import time @@ -568,7 +569,12 @@ class KafkaConsumer(object): self._topics.append((topic, partition)) def _refresh_metadata_on_error(self): - sleep_ms = self._config['refresh_leader_backoff_ms'] + refresh_ms = self._config['refresh_leader_backoff_ms'] + jitter_pct = 0.20 + sleep_ms = random.randint( + int((1.0 - 0.5 * jitter_pct) * refresh_ms), + int((1.0 + 0.5 * jitter_pct) * refresh_ms) + ) while True: logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms) time.sleep(sleep_ms / 1000.0) |