summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py37
1 files changed, 18 insertions, 19 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 3ef106c..29ddd0e 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -9,14 +9,14 @@ import time
import six
-from kafka.client import KafkaClient
+from kafka import SimpleClient
from kafka.common import (
- OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
+ OffsetFetchRequestPayload, OffsetCommitRequestPayload,
+ OffsetRequestPayload, FetchRequestPayload,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
-from kafka.util import kafka_bytestring
logger = logging.getLogger(__name__)
@@ -136,7 +136,7 @@ class KafkaConsumer(object):
'bootstrap_servers required to configure KafkaConsumer'
)
- self._client = KafkaClient(
+ self._client = SimpleClient(
self._config['bootstrap_servers'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0)
@@ -192,14 +192,14 @@ class KafkaConsumer(object):
# Topic name str -- all partitions
if isinstance(arg, (six.string_types, six.binary_type)):
- topic = kafka_bytestring(arg)
+ topic = arg
for partition in self._client.get_partition_ids_for_topic(topic):
self._consume_topic_partition(topic, partition)
# (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
- topic = kafka_bytestring(arg[0])
+ topic = arg[0]
partition = arg[1]
self._consume_topic_partition(topic, partition)
if len(arg) == 3:
@@ -212,7 +212,7 @@ class KafkaConsumer(object):
# key can be string (a topic)
if isinstance(key, (six.string_types, six.binary_type)):
- topic = kafka_bytestring(key)
+ topic = key
# topic: partition
if isinstance(value, int):
@@ -230,7 +230,7 @@ class KafkaConsumer(object):
# (topic, partition): offset
elif isinstance(key, tuple):
- topic = kafka_bytestring(key[0])
+ topic = key[0]
partition = key[1]
self._consume_topic_partition(topic, partition)
self._offsets.fetch[(topic, partition)] = value
@@ -333,9 +333,9 @@ class KafkaConsumer(object):
'No fetch offsets found when calling fetch_messages'
)
- fetches = [FetchRequest(topic, partition,
- self._offsets.fetch[(topic, partition)],
- max_bytes)
+ fetches = [FetchRequestPayload(topic, partition,
+ self._offsets.fetch[(topic, partition)],
+ max_bytes)
for (topic, partition) in self._topics]
# send_fetch_request will batch topic/partition requests by leader
@@ -353,7 +353,7 @@ class KafkaConsumer(object):
self._refresh_metadata_on_error()
continue
- topic = kafka_bytestring(resp.topic)
+ topic = resp.topic
partition = resp.partition
try:
check_error(resp)
@@ -425,7 +425,7 @@ class KafkaConsumer(object):
topic / partition. See:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
"""
- reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
+ reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)]
(resp,) = self._client.send_offset_request(reqs)
@@ -545,14 +545,14 @@ class KafkaConsumer(object):
continue
commits.append(
- OffsetCommitRequest(topic_partition[0], topic_partition[1],
+ OffsetCommitRequestPayload(topic_partition[0], topic_partition[1],
commit_offset, metadata)
)
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
resps = self._client.send_offset_commit_request(
- kafka_bytestring(self._config['group_id']), commits,
+ self._config['group_id'], commits,
fail_on_error=False
)
@@ -576,7 +576,6 @@ class KafkaConsumer(object):
#
def _consume_topic_partition(self, topic, partition):
- topic = kafka_bytestring(topic)
if not isinstance(partition, int):
raise KafkaConfigurationError('Unknown partition type (%s) '
'-- expected int' % type(partition))
@@ -616,8 +615,8 @@ class KafkaConsumer(object):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
- kafka_bytestring(self._config['group_id']),
- [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
+ self._config['group_id'],
+ [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
check_error(resp)
@@ -665,7 +664,7 @@ class KafkaConsumer(object):
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
- raise # pylint: disable-msg=E0704
+ raise # pylint: disable=E0704
(offset, ) = self.get_partition_offsets(topic, partition,
request_time_ms, max_num_offsets=1)