diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:15:56 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:17:43 -0700 |
commit | fff812ddc80243208233f785b3f005904cf33482 (patch) | |
tree | 30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /kafka/consumer.py | |
parent | 42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff) | |
parent | 0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff) | |
download | kafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz |
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor
* add MetadataRequest and MetadataResponse namedtuples
* add TopicMetadata namedtuple
* add error codes to Topic and Partition Metadata
* add KafkaClient.send_metadata_request() method
* KafkaProtocol.decode_metadata_response changed to return a
MetadataResponse object so that it is consistent with server api:
[broker_list, topic_list]
* raise server exceptions in load_metadata_for_topics(*topics)
unless topics is null (full refresh)
* Replace non-standard exceptions (LeaderUnavailable,
PartitionUnavailable) with server standard exceptions
(LeaderNotAvailableError, UnknownTopicOrPartitionError)
Conflicts:
kafka/client.py
test/test_client.py
test/test_producer_integration.py
test/test_protocol.py
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 11 |
1 files changed, 5 insertions, 6 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index fa1b8bc..42628e1 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -91,7 +91,7 @@ class Consumer(object): self.offsets = {} if not partitions: - partitions = self.client.topic_partitions[topic] + partitions = self.client.get_partition_ids_for_topic(topic) else: assert all(isinstance(x, numbers.Integral) for x in partitions) @@ -117,9 +117,9 @@ class Consumer(object): def fetch_last_known_offsets(self, partitions=None): if not partitions: - partitions = self.client.topic_partitions[self.topic] + partitions = self.client.get_partition_ids_for_topic(self.topic) - def get_or_init_offset_callback(resp): + def get_or_init_offset(resp): try: kafka.common.check_error(resp) return resp.offset @@ -128,10 +128,9 @@ class Consumer(object): for partition in partitions: req = OffsetFetchRequest(self.topic, partition) - (offset,) = self.client.send_offset_fetch_request(self.group, [req], - callback=get_or_init_offset_callback, + (resp,) = self.client.send_offset_fetch_request(self.group, [req], fail_on_error=False) - self.offsets[partition] = offset + self.offsets[partition] = get_or_init_offset(resp) self.fetch_offsets = self.offsets.copy() def commit(self, partitions=None): |