summaryrefslogtreecommitdiff
path: root/test/unit2.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-06-01 00:08:16 -0400
committerDavid Arthur <mumrah@gmail.com>2013-06-01 00:10:47 -0400
commitacd388e4eab100b5cf23481780f0d836ca0c21fd (patch)
treedd21e726de91356689c6063057199418b7b16f2c /test/unit2.py
parent2c257eeb1f02748840a8f4535d8d2a88ef5235f2 (diff)
downloadkafka-python-issue-26.tar.gz
Starting work on fixing offset commitsissue-26
* Update the "public" offset before yielding the message * Add an option to SimpleConsumer.commit that excludes the current offset Ref #26
Diffstat (limited to 'test/unit2.py')
-rw-r--r--test/unit2.py56
1 files changed, 56 insertions, 0 deletions
diff --git a/test/unit2.py b/test/unit2.py
new file mode 100644
index 0000000..562bb76
--- /dev/null
+++ b/test/unit2.py
@@ -0,0 +1,56 @@
+import os
+import random
+import struct
+import unittest
+
+from kafka.consumer import SimpleConsumer
+from kafka.client import KafkaClient
+from kafka.common import (
+ ErrorMapping, OffsetAndMessage, OffsetCommitResponse, Message, FetchResponse
+)
+
+class MockClient(object):
+ def __init__(self):
+ self.topic_partitions = {"topic": [0]}
+ self.mock_committed_offsets = {}
+
+ def send_fetch_request(self, reqs):
+ resps = []
+ for req in reqs:
+ msgs = [OffsetAndMessage(0, Message(0, 0, "key", "value1")),
+ OffsetAndMessage(1, Message(0, 0, "key", "value2")),
+ OffsetAndMessage(2, Message(0, 0, "key", "value3")),
+ OffsetAndMessage(3, Message(0, 0, "key", "value4")),
+ OffsetAndMessage(4, Message(0, 0, "key", "value5"))]
+ resp = FetchResponse(req.topic, req.partition, ErrorMapping.NO_ERROR, 0, msgs)
+ resps.append(resp)
+ return resps
+
+ def send_offset_commit_request(self, group, reqs):
+ resps = []
+ for req in reqs:
+ self.mock_committed_offsets[(req.topic, req.partition)] = req.offset
+ resp = OffsetCommitResponse(req.topic, req.partition, ErrorMapping.NO_ERROR)
+ resps.append(resp)
+ return resps
+
+ def _load_metadata_for_topics(self, topic):
+ pass
+
+class TestConsumer(unittest.TestCase):
+ def test_offsets(self):
+ client = MockClient()
+ consumer = SimpleConsumer(client, "group", "topic", auto_commit=False)
+ it = iter(consumer)
+ m = it.next()
+ self.assertEquals(m.offset, 0)
+ self.assertEquals(consumer.offsets[0], 0)
+ m = it.next()
+ self.assertEquals(m.offset, 1)
+ self.assertEquals(consumer.offsets[0], 1)
+ consumer.commit()
+
+ print(client.mock_committed_offsets)
+
+if __name__ == '__main__':
+ unittest.main()