diff options
author | David Arthur <mumrah@gmail.com> | 2013-07-26 08:39:44 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-07-26 08:39:44 -0400 |
commit | e297a7a8c8f9de341b0e91346236455d12ae7f82 (patch) | |
tree | f4db35ab88fa39684d4ab680428adf815000eaa5 /kafka/client.py | |
parent | 5684af438e6cf871540aa8ea8b556737f56e9798 (diff) | |
parent | 1d278f0f60cb0a7b76fbc6b80c8e112a0deb2e0c (diff) | |
download | kafka-python-e297a7a8c8f9de341b0e91346236455d12ae7f82.tar.gz |
Merge branch 'issue-35'
Conflicts:
kafka/__init__.py
kafka/consumer.py
test/test_integration.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/kafka/client.py b/kafka/client.py index b3f8667..2ec1f1f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -180,6 +180,10 @@ class KafkaClient(object): for conn in self.conns.values(): conn.close() + def reinit(self): + for conn in self.conns.values(): + conn.reinit() + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ @@ -230,15 +234,19 @@ class KafkaClient(object): return out def send_fetch_request(self, payloads=[], fail_on_error=True, - callback=None): + callback=None, max_wait_time=100, min_bytes=4096): """ Encode and send a FetchRequest Payloads are grouped by topic and partition so they can be pipelined to the same brokers. """ - resps = self._send_broker_aware_request(payloads, - KafkaProtocol.encode_fetch_request, + + encoder = partial(KafkaProtocol.encode_fetch_request, + max_wait_time=max_wait_time, + min_bytes=min_bytes) + + resps = self._send_broker_aware_request(payloads, encoder, KafkaProtocol.decode_fetch_response) out = [] |