summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-09 15:31:48 -0800
committerDana Powers <dana.powers@rd.io>2015-12-09 15:31:48 -0800
commit17d6a68f6ececea3b4d0290dc84c4c2fc9508e9a (patch)
tree72674dc39221f351e9de1f0c0a6a86ae583d0dfb
parentd0de279459a92e787730f5c85a2cf6f2741cbd97 (diff)
downloadkafka-python-17d6a68f6ececea3b4d0290dc84c4c2fc9508e9a.tar.gz
Add client_id and correlation_id to BrokerConnection constructor kwargs
-rw-r--r--kafka/conn.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index ab44073..84a72aa 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -21,14 +21,16 @@ DEFAULT_KAFKA_PORT = 9092
class BrokerConnection(local):
- def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
+ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
+ client_id='kafka-python-0.10.0', correlation_id=0):
super(BrokerConnection, self).__init__()
self.host = host
self.port = port
self.timeout = timeout
self._write_fd = None
self._read_fd = None
- self.correlation_id = 0
+ self.correlation_id = correlation_id
+ self.client_id = client_id
self.in_flight_requests = deque()
def connect(self):
@@ -63,7 +65,9 @@ class BrokerConnection(local):
if not self.connected() and not self.connect():
return None
self.correlation_id += 1
- header = RequestHeader(request, correlation_id=self.correlation_id)
+ header = RequestHeader(request,
+ correlation_id=self.correlation_id,
+ client_id=self.client_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
try: