summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-08 13:15:56 -0700
committerDana Powers <dana.powers@rd.io>2014-09-08 13:17:43 -0700
commitfff812ddc80243208233f785b3f005904cf33482 (patch)
tree30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /kafka/consumer.py
parent42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff)
parent0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff)
downloadkafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor * add MetadataRequest and MetadataResponse namedtuples * add TopicMetadata namedtuple * add error codes to Topic and Partition Metadata * add KafkaClient.send_metadata_request() method * KafkaProtocol.decode_metadata_response changed to return a MetadataResponse object so that it is consistent with server api: [broker_list, topic_list] * raise server exceptions in load_metadata_for_topics(*topics) unless topics is null (full refresh) * Replace non-standard exceptions (LeaderUnavailable, PartitionUnavailable) with server standard exceptions (LeaderNotAvailableError, UnknownTopicOrPartitionError) Conflicts: kafka/client.py test/test_client.py test/test_producer_integration.py test/test_protocol.py
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py11
1 files changed, 5 insertions, 6 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fa1b8bc..42628e1 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -91,7 +91,7 @@ class Consumer(object):
self.offsets = {}
if not partitions:
- partitions = self.client.topic_partitions[topic]
+ partitions = self.client.get_partition_ids_for_topic(topic)
else:
assert all(isinstance(x, numbers.Integral) for x in partitions)
@@ -117,9 +117,9 @@ class Consumer(object):
def fetch_last_known_offsets(self, partitions=None):
if not partitions:
- partitions = self.client.topic_partitions[self.topic]
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
- def get_or_init_offset_callback(resp):
+ def get_or_init_offset(resp):
try:
kafka.common.check_error(resp)
return resp.offset
@@ -128,10 +128,9 @@ class Consumer(object):
for partition in partitions:
req = OffsetFetchRequest(self.topic, partition)
- (offset,) = self.client.send_offset_fetch_request(self.group, [req],
- callback=get_or_init_offset_callback,
+ (resp,) = self.client.send_offset_fetch_request(self.group, [req],
fail_on_error=False)
- self.offsets[partition] = offset
+ self.offsets[partition] = get_or_init_offset(resp)
self.fetch_offsets = self.offsets.copy()
def commit(self, partitions=None):