summaryrefslogtreecommitdiff
path: root/kafka/consumer/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r--kafka/consumer/base.py19
1 files changed, 10 insertions, 9 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index c9f6e48..2059d92 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -7,11 +7,11 @@ from threading import Lock
import kafka.common
from kafka.common import (
- OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
+ OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
UnknownTopicOrPartitionError, check_error, KafkaError
)
-from kafka.util import kafka_bytestring, ReentrantTimer
+from kafka.util import ReentrantTimer
log = logging.getLogger('kafka.consumer')
@@ -47,8 +47,8 @@ class Consumer(object):
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
self.client = client
- self.topic = kafka_bytestring(topic)
- self.group = None if group is None else kafka_bytestring(group)
+ self.topic = topic
+ self.group = group
self.client.load_metadata_for_topics(topic)
self.offsets = {}
@@ -94,14 +94,14 @@ class Consumer(object):
def fetch_last_known_offsets(self, partitions=None):
if self.group is None:
- raise ValueError('KafkaClient.group must not be None')
+ raise ValueError('SimpleClient.group must not be None')
if partitions is None:
partitions = self.client.get_partition_ids_for_topic(self.topic)
responses = self.client.send_offset_fetch_request(
self.group,
- [OffsetFetchRequest(self.topic, p) for p in partitions],
+ [OffsetFetchRequestPayload(self.topic, p) for p in partitions],
fail_on_error=False
)
@@ -155,7 +155,7 @@ class Consumer(object):
'group=%s, topic=%s, partition=%s',
offset, self.group, self.topic, partition)
- reqs.append(OffsetCommitRequest(self.topic, partition,
+ reqs.append(OffsetCommitRequestPayload(self.topic, partition,
offset, None))
try:
@@ -197,7 +197,8 @@ class Consumer(object):
# ValueError on list.remove() if the exithandler no longer
# exists is fine here
try:
- atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
+ atexit._exithandlers.remove( # pylint: disable=no-member
+ (self._cleanup_func, (self,), {}))
except ValueError:
pass
@@ -217,7 +218,7 @@ class Consumer(object):
reqs = []
for partition in partitions:
- reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+ reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
resps = self.client.send_offset_request(reqs)
for resp in resps: