diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:31:48 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:31:48 -0800 |
commit | 17d6a68f6ececea3b4d0290dc84c4c2fc9508e9a (patch) | |
tree | 72674dc39221f351e9de1f0c0a6a86ae583d0dfb | |
parent | d0de279459a92e787730f5c85a2cf6f2741cbd97 (diff) | |
download | kafka-python-17d6a68f6ececea3b4d0290dc84c4c2fc9508e9a.tar.gz |
Add client_id and correlation_id to BrokerConnection constructor kwargs
-rw-r--r-- | kafka/conn.py | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index ab44073..84a72aa 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -21,14 +21,16 @@ DEFAULT_KAFKA_PORT = 9092 class BrokerConnection(local): - def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, + client_id='kafka-python-0.10.0', correlation_id=0): super(BrokerConnection, self).__init__() self.host = host self.port = port self.timeout = timeout self._write_fd = None self._read_fd = None - self.correlation_id = 0 + self.correlation_id = correlation_id + self.client_id = client_id self.in_flight_requests = deque() def connect(self): @@ -63,7 +65,9 @@ class BrokerConnection(local): if not self.connected() and not self.connect(): return None self.correlation_id += 1 - header = RequestHeader(request, correlation_id=self.correlation_id) + header = RequestHeader(request, + correlation_id=self.correlation_id, + client_id=self.client_id) message = b''.join([header.encode(), request.encode()]) size = Int32.encode(len(message)) try: |