diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py index 48a534e..c36cd08 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -2,7 +2,6 @@ import binascii import collections import copy import functools -import itertools import logging import time import kafka.common @@ -23,17 +22,18 @@ log = logging.getLogger("kafka") class KafkaClient(object): CLIENT_ID = b"kafka-python" - ID_GEN = itertools.count() # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a # socket timeout. def __init__(self, hosts, client_id=CLIENT_ID, - timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, + correlation_id=0): # We need one connection to bootstrap self.client_id = kafka_bytestring(client_id) self.timeout = timeout self.hosts = collect_hosts(hosts) + self.correlation_id = correlation_id # create connections only when we need them self.conns = {} @@ -98,10 +98,10 @@ class KafkaClient(object): return self.brokers[meta.leader] def _next_id(self): - """ - Generate a new correlation id - """ - return next(KafkaClient.ID_GEN) + """Generate a new correlation id""" + # modulo to keep w/i int32 + self.correlation_id = (self.correlation_id + 1) % 2**31 + return self.correlation_id def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): """ |