diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800 |
commit | 828377377da43749af0d27ee256ef31bf714cf17 (patch) | |
tree | fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /kafka/consumer/kafka.py | |
parent | 71e7568fcb8132899f366b37c32645fd5a40dc4b (diff) | |
parent | 9a8af1499ca425366d934487469d9977fae7fe5f (diff) | |
download | kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz |
Merge branch '0.9'
Conflicts:
kafka/codec.py
kafka/version.py
test/test_producer.py
test/test_producer_integration.py
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 37 |
1 files changed, 18 insertions, 19 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 3ef106c..29ddd0e 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -9,14 +9,14 @@ import time import six -from kafka.client import KafkaClient +from kafka import SimpleClient from kafka.common import ( - OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, + OffsetFetchRequestPayload, OffsetCommitRequestPayload, + OffsetRequestPayload, FetchRequestPayload, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError ) -from kafka.util import kafka_bytestring logger = logging.getLogger(__name__) @@ -136,7 +136,7 @@ class KafkaConsumer(object): 'bootstrap_servers required to configure KafkaConsumer' ) - self._client = KafkaClient( + self._client = SimpleClient( self._config['bootstrap_servers'], client_id=self._config['client_id'], timeout=(self._config['socket_timeout_ms'] / 1000.0) @@ -192,14 +192,14 @@ class KafkaConsumer(object): # Topic name str -- all partitions if isinstance(arg, (six.string_types, six.binary_type)): - topic = kafka_bytestring(arg) + topic = arg for partition in self._client.get_partition_ids_for_topic(topic): self._consume_topic_partition(topic, partition) # (topic, partition [, offset]) tuple elif isinstance(arg, tuple): - topic = kafka_bytestring(arg[0]) + topic = arg[0] partition = arg[1] self._consume_topic_partition(topic, partition) if len(arg) == 3: @@ -212,7 +212,7 @@ class KafkaConsumer(object): # key can be string (a topic) if isinstance(key, (six.string_types, six.binary_type)): - topic = kafka_bytestring(key) + topic = key # topic: partition if isinstance(value, int): @@ -230,7 +230,7 @@ class KafkaConsumer(object): # (topic, partition): offset elif isinstance(key, tuple): - topic = kafka_bytestring(key[0]) + topic = key[0] partition = key[1] self._consume_topic_partition(topic, partition) self._offsets.fetch[(topic, partition)] = value @@ -333,9 +333,9 @@ class KafkaConsumer(object): 'No fetch offsets found when calling fetch_messages' ) - fetches = [FetchRequest(topic, partition, - self._offsets.fetch[(topic, partition)], - max_bytes) + fetches = [FetchRequestPayload(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) for (topic, partition) in self._topics] # send_fetch_request will batch topic/partition requests by leader @@ -353,7 +353,7 @@ class KafkaConsumer(object): self._refresh_metadata_on_error() continue - topic = kafka_bytestring(resp.topic) + topic = resp.topic partition = resp.partition try: check_error(resp) @@ -425,7 +425,7 @@ class KafkaConsumer(object): topic / partition. See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI """ - reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] + reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)] (resp,) = self._client.send_offset_request(reqs) @@ -545,14 +545,14 @@ class KafkaConsumer(object): continue commits.append( - OffsetCommitRequest(topic_partition[0], topic_partition[1], + OffsetCommitRequestPayload(topic_partition[0], topic_partition[1], commit_offset, metadata) ) if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) resps = self._client.send_offset_commit_request( - kafka_bytestring(self._config['group_id']), commits, + self._config['group_id'], commits, fail_on_error=False ) @@ -576,7 +576,6 @@ class KafkaConsumer(object): # def _consume_topic_partition(self, topic, partition): - topic = kafka_bytestring(topic) if not isinstance(partition, int): raise KafkaConfigurationError('Unknown partition type (%s) ' '-- expected int' % type(partition)) @@ -616,8 +615,8 @@ class KafkaConsumer(object): logger.info("Consumer fetching stored offsets") for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( - kafka_bytestring(self._config['group_id']), - [OffsetFetchRequest(topic_partition[0], topic_partition[1])], + self._config['group_id'], + [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])], fail_on_error=False) try: check_error(resp) @@ -665,7 +664,7 @@ class KafkaConsumer(object): # Otherwise we should re-raise the upstream exception # b/c it typically includes additional data about # the request that triggered it, and we do not want to drop that - raise # pylint: disable-msg=E0704 + raise # pylint: disable=E0704 (offset, ) = self.get_partition_offsets(topic, partition, request_time_ms, max_num_offsets=1) |