diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py index d0e07d0..9cb4b48 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,11 +1,11 @@ +import collections import copy +import functools +import itertools import logging -import collections - +import time import kafka.common -from functools import partial -from itertools import count from kafka.common import (TopicAndPartition, ConnectionError, FailedPayloadsError, PartitionUnavailableError, @@ -21,7 +21,7 @@ log = logging.getLogger("kafka") class KafkaClient(object): CLIENT_ID = "kafka-python" - ID_GEN = count() + ID_GEN = itertools.count() # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a @@ -213,6 +213,16 @@ class KafkaClient(object): def has_metadata_for_topic(self, topic): return topic in self.topic_partitions + def ensure_topic_exists(self, topic, timeout = 30): + start_time = time.time() + + self.load_metadata_for_topics(topic) + while not self.has_metadata_for_topic(topic): + if time.time() > start_time + timeout: + raise KafkaTimeoutError("Unable to create topic {}".format(topic)) + self.load_metadata_for_topics(topic) + time.sleep(.5) + def close(self): for conn in self.conns.values(): conn.close() @@ -289,7 +299,7 @@ class KafkaClient(object): order of input payloads """ - encoder = partial( + encoder = functools.partial( KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout) @@ -321,7 +331,7 @@ class KafkaClient(object): to the same brokers. """ - encoder = partial(KafkaProtocol.encode_fetch_request, + encoder = functools.partial(KafkaProtocol.encode_fetch_request, max_wait_time=max_wait_time, min_bytes=min_bytes) @@ -359,7 +369,7 @@ class KafkaClient(object): def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): - encoder = partial(KafkaProtocol.encode_offset_commit_request, + encoder = functools.partial(KafkaProtocol.encode_offset_commit_request, group=group) decoder = KafkaProtocol.decode_offset_commit_response resps = self._send_broker_aware_request(payloads, encoder, decoder) @@ -378,7 +388,7 @@ class KafkaClient(object): def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): - encoder = partial(KafkaProtocol.encode_offset_fetch_request, + encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, group=group) decoder = KafkaProtocol.decode_offset_fetch_response resps = self._send_broker_aware_request(payloads, encoder, decoder) |