Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--  kafka/consumer/kafka.py | 60
1 file changed, 32 insertions(+), 28 deletions(-)
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 6f5bcdd..47a5b00 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -194,10 +194,10 @@ class KafkaConsumer(object):
             elif isinstance(arg, tuple):
                 topic = kafka_bytestring(arg[0])
                 partition = arg[1]
+                self._consume_topic_partition(topic, partition)
                 if len(arg) == 3:
                     offset = arg[2]
                     self._offsets.fetch[(topic, partition)] = offset
-                self._consume_topic_partition(topic, partition)

             # { topic: partitions, ... } dict
             elif isinstance(arg, dict):
@@ -224,7 +224,7 @@ class KafkaConsumer(object):
                         topic = kafka_bytestring(key[0])
                         partition = key[1]
                         self._consume_topic_partition(topic, partition)
-                        self._offsets.fetch[key] = value
+                        self._offsets.fetch[(topic, partition)] = value

             else:
                 raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
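
The two hunks above sit in the consumer's topic/partition argument parsing (the set_topic_partitions path), which accepts plain topic names, (topic, partition[, offset]) tuples, and {(topic, partition): offset} dicts. A minimal usage sketch of those argument forms; the broker address and topic names are illustrative, not from this patch:

    from kafka import KafkaConsumer

    # Illustrative broker address; metadata_broker_list per the old config names
    consumer = KafkaConsumer(metadata_broker_list=['localhost:9092'])
    consumer.set_topic_partitions(
        'topic-a',              # str: consume all partitions of the topic
        ('topic-b', 1),         # (topic, partition) tuple
        ('topic-c', 0, 1000),   # (topic, partition, offset) tuple
        {('topic-d', 2): 500},  # {(topic, partition): offset} dict
    )

The reordering in the first hunk makes _consume_topic_partition() register the (topic, partition) pair before any explicit offset is stored for it; the second hunk normalizes the fetch-offset key to the bytestring-converted (topic, partition) tuple rather than the raw dict key.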
@@ -312,16 +312,16 @@ class KafkaConsumer(object):
         max_wait_time = self._config['fetch_wait_max_ms']
         min_bytes = self._config['fetch_min_bytes']

-        # Get current fetch offsets
-        offsets = self._offsets.fetch
-        if not offsets:
-            if not self._topics:
-                raise KafkaConfigurationError('No topics or partitions configured')
+        if not self._topics:
+            raise KafkaConfigurationError('No topics or partitions configured')
+
+        if not self._offsets.fetch:
             raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')

-        fetches = []
-        for topic_partition, offset in six.iteritems(offsets):
-            fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
+        fetches = [FetchRequest(topic, partition,
+                                self._offsets.fetch[(topic, partition)],
+                                max_bytes)
+                   for (topic, partition) in self._topics]

         # client.send_fetch_request will collect topic/partition requests by leader
         # and send each group as a single FetchRequest to the correct broker
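
The comment above describes batching done one layer down: the client groups the per-partition FetchRequests by the broker that currently leads each partition and sends one combined request per broker. A rough sketch of that grouping; the leader-lookup helper name is assumed for illustration and is not the client's actual internal API:

    from collections import defaultdict

    def group_fetches_by_leader(client, fetches):
        # Bucket FetchRequests by the broker that leads each
        # (topic, partition); each broker then receives a single
        # combined fetch rather than one request per partition.
        # 'client.leader_for' is a hypothetical lookup helper.
        by_leader = defaultdict(list)
        for req in fetches:
            by_leader[client.leader_for(req.topic, req.partition)].append(req)
        return by_leader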
@@ -336,49 +336,53 @@ class KafkaConsumer(object):
             return

         for resp in responses:

-            topic_partition = (resp.topic, resp.partition)
+            topic = kafka_bytestring(resp.topic)
+            partition = resp.partition
             try:
                 check_error(resp)
             except OffsetOutOfRangeError:
-                logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
-                               '(Highwatermark: %d)',
-                               resp.topic, resp.partition,
-                               offsets[topic_partition], resp.highwaterMark)
+                logger.warning('OffsetOutOfRange: topic %s, partition %d, '
+                               'offset %d (Highwatermark: %d)',
+                               topic, partition,
+                               self._offsets.fetch[(topic, partition)],
+                               resp.highwaterMark)
                 # Reset offset
-                self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+                self._offsets.fetch[(topic, partition)] = (
+                    self._reset_partition_offset((topic, partition))
+                )
                 continue

             except NotLeaderForPartitionError:
                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 self._refresh_metadata_on_error()
                 continue

             except RequestTimedOutError:
                 logger.warning("RequestTimedOutError for %s - %d",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 continue

             # Track server highwater mark
-            self._offsets.highwater[topic_partition] = resp.highwaterMark
+            self._offsets.highwater[(topic, partition)] = resp.highwaterMark

             # Yield each message
             # Kafka-python could raise an exception during iteration
             # that we are not catching -- the user will need to address it
             for (offset, message) in resp.messages:
                 # deserializer_class could raise an exception here
-                msg = KafkaMessage(resp.topic,
-                                   resp.partition,
-                                   offset, message.key,
-                                   self._config['deserializer_class'](message.value))
-
-                if offset < self._offsets.fetch[topic_partition]:
-                    logger.debug('Skipping message %s because its offset is less than the consumer offset',
-                                 msg)
+                val = self._config['deserializer_class'](message.value)
+                msg = KafkaMessage(topic, partition, offset, message.key, val)
+
+                # In some cases the server will return earlier messages
+                # than we requested; skip them per the Kafka spec
+                if offset < self._offsets.fetch[(topic, partition)]:
+                    logger.debug('message offset less than fetched offset; '
+                                 'skipping: %s', msg)
                     continue
                 # Only increment fetch offset if we safely got the message and deserialized it
-                self._offsets.fetch[(topic, partition)] = offset + 1
+                self._offsets.fetch[(topic, partition)] = offset + 1
                 # Then yield to user
                 yield msg
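
Taken together, fetch_messages() remains a generator of KafkaMessage namedtuples (topic, partition, offset, key, value), and the consumer's fetch offset for a partition advances only after a message has been successfully deserialized. A minimal consumption sketch, assuming a reachable broker; the address and topic are illustrative:

    # Illustrative broker address and topic
    consumer = KafkaConsumer('my-topic',
                             metadata_broker_list=['localhost:9092'])

    for msg in consumer.fetch_messages():
        # By this point the in-memory fetch offset is already msg.offset + 1,
        # so resuming iteration on this consumer will not re-yield this message
        print(msg.topic, msg.partition, msg.offset, msg.value)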