summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py12
1 files changed, 9 insertions, 3 deletions
diff --git a/kafka/client.py b/kafka/client.py
index f8fe555..48a534e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -15,6 +15,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata,
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
+from kafka.util import kafka_bytestring
log = logging.getLogger("kafka")
@@ -30,7 +31,7 @@ class KafkaClient(object):
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
- self.client_id = client_id
+ self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)
@@ -85,7 +86,7 @@ class KafkaClient(object):
self.load_metadata_for_topics(topic)
# If the partition doesn't actually exist, raise
- if partition not in self.topic_partitions[topic]:
+ if partition not in self.topic_partitions.get(topic, []):
raise UnknownTopicOrPartitionError(key)
# If there's no leader for the partition, raise
@@ -177,8 +178,13 @@ class KafkaClient(object):
# Send the request, recv the response
try:
conn.send(requestId, request)
+
+ # decoder_fn=None signal that the server is expected to not
+ # send a response. This probably only applies to
+ # ProduceRequest w/ acks = 0
if decoder_fn is None:
continue
+
try:
response = conn.recv(requestId)
except ConnectionError as e:
@@ -259,7 +265,7 @@ class KafkaClient(object):
def get_partition_ids_for_topic(self, topic):
if topic not in self.topic_partitions:
- return None
+ return []
return sorted(list(self.topic_partitions[topic]))