author      Dana Powers <dana.powers@gmail.com>    2015-03-07 21:34:17 -0800
committer   Dana Powers <dana.powers@gmail.com>    2015-03-07 21:34:17 -0800
commit      327b734940098535102b22e5faec731684e9b6ba (patch)
tree        ebeb802da4aaf2334f2a586ff6ad9c300c9292b2
parent      e13eb0de8903541ec30050a8e7dd8a168752cea3 (diff)
parent      c80fbd1451f0042a28eba537c7b804c92bfbd612 (diff)
download    kafka-python-327b734940098535102b22e5faec731684e9b6ba.tar.gz
Merge pull request #332 from dpkp/kafka_client_edits
Kafka client edits
-rw-r--r--   kafka/client.py          | 12
-rw-r--r--   kafka/consumer/kafka.py  | 13
2 files changed, 11 insertions, 14 deletions
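
The common thread in this merge is str/bytes handling for client_id and group_id: instead of coercing configuration values to bytes up front, the client now converts them with kafka.util.kafka_bytestring at the points where the protocol needs bytes. As a rough sketch of what such a helper does (the real one lives in kafka/util.py; this body is an assumption for illustration):

import six

def kafka_bytestring(s):
    # Assumed behavior: pass bytes through untouched, encode text as UTF-8,
    # and reject anything that is not string-like.
    if isinstance(s, bytes):
        return s
    if isinstance(s, six.string_types):
        return s.encode('utf-8')
    raise TypeError('Expected str or bytes, got %r' % (s,))
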
diff --git a/kafka/client.py b/kafka/client.py
index f8fe555..48a534e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -15,6 +15,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata,
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
+from kafka.util import kafka_bytestring
log = logging.getLogger("kafka")
@@ -30,7 +31,7 @@ class KafkaClient(object):
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
- self.client_id = client_id
+ self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)
@@ -85,7 +86,7 @@ class KafkaClient(object):
self.load_metadata_for_topics(topic)
# If the partition doesn't actually exist, raise
- if partition not in self.topic_partitions[topic]:
+ if partition not in self.topic_partitions.get(topic, []):
raise UnknownTopicOrPartitionError(key)
# If there's no leader for the partition, raise
@@ -177,8 +178,13 @@ class KafkaClient(object):
# Send the request, recv the response
try:
conn.send(requestId, request)
+
+ # decoder_fn=None signals that the server is expected not to
+ # send a response. This probably only applies to
+ # ProduceRequest w/ acks = 0
if decoder_fn is None:
continue
+
try:
response = conn.recv(requestId)
except ConnectionError as e:
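
For context on the comment added above: a produce request sent with acks=0 gets no response from the broker, so the client passes decoder_fn=None and skips conn.recv() for that request. A hedged sketch of that fire-and-forget path, with the broker address and topic as placeholders:

from kafka import KafkaClient
from kafka.common import ProduceRequest
from kafka.protocol import create_message

client = KafkaClient('localhost:9092')
req = ProduceRequest(b'my-topic', 0, [create_message(b'payload')])
# acks=0 asks the broker for no acknowledgement, so no response is decoded
# and the returned list is expected to be empty.
responses = client.send_produce_request([req], acks=0)
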
@@ -259,7 +265,7 @@ class KafkaClient(object):
def get_partition_ids_for_topic(self, topic):
if topic not in self.topic_partitions:
- return None
+ return []
return sorted(list(self.topic_partitions[topic]))
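
With get_partition_ids_for_topic() returning an empty list rather than None for unknown topics, callers can iterate the result directly without a None guard. A hedged usage sketch (broker address and topic name are placeholders):

from kafka import KafkaClient

# A str client_id is accepted and normalized to bytes via kafka_bytestring (see above).
client = KafkaClient('localhost:9092', client_id='my-app')
for partition in client.get_partition_ids_for_topic(b'missing-topic'):
    # Never reached for an unknown topic; previously this required checking for None.
    print(partition)
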
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 53ba0a7..49ffa7b 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -47,8 +47,6 @@ DEFAULT_CONSUMER_CONFIG = {
'rebalance_backoff_ms': 2000,
}
-BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')
-
class KafkaConsumer(object):
"""
@@ -171,13 +169,6 @@ class KafkaConsumer(object):
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))
- # Handle str/bytes conversions
- for config_key in BYTES_CONFIGURATION_KEYS:
- if isinstance(self._config[config_key], six.string_types):
- logger.warning("Converting configuration key '%s' to bytes" %
- config_key)
- self._config[config_key] = self._config[config_key].encode('utf-8')
-
if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
@@ -558,7 +549,7 @@ class KafkaConsumer(object):
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
- resps = self._client.send_offset_commit_request(self._config['group_id'],
+ resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
commits,
fail_on_error=False)
@@ -622,7 +613,7 @@ class KafkaConsumer(object):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
- self._config['group_id'],
+ kafka_bytestring(self._config['group_id']),
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
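
On the consumer side, the blanket bytes conversion (and its logged warning) at configuration time is gone; group_id is only wrapped in kafka_bytestring when an offset commit or fetch request is actually built. A hedged sketch of configuring the consumer with plain strings; the broker address and topic are placeholders, and bootstrap_servers/auto_commit_enable follow this module's DEFAULT_CONSUMER_CONFIG keys:

from kafka import KafkaConsumer

# group_id and client_id can stay as str; conversion now happens at the
# offset request call sites shown in the hunks above.
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         client_id='my-app',
                         bootstrap_servers=['localhost:9092'],
                         auto_commit_enable=True)

for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)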