diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_coordinator.py | 4 | ||||
-rw-r--r-- | test/test_fetcher.py | 4 |
2 files changed, 4 insertions, 4 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 4afdcd9..88ca4c1 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -20,7 +20,7 @@ from kafka.protocol.commit import ( OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse) from kafka.protocol.metadata import MetadataResponse -from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod @@ -211,7 +211,7 @@ def test_refresh_committed_offsets_if_needed(mocker, coordinator): assert coordinator._subscription.needs_fetch_committed_offsets is True coordinator.refresh_committed_offsets_if_needed() assignment = coordinator._subscription.assignment - assert assignment[TopicPartition('foobar', 0)].committed == 123 + assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'') assert TopicPartition('foobar', 1) not in assignment assert coordinator._subscription.needs_fetch_committed_offsets is False diff --git a/test/test_fetcher.py b/test/test_fetcher.py index b61a0f0..697f8be 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -21,7 +21,7 @@ from kafka.errors import ( UnknownTopicOrPartitionError, OffsetOutOfRangeError ) from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords -from kafka.structs import TopicPartition +from kafka.structs import OffsetAndMetadata, TopicPartition @pytest.fixture @@ -124,7 +124,7 @@ def test_update_fetch_positions(fetcher, topic, mocker): fetcher._reset_offset.reset_mock() fetcher._subscriptions.need_offset_reset(partition) fetcher._subscriptions.assignment[partition].awaiting_reset = False - fetcher._subscriptions.assignment[partition].committed = 123 + fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'') mocker.patch.object(fetcher._subscriptions, 'seek') fetcher.update_fetch_positions([partition]) assert fetcher._reset_offset.call_count == 0 |