diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800 |
commit | 828377377da43749af0d27ee256ef31bf714cf17 (patch) | |
tree | fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /kafka/consumer/base.py | |
parent | 71e7568fcb8132899f366b37c32645fd5a40dc4b (diff) | |
parent | 9a8af1499ca425366d934487469d9977fae7fe5f (diff) | |
download | kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz |
Merge branch '0.9'
Conflicts:
kafka/codec.py
kafka/version.py
test/test_producer.py
test/test_producer_integration.py
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r-- | kafka/consumer/base.py | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index c9f6e48..2059d92 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,11 +7,11 @@ from threading import Lock import kafka.common from kafka.common import ( - OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, + OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, UnknownTopicOrPartitionError, check_error, KafkaError ) -from kafka.util import kafka_bytestring, ReentrantTimer +from kafka.util import ReentrantTimer log = logging.getLogger('kafka.consumer') @@ -47,8 +47,8 @@ class Consumer(object): auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client - self.topic = kafka_bytestring(topic) - self.group = None if group is None else kafka_bytestring(group) + self.topic = topic + self.group = group self.client.load_metadata_for_topics(topic) self.offsets = {} @@ -94,14 +94,14 @@ class Consumer(object): def fetch_last_known_offsets(self, partitions=None): if self.group is None: - raise ValueError('KafkaClient.group must not be None') + raise ValueError('SimpleClient.group must not be None') if partitions is None: partitions = self.client.get_partition_ids_for_topic(self.topic) responses = self.client.send_offset_fetch_request( self.group, - [OffsetFetchRequest(self.topic, p) for p in partitions], + [OffsetFetchRequestPayload(self.topic, p) for p in partitions], fail_on_error=False ) @@ -155,7 +155,7 @@ class Consumer(object): 'group=%s, topic=%s, partition=%s', offset, self.group, self.topic, partition) - reqs.append(OffsetCommitRequest(self.topic, partition, + reqs.append(OffsetCommitRequestPayload(self.topic, partition, offset, None)) try: @@ -197,7 +197,8 @@ class Consumer(object): # ValueError on list.remove() if the exithandler no longer # exists is fine here try: - atexit._exithandlers.remove((self._cleanup_func, (self,), {})) + atexit._exithandlers.remove( # pylint: disable=no-member + (self._cleanup_func, (self,), {})) except ValueError: pass @@ -217,7 +218,7 @@ class Consumer(object): reqs = [] for partition in partitions: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) resps = self.client.send_offset_request(reqs) for resp in resps: |