diff options
Diffstat (limited to 'test/test_context.py')
-rw-r--r-- | test/test_context.py | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/test/test_context.py b/test/test_context.py new file mode 100644 index 0000000..8c3fd85 --- /dev/null +++ b/test/test_context.py @@ -0,0 +1,117 @@ +""" +OffsetCommitContext tests. +""" +from unittest2 import TestCase + +from mock import MagicMock, patch + +from kafka.common import OffsetOutOfRangeError +from kafka.context import OffsetCommitContext + + +class TestOffsetCommitContext(TestCase): + """ + OffsetCommitContext tests. + """ + + def setUp(self): + self.client = MagicMock() + self.consumer = MagicMock() + self.topic = "topic" + self.group = "group" + self.partition = 0 + self.consumer.topic = self.topic + self.consumer.group = self.group + self.consumer.client = self.client + self.consumer.offsets = {self.partition: 0} + self.context = OffsetCommitContext(self.consumer) + + def test_noop(self): + """ + Should revert consumer after context exit with no mark() call. + """ + with self.context: + # advance offset + self.consumer.offsets = {self.partition: 1} + + # offset restored + self.assertEqual(self.consumer.offsets, {self.partition: 0}) + # and seek called with relative zero delta + self.assertEqual(self.consumer.seek.call_count, 1) + self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) + + def test_mark(self): + """ + Should remain at marked location ater context exit. + """ + with self.context as context: + context.mark(self.partition, 0) + # advance offset + self.consumer.offsets = {self.partition: 1} + + # offset sent to client + self.assertEqual(self.client.send_offset_commit_request.call_count, 1) + + # offset remains advanced + self.assertEqual(self.consumer.offsets, {self.partition: 1}) + + # and seek called with relative zero delta + self.assertEqual(self.consumer.seek.call_count, 1) + self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) + + def test_mark_multiple(self): + """ + Should remain at highest marked location after context exit. + """ + with self.context as context: + context.mark(self.partition, 0) + context.mark(self.partition, 1) + context.mark(self.partition, 2) + # advance offset + self.consumer.offsets = {self.partition: 3} + + # offset sent to client + self.assertEqual(self.client.send_offset_commit_request.call_count, 1) + + # offset remains advanced + self.assertEqual(self.consumer.offsets, {self.partition: 3}) + + # and seek called with relative zero delta + self.assertEqual(self.consumer.seek.call_count, 1) + self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) + + def test_rollback(self): + """ + Should rollback to initial offsets on context exit with exception. + """ + with self.assertRaises(Exception): + with self.context as context: + context.mark(self.partition, 0) + # advance offset + self.consumer.offsets = {self.partition: 1} + + raise Exception("Intentional failure") + + # offset rolled back (ignoring mark) + self.assertEqual(self.consumer.offsets, {self.partition: 0}) + + # and seek called with relative zero delta + self.assertEqual(self.consumer.seek.call_count, 1) + self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) + + def test_out_of_range(self): + """ + Should reset to beginning of valid offsets on `OffsetOutOfRangeError` + """ + def _seek(offset, whence): + # seek must be called with 0, 0 to find the beginning of the range + self.assertEqual(offset, 0) + self.assertEqual(whence, 0) + # set offsets to something different + self.consumer.offsets = {self.partition: 100} + + with patch.object(self.consumer, "seek", _seek): + with self.context: + raise OffsetOutOfRangeError() + + self.assertEqual(self.consumer.offsets, {self.partition: 100}) |