diff options
Diffstat (limited to 'test/test_transaction.py')
-rw-r--r-- | test/test_transaction.py | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/test/test_transaction.py b/test/test_transaction.py new file mode 100644 index 0000000..e3d13c8 --- /dev/null +++ b/test/test_transaction.py @@ -0,0 +1,117 @@ +""" +KafkaTransaction tests. +""" +from unittest2 import TestCase + +from mock import MagicMock, patch + +from kafka.common import OffsetOutOfRangeError +from kafka.transaction import KafkaTransaction + + +class TestKafkaTransaction(TestCase): + """ + KafkaTransaction tests. + """ + + def setUp(self): + self.client = MagicMock() + self.consumer = MagicMock() + self.topic = "topic" + self.group = "group" + self.partition = 0 + self.consumer.topic = self.topic + self.consumer.group = self.group + self.consumer.client = self.client + self.consumer.offsets = {self.partition: 0} + self.transaction = KafkaTransaction(self.consumer) + + def test_noop(self): + """ + Should revert consumer after transaction with no mark() call. + """ + with self.transaction: + # advance offset + self.consumer.offsets = {self.partition: 1} + + # offset restored + self.assertEqual(self.consumer.offsets, {self.partition: 0}) + # and seek called with relative zero delta + self.assertEqual(self.consumer.seek.call_count, 1) + self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) + + def test_mark(self): + """ + Should remain at marked location. + """ + with self.transaction as transaction: + transaction.mark(self.partition, 0) + # advance offset + self.consumer.offsets = {self.partition: 1} + + # offset sent to client + self.assertEqual(self.client.send_offset_commit_request.call_count, 1) + + # offset remains advanced + self.assertEqual(self.consumer.offsets, {self.partition: 1}) + + # and seek called with relative zero delta + self.assertEqual(self.consumer.seek.call_count, 1) + self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) + + def test_mark_multiple(self): + """ + Should remain at highest marked location. + """ + with self.transaction as transaction: + transaction.mark(self.partition, 0) + transaction.mark(self.partition, 1) + transaction.mark(self.partition, 2) + # advance offset + self.consumer.offsets = {self.partition: 3} + + # offset sent to client + self.assertEqual(self.client.send_offset_commit_request.call_count, 1) + + # offset remains advanced + self.assertEqual(self.consumer.offsets, {self.partition: 3}) + + # and seek called with relative zero delta + self.assertEqual(self.consumer.seek.call_count, 1) + self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) + + def test_rollback(self): + """ + Should rollback to beginning of transaction. + """ + with self.assertRaises(Exception): + with self.transaction as transaction: + transaction.mark(self.partition, 0) + # advance offset + self.consumer.offsets = {self.partition: 1} + + raise Exception("Intentional failure") + + # offset rolled back (ignoring mark) + self.assertEqual(self.consumer.offsets, {self.partition: 0}) + + # and seek called with relative zero delta + self.assertEqual(self.consumer.seek.call_count, 1) + self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) + + def test_out_of_range(self): + """ + Should remain at beginning of range. + """ + def _seek(offset, whence): + # seek must be called with 0, 0 to find the beginning of the range + self.assertEqual(offset, 0) + self.assertEqual(whence, 0) + # set offsets to something different + self.consumer.offsets = {self.partition: 100} + + with patch.object(self.consumer, "seek", _seek): + with self.transaction: + raise OffsetOutOfRangeError() + + self.assertEqual(self.consumer.offsets, {self.partition: 100}) |