summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-05-22 12:06:38 -0700
committerMark Roberts <wizzat@gmail.com>2014-05-22 12:06:38 -0700
commit35a14e18c631508e195f9377a6b5a4861966b3a2 (patch)
treebdff5da1110b9b97150571e46280dbe76307b49d /kafka/client.py
parentae6b49aca13d2d1df7e7f884b2a99c34aa839e18 (diff)
downloadkafka-python-35a14e18c631508e195f9377a6b5a4861966b3a2.tar.gz
Handle New Topic Creation
Adds ensure_topic_exists to KafkaClient, redirects test case to use that. Fixes #113 and fixes #150.
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py28
1 files changed, 19 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py
index d0e07d0..9cb4b48 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,11 +1,11 @@
+import collections
import copy
+import functools
+import itertools
import logging
-import collections
-
+import time
import kafka.common
-from functools import partial
-from itertools import count
from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError,
@@ -21,7 +21,7 @@ log = logging.getLogger("kafka")
class KafkaClient(object):
CLIENT_ID = "kafka-python"
- ID_GEN = count()
+ ID_GEN = itertools.count()
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -213,6 +213,16 @@ class KafkaClient(object):
def has_metadata_for_topic(self, topic):
return topic in self.topic_partitions
+ def ensure_topic_exists(self, topic, timeout = 30):
+ start_time = time.time()
+
+ self.load_metadata_for_topics(topic)
+ while not self.has_metadata_for_topic(topic):
+ if time.time() > start_time + timeout:
+ raise KafkaTimeoutError("Unable to create topic {}".format(topic))
+ self.load_metadata_for_topics(topic)
+ time.sleep(.5)
+
def close(self):
for conn in self.conns.values():
conn.close()
@@ -289,7 +299,7 @@ class KafkaClient(object):
order of input payloads
"""
- encoder = partial(
+ encoder = functools.partial(
KafkaProtocol.encode_produce_request,
acks=acks,
timeout=timeout)
@@ -321,7 +331,7 @@ class KafkaClient(object):
to the same brokers.
"""
- encoder = partial(KafkaProtocol.encode_fetch_request,
+ encoder = functools.partial(KafkaProtocol.encode_fetch_request,
max_wait_time=max_wait_time,
min_bytes=min_bytes)
@@ -359,7 +369,7 @@ class KafkaClient(object):
def send_offset_commit_request(self, group, payloads=[],
fail_on_error=True, callback=None):
- encoder = partial(KafkaProtocol.encode_offset_commit_request,
+ encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
group=group)
decoder = KafkaProtocol.decode_offset_commit_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
@@ -378,7 +388,7 @@ class KafkaClient(object):
def send_offset_fetch_request(self, group, payloads=[],
fail_on_error=True, callback=None):
- encoder = partial(KafkaProtocol.encode_offset_fetch_request,
+ encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
group=group)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)