diff options
author | David Arthur <mumrah@gmail.com> | 2013-06-01 00:08:16 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-06-01 00:10:47 -0400 |
commit | acd388e4eab100b5cf23481780f0d836ca0c21fd (patch) | |
tree | dd21e726de91356689c6063057199418b7b16f2c /test/unit2.py | |
parent | 2c257eeb1f02748840a8f4535d8d2a88ef5235f2 (diff) | |
download | kafka-python-issue-26.tar.gz |
Starting work on fixing offset commitsissue-26
* Update the "public" offset before yielding the message
* Add an option to SimpleConsumer.commit that excludes the current
offset
Ref #26
Diffstat (limited to 'test/unit2.py')
-rw-r--r-- | test/unit2.py | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/test/unit2.py b/test/unit2.py new file mode 100644 index 0000000..562bb76 --- /dev/null +++ b/test/unit2.py @@ -0,0 +1,56 @@ +import os +import random +import struct +import unittest + +from kafka.consumer import SimpleConsumer +from kafka.client import KafkaClient +from kafka.common import ( + ErrorMapping, OffsetAndMessage, OffsetCommitResponse, Message, FetchResponse +) + +class MockClient(object): + def __init__(self): + self.topic_partitions = {"topic": [0]} + self.mock_committed_offsets = {} + + def send_fetch_request(self, reqs): + resps = [] + for req in reqs: + msgs = [OffsetAndMessage(0, Message(0, 0, "key", "value1")), + OffsetAndMessage(1, Message(0, 0, "key", "value2")), + OffsetAndMessage(2, Message(0, 0, "key", "value3")), + OffsetAndMessage(3, Message(0, 0, "key", "value4")), + OffsetAndMessage(4, Message(0, 0, "key", "value5"))] + resp = FetchResponse(req.topic, req.partition, ErrorMapping.NO_ERROR, 0, msgs) + resps.append(resp) + return resps + + def send_offset_commit_request(self, group, reqs): + resps = [] + for req in reqs: + self.mock_committed_offsets[(req.topic, req.partition)] = req.offset + resp = OffsetCommitResponse(req.topic, req.partition, ErrorMapping.NO_ERROR) + resps.append(resp) + return resps + + def _load_metadata_for_topics(self, topic): + pass + +class TestConsumer(unittest.TestCase): + def test_offsets(self): + client = MockClient() + consumer = SimpleConsumer(client, "group", "topic", auto_commit=False) + it = iter(consumer) + m = it.next() + self.assertEquals(m.offset, 0) + self.assertEquals(consumer.offsets[0], 0) + m = it.next() + self.assertEquals(m.offset, 1) + self.assertEquals(consumer.offsets[0], 1) + consumer.commit() + + print(client.mock_committed_offsets) + +if __name__ == '__main__': + unittest.main() |