summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-29 19:07:44 -0800
committerDana Powers <dana.powers@rd.io>2015-12-29 19:07:44 -0800
commite5c7d81e7c35e6b013cece347ef42d9f21d03aa6 (patch)
treefabd6ca3656add5197d0a442598bbe7a13ea9f84
parent8f0d1c1716205d82c8ee2c22baf60413936650c9 (diff)
downloadkafka-python-e5c7d81e7c35e6b013cece347ef42d9f21d03aa6.tar.gz
Use _next_correlation_id() method to avoid int overflows
-rw-r--r--kafka/conn.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 5afd946..7979ba7 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -143,9 +143,9 @@ class BrokerConnection(object):
return future.failure(Errors.ConnectionError())
if not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests())
- self._correlation_id += 1
+ correlation_id = self._next_correlation_id()
header = RequestHeader(request,
- correlation_id=self._correlation_id,
+ correlation_id=correlation_id,
client_id=self._client_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
@@ -163,11 +163,11 @@ class BrokerConnection(object):
log.exception("Error sending %s to %s", request, self)
self.close(error=e)
return future.failure(e)
- log.debug('%s Request %d: %s', self, self._correlation_id, request)
+ log.debug('%s Request %d: %s', self, correlation_id, request)
if expect_response:
ifr = InFlightRequest(request=request,
- correlation_id=self._correlation_id,
+ correlation_id=correlation_id,
response_type=request.RESPONSE_TYPE,
future=future,
timestamp=time.time())
@@ -299,6 +299,10 @@ class BrokerConnection(object):
return True
return False
+ def _next_correlation_id(self):
+ self._correlation_id = (self._correlation_id + 1) % 2**31
+ return self._correlation_id
+
def __repr__(self):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)