summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-12-29 16:06:24 -0800
committerGitHub <noreply@github.com>2019-12-29 16:06:24 -0800
commit2a86b23f477e5ed57aa987db97d11284a37d05a0 (patch)
tree391d4d2d99db38eb2f9869d76592851bbae57cf2 /test
parent1a91a54688cb77fd77c342e719f24f346d5cee89 (diff)
downloadkafka-python-2a86b23f477e5ed57aa987db97d11284a37d05a0.tar.gz
Optionally return OffsetAndMetadata from consumer.committed(tp) (#1979)
Diffstat (limited to 'test')
-rw-r--r--test/test_coordinator.py4
-rw-r--r--test/test_fetcher.py4
2 files changed, 4 insertions, 4 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 4afdcd9..88ca4c1 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -20,7 +20,7 @@ from kafka.protocol.commit import (
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse)
from kafka.protocol.metadata import MetadataResponse
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import WeakMethod
@@ -211,7 +211,7 @@ def test_refresh_committed_offsets_if_needed(mocker, coordinator):
assert coordinator._subscription.needs_fetch_committed_offsets is True
coordinator.refresh_committed_offsets_if_needed()
assignment = coordinator._subscription.assignment
- assert assignment[TopicPartition('foobar', 0)].committed == 123
+ assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'')
assert TopicPartition('foobar', 1) not in assignment
assert coordinator._subscription.needs_fetch_committed_offsets is False
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index b61a0f0..697f8be 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -21,7 +21,7 @@ from kafka.errors import (
UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
-from kafka.structs import TopicPartition
+from kafka.structs import OffsetAndMetadata, TopicPartition
@pytest.fixture
@@ -124,7 +124,7 @@ def test_update_fetch_positions(fetcher, topic, mocker):
fetcher._reset_offset.reset_mock()
fetcher._subscriptions.need_offset_reset(partition)
fetcher._subscriptions.assignment[partition].awaiting_reset = False
- fetcher._subscriptions.assignment[partition].committed = 123
+ fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'')
mocker.patch.object(fetcher._subscriptions, 'seek')
fetcher.update_fetch_positions([partition])
assert fetcher._reset_offset.call_count == 0