summaryrefslogtreecommitdiff
path: root/test/test_context.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_context.py')
-rw-r--r--test/test_context.py117
1 files changed, 117 insertions, 0 deletions
diff --git a/test/test_context.py b/test/test_context.py
new file mode 100644
index 0000000..8c3fd85
--- /dev/null
+++ b/test/test_context.py
@@ -0,0 +1,117 @@
+"""
+OffsetCommitContext tests.
+"""
+from unittest2 import TestCase
+
+from mock import MagicMock, patch
+
+from kafka.common import OffsetOutOfRangeError
+from kafka.context import OffsetCommitContext
+
+
+class TestOffsetCommitContext(TestCase):
+ """
+ OffsetCommitContext tests.
+ """
+
+ def setUp(self):
+ self.client = MagicMock()
+ self.consumer = MagicMock()
+ self.topic = "topic"
+ self.group = "group"
+ self.partition = 0
+ self.consumer.topic = self.topic
+ self.consumer.group = self.group
+ self.consumer.client = self.client
+ self.consumer.offsets = {self.partition: 0}
+ self.context = OffsetCommitContext(self.consumer)
+
+ def test_noop(self):
+ """
+ Should revert consumer after context exit with no mark() call.
+ """
+ with self.context:
+ # advance offset
+ self.consumer.offsets = {self.partition: 1}
+
+ # offset restored
+ self.assertEqual(self.consumer.offsets, {self.partition: 0})
+ # and seek called with relative zero delta
+ self.assertEqual(self.consumer.seek.call_count, 1)
+ self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+ def test_mark(self):
+ """
+ Should remain at marked location ater context exit.
+ """
+ with self.context as context:
+ context.mark(self.partition, 0)
+ # advance offset
+ self.consumer.offsets = {self.partition: 1}
+
+ # offset sent to client
+ self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
+
+ # offset remains advanced
+ self.assertEqual(self.consumer.offsets, {self.partition: 1})
+
+ # and seek called with relative zero delta
+ self.assertEqual(self.consumer.seek.call_count, 1)
+ self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+ def test_mark_multiple(self):
+ """
+ Should remain at highest marked location after context exit.
+ """
+ with self.context as context:
+ context.mark(self.partition, 0)
+ context.mark(self.partition, 1)
+ context.mark(self.partition, 2)
+ # advance offset
+ self.consumer.offsets = {self.partition: 3}
+
+ # offset sent to client
+ self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
+
+ # offset remains advanced
+ self.assertEqual(self.consumer.offsets, {self.partition: 3})
+
+ # and seek called with relative zero delta
+ self.assertEqual(self.consumer.seek.call_count, 1)
+ self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+ def test_rollback(self):
+ """
+ Should rollback to initial offsets on context exit with exception.
+ """
+ with self.assertRaises(Exception):
+ with self.context as context:
+ context.mark(self.partition, 0)
+ # advance offset
+ self.consumer.offsets = {self.partition: 1}
+
+ raise Exception("Intentional failure")
+
+ # offset rolled back (ignoring mark)
+ self.assertEqual(self.consumer.offsets, {self.partition: 0})
+
+ # and seek called with relative zero delta
+ self.assertEqual(self.consumer.seek.call_count, 1)
+ self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+ def test_out_of_range(self):
+ """
+ Should reset to beginning of valid offsets on `OffsetOutOfRangeError`
+ """
+ def _seek(offset, whence):
+ # seek must be called with 0, 0 to find the beginning of the range
+ self.assertEqual(offset, 0)
+ self.assertEqual(whence, 0)
+ # set offsets to something different
+ self.consumer.offsets = {self.partition: 100}
+
+ with patch.object(self.consumer, "seek", _seek):
+ with self.context:
+ raise OffsetOutOfRangeError()
+
+ self.assertEqual(self.consumer.offsets, {self.partition: 100})