summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py14
1 files changed, 11 insertions, 3 deletions
diff --git a/kafka/client.py b/kafka/client.py
index b3f8667..2ec1f1f 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -180,6 +180,10 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.close()
+ def reinit(self):
+ for conn in self.conns.values():
+ conn.reinit()
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -230,15 +234,19 @@ class KafkaClient(object):
return out
def send_fetch_request(self, payloads=[], fail_on_error=True,
- callback=None):
+ callback=None, max_wait_time=100, min_bytes=4096):
"""
Encode and send a FetchRequest
Payloads are grouped by topic and partition so they can be pipelined
to the same brokers.
"""
- resps = self._send_broker_aware_request(payloads,
- KafkaProtocol.encode_fetch_request,
+
+ encoder = partial(KafkaProtocol.encode_fetch_request,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes)
+
+ resps = self._send_broker_aware_request(payloads, encoder,
KafkaProtocol.decode_fetch_response)
out = []