diff options
author | Dana Powers <dana.powers@rd.io> | 2015-03-29 16:45:20 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-03-29 18:46:21 -0700 |
commit | a4b439141b2ef35951e46716696e4c01bb88661c (patch) | |
tree | 3f330c107f16f7303f57ba13a7f424b916123a15 /kafka/client.py | |
parent | fd204dca174033e36899a0e20d2ce7ebccf11ddb (diff) | |
download | kafka-python-a4b439141b2ef35951e46716696e4c01bb88661c.tar.gz |
Rollover KafkaClient correlation ids at 2**31 to keep within int32 protocol encoding
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py index 48a534e..c36cd08 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -2,7 +2,6 @@ import binascii import collections import copy import functools -import itertools import logging import time import kafka.common @@ -23,17 +22,18 @@ log = logging.getLogger("kafka") class KafkaClient(object): CLIENT_ID = b"kafka-python" - ID_GEN = itertools.count() # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a # socket timeout. def __init__(self, hosts, client_id=CLIENT_ID, - timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, + correlation_id=0): # We need one connection to bootstrap self.client_id = kafka_bytestring(client_id) self.timeout = timeout self.hosts = collect_hosts(hosts) + self.correlation_id = correlation_id # create connections only when we need them self.conns = {} @@ -98,10 +98,10 @@ class KafkaClient(object): return self.brokers[meta.leader] def _next_id(self): - """ - Generate a new correlation id - """ - return next(KafkaClient.ID_GEN) + """Generate a new correlation id""" + # modulo to keep w/i int32 + self.correlation_id = (self.correlation_id + 1) % 2**31 + return self.correlation_id def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): """ |