diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-05-22 12:06:38 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-05-22 12:06:38 -0700 |
commit | 35a14e18c631508e195f9377a6b5a4861966b3a2 (patch) | |
tree | bdff5da1110b9b97150571e46280dbe76307b49d /kafka/client.py | |
parent | ae6b49aca13d2d1df7e7f884b2a99c34aa839e18 (diff) | |
download | kafka-python-35a14e18c631508e195f9377a6b5a4861966b3a2.tar.gz |
Handle New Topic Creation
Adds ensure_topic_exists to KafkaClient, redirects test case to use
that. Fixes #113 and fixes #150.
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py index d0e07d0..9cb4b48 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,11 +1,11 @@ +import collections import copy +import functools +import itertools import logging -import collections - +import time import kafka.common -from functools import partial -from itertools import count from kafka.common import (TopicAndPartition, ConnectionError, FailedPayloadsError, PartitionUnavailableError, @@ -21,7 +21,7 @@ log = logging.getLogger("kafka") class KafkaClient(object): CLIENT_ID = "kafka-python" - ID_GEN = count() + ID_GEN = itertools.count() # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a @@ -213,6 +213,16 @@ class KafkaClient(object): def has_metadata_for_topic(self, topic): return topic in self.topic_partitions + def ensure_topic_exists(self, topic, timeout = 30): + start_time = time.time() + + self.load_metadata_for_topics(topic) + while not self.has_metadata_for_topic(topic): + if time.time() > start_time + timeout: + raise KafkaTimeoutError("Unable to create topic {}".format(topic)) + self.load_metadata_for_topics(topic) + time.sleep(.5) + def close(self): for conn in self.conns.values(): conn.close() @@ -289,7 +299,7 @@ class KafkaClient(object): order of input payloads """ - encoder = partial( + encoder = functools.partial( KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout) @@ -321,7 +331,7 @@ class KafkaClient(object): to the same brokers. """ - encoder = partial(KafkaProtocol.encode_fetch_request, + encoder = functools.partial(KafkaProtocol.encode_fetch_request, max_wait_time=max_wait_time, min_bytes=min_bytes) @@ -359,7 +369,7 @@ class KafkaClient(object): def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): - encoder = partial(KafkaProtocol.encode_offset_commit_request, + encoder = functools.partial(KafkaProtocol.encode_offset_commit_request, group=group) decoder = KafkaProtocol.decode_offset_commit_response resps = self._send_broker_aware_request(payloads, encoder, decoder) @@ -378,7 +388,7 @@ class KafkaClient(object): def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): - encoder = partial(KafkaProtocol.encode_offset_fetch_request, + encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, group=group) decoder = KafkaProtocol.decode_offset_fetch_response resps = self._send_broker_aware_request(payloads, encoder, decoder) |