diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-12-29 12:02:10 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-12-29 12:02:10 -0800 |
commit | 8f8deea7731e26b4cdee3ce5ae8bfdd8e3f57d3a (patch) | |
tree | 40bdde2881753565601a8754870e26f71a164af1 | |
parent | cf28da8420b007d836b0db9d865d74fee89cdbcd (diff) | |
download | kafka-python-issue-1082-offset-and-metadata.tar.gz |
Optionally return OffsetAndMetadata from consumer.committed(tp)issue-1082-offset-and-metadata
-rw-r--r-- | kafka/consumer/fetcher.py | 2 | ||||
-rw-r--r-- | kafka/consumer/group.py | 15 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 2 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 6 | ||||
-rw-r--r-- | test/test_coordinator.py | 4 | ||||
-rw-r--r-- | test/test_fetcher.py | 4 |
6 files changed, 20 insertions, 13 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f9d96b0..5cb25f2 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -185,7 +185,7 @@ class Fetcher(six.Iterator): self._subscriptions.need_offset_reset(tp) self._reset_offset(tp) else: - committed = self._subscriptions.assignment[tp].committed + committed = self._subscriptions.assignment[tp].committed.offset log.debug("Resetting offset for partition %s to the committed" " offset %s", tp, committed) self._subscriptions.seek(tp, committed) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index cde956c..89a0292 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -525,7 +525,7 @@ class KafkaConsumer(six.Iterator): offsets = self._subscription.all_consumed_offsets() self._coordinator.commit_offsets_sync(offsets) - def committed(self, partition): + def committed(self, partition, metadata=False): """Get the last committed offset for the given partition. This offset will be used as the position for the consumer @@ -537,9 +537,11 @@ class KafkaConsumer(six.Iterator): Arguments: partition (TopicPartition): The partition to check. + metadata (bool, optional): If True, return OffsetAndMetadata struct + instead of offset int. Default: False. Returns: - The last committed offset, or None if there was no prior commit. + The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit. """ assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1' assert self.config['group_id'] is not None, 'Requires group_id' @@ -553,10 +555,15 @@ class KafkaConsumer(six.Iterator): else: commit_map = self._coordinator.fetch_committed_offsets([partition]) if partition in commit_map: - committed = commit_map[partition].offset + committed = commit_map[partition] else: committed = None - return committed + + if committed is not None: + if metadata: + return committed + else: + return committed.offset def _fetch_all_topic_metadata(self): """A blocking call that fetches topic metadata for all topics in the diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 76a6c50..08842d1 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -374,7 +374,7 @@ class SubscriptionState(object): class TopicPartitionState(object): def __init__(self): - self.committed = None # last committed position + self.committed = None # last committed OffsetAndMetadata self.has_valid_position = False # whether we have valid position self.paused = False # whether this partition has been paused by the user self.awaiting_reset = False # whether we are awaiting reset diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 30337c3..fda80aa 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -387,7 +387,7 @@ class ConsumerCoordinator(BaseCoordinator): for partition, offset in six.iteritems(offsets): # verify assignment is still active if self._subscription.is_assigned(partition): - self._subscription.assignment[partition].committed = offset.offset + self._subscription.assignment[partition].committed = offset self._subscription.needs_fetch_committed_offsets = False def fetch_committed_offsets(self, partitions): @@ -641,7 +641,7 @@ class ConsumerCoordinator(BaseCoordinator): log.debug("Group %s committed offset %s for partition %s", self.group_id, offset, tp) if self._subscription.is_assigned(tp): - self._subscription.assignment[tp].committed = offset.offset + self._subscription.assignment[tp].committed = offset elif error_type is Errors.GroupAuthorizationFailedError: log.error("Not authorized to commit offsets for group %s", self.group_id) @@ -704,7 +704,7 @@ class ConsumerCoordinator(BaseCoordinator): partitions (list of TopicPartition): the partitions to fetch Returns: - Future: resolves to dict of offsets: {TopicPartition: int} + Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} """ assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 4afdcd9..88ca4c1 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -20,7 +20,7 @@ from kafka.protocol.commit import ( OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse) from kafka.protocol.metadata import MetadataResponse -from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod @@ -211,7 +211,7 @@ def test_refresh_committed_offsets_if_needed(mocker, coordinator): assert coordinator._subscription.needs_fetch_committed_offsets is True coordinator.refresh_committed_offsets_if_needed() assignment = coordinator._subscription.assignment - assert assignment[TopicPartition('foobar', 0)].committed == 123 + assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'') assert TopicPartition('foobar', 1) not in assignment assert coordinator._subscription.needs_fetch_committed_offsets is False diff --git a/test/test_fetcher.py b/test/test_fetcher.py index b61a0f0..697f8be 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -21,7 +21,7 @@ from kafka.errors import ( UnknownTopicOrPartitionError, OffsetOutOfRangeError ) from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords -from kafka.structs import TopicPartition +from kafka.structs import OffsetAndMetadata, TopicPartition @pytest.fixture @@ -124,7 +124,7 @@ def test_update_fetch_positions(fetcher, topic, mocker): fetcher._reset_offset.reset_mock() fetcher._subscriptions.need_offset_reset(partition) fetcher._subscriptions.assignment[partition].awaiting_reset = False - fetcher._subscriptions.assignment[partition].committed = 123 + fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'') mocker.patch.object(fetcher._subscriptions, 'seek') fetcher.update_fetch_positions([partition]) assert fetcher._reset_offset.call_count == 0 |