diff options
-rw-r--r-- | kafka/conn.py | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 5afd946..7979ba7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -143,9 +143,9 @@ class BrokerConnection(object): return future.failure(Errors.ConnectionError()) if not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests()) - self._correlation_id += 1 + correlation_id = self._next_correlation_id() header = RequestHeader(request, - correlation_id=self._correlation_id, + correlation_id=correlation_id, client_id=self._client_id) message = b''.join([header.encode(), request.encode()]) size = Int32.encode(len(message)) @@ -163,11 +163,11 @@ class BrokerConnection(object): log.exception("Error sending %s to %s", request, self) self.close(error=e) return future.failure(e) - log.debug('%s Request %d: %s', self, self._correlation_id, request) + log.debug('%s Request %d: %s', self, correlation_id, request) if expect_response: ifr = InFlightRequest(request=request, - correlation_id=self._correlation_id, + correlation_id=correlation_id, response_type=request.RESPONSE_TYPE, future=future, timestamp=time.time()) @@ -299,6 +299,10 @@ class BrokerConnection(object): return True return False + def _next_correlation_id(self): + self._correlation_id = (self._correlation_id + 1) % 2**31 + return self._correlation_id + def __repr__(self): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) |