summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-07-26 08:39:44 -0400
committerDavid Arthur <mumrah@gmail.com>2013-07-26 08:39:44 -0400
commite297a7a8c8f9de341b0e91346236455d12ae7f82 (patch)
treef4db35ab88fa39684d4ab680428adf815000eaa5 /kafka/client.py
parent5684af438e6cf871540aa8ea8b556737f56e9798 (diff)
parent1d278f0f60cb0a7b76fbc6b80c8e112a0deb2e0c (diff)
downloadkafka-python-e297a7a8c8f9de341b0e91346236455d12ae7f82.tar.gz
Merge branch 'issue-35'
Conflicts: kafka/__init__.py kafka/consumer.py test/test_integration.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py14
1 files changed, 11 insertions, 3 deletions
diff --git a/kafka/client.py b/kafka/client.py
index b3f8667..2ec1f1f 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -180,6 +180,10 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.close()
+ def reinit(self):
+ for conn in self.conns.values():
+ conn.reinit()
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -230,15 +234,19 @@ class KafkaClient(object):
return out
def send_fetch_request(self, payloads=[], fail_on_error=True,
- callback=None):
+ callback=None, max_wait_time=100, min_bytes=4096):
"""
Encode and send a FetchRequest
Payloads are grouped by topic and partition so they can be pipelined
to the same brokers.
"""
- resps = self._send_broker_aware_request(payloads,
- KafkaProtocol.encode_fetch_request,
+
+ encoder = partial(KafkaProtocol.encode_fetch_request,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes)
+
+ resps = self._send_broker_aware_request(payloads, encoder,
KafkaProtocol.decode_fetch_response)
out = []