diff options
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 16 |
1 files changed, 7 insertions, 9 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index fa70124..cdf8760 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -17,7 +17,6 @@ from kafka.common import ( OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError ) -from kafka.util import kafka_bytestring logger = logging.getLogger(__name__) @@ -193,14 +192,14 @@ class KafkaConsumer(object): # Topic name str -- all partitions if isinstance(arg, (six.string_types, six.binary_type)): - topic = kafka_bytestring(arg) + topic = arg for partition in self._client.get_partition_ids_for_topic(topic): self._consume_topic_partition(topic, partition) # (topic, partition [, offset]) tuple elif isinstance(arg, tuple): - topic = kafka_bytestring(arg[0]) + topic = arg[0] partition = arg[1] self._consume_topic_partition(topic, partition) if len(arg) == 3: @@ -213,7 +212,7 @@ class KafkaConsumer(object): # key can be string (a topic) if isinstance(key, (six.string_types, six.binary_type)): - topic = kafka_bytestring(key) + topic = key # topic: partition if isinstance(value, int): @@ -231,7 +230,7 @@ class KafkaConsumer(object): # (topic, partition): offset elif isinstance(key, tuple): - topic = kafka_bytestring(key[0]) + topic = key[0] partition = key[1] self._consume_topic_partition(topic, partition) self._offsets.fetch[(topic, partition)] = value @@ -354,7 +353,7 @@ class KafkaConsumer(object): self._refresh_metadata_on_error() continue - topic = kafka_bytestring(resp.topic) + topic = resp.topic partition = resp.partition try: check_error(resp) @@ -553,7 +552,7 @@ class KafkaConsumer(object): if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) resps = self._client.send_offset_commit_request( - kafka_bytestring(self._config['group_id']), commits, + self._config['group_id'], commits, fail_on_error=False ) @@ -577,7 +576,6 @@ class KafkaConsumer(object): # def _consume_topic_partition(self, topic, partition): - topic = kafka_bytestring(topic) if not isinstance(partition, int): raise KafkaConfigurationError('Unknown partition type (%s) ' '-- expected int' % type(partition)) @@ -617,7 +615,7 @@ class KafkaConsumer(object): logger.info("Consumer fetching stored offsets") for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( - kafka_bytestring(self._config['group_id']), + self._config['group_id'], [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])], fail_on_error=False) try: |