summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/transaction.py169
-rw-r--r--test/test_transaction.py117
2 files changed, 286 insertions, 0 deletions
diff --git a/kafka/transaction.py b/kafka/transaction.py
new file mode 100644
index 0000000..10c2ebd
--- /dev/null
+++ b/kafka/transaction.py
@@ -0,0 +1,169 @@
+"""
+Transactional commit and rollback semantics for consumer.
+"""
+from logging import getLogger
+
+from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError
+
+
+class KafkaTransaction(object):
+ """
+ Provides transactional commit/rollback semantics around a `SimpleConsumer`.
+
+ Usage assumes that `auto_commit` is disabled, that messages are consumed in
+ batches, and that the consuming process will record its own successful
+ processing of each message. Both the commit and rollback operations respect
+ a "high-water mark" to ensure that last unsuccessfully processed message
+ will be retried.
+
+ Example:
+
+ consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+ consumer.provide_partition_info()
+
+ while some_condition:
+ with KafkaTransaction(consumer) as transaction:
+ messages = consumer.get_messages(count, block=False)
+
+ for partition, message in messages:
+ if can_process(message.value):
+ transaction.mark(partition, message.offset)
+ else:
+ break
+
+ if not transaction:
+ sleep(delay)
+
+
+ These semantics allow for deferred message processing (e.g. if `can_process`
+ compares message time to clock time) and for repeated processing of the last
+ unsuccessful message (until some external error is resolved).
+ """
+
+ def __init__(self, consumer):
+ """
+ :param consumer: an instance of `SimpleConsumer`
+ """
+ self.consumer = consumer
+ self.initial_offsets = None
+ self.high_water_mark = None
+ self.logger = getLogger("kafka.transaction")
+
+ def mark(self, partition, offset):
+ """
+ Set the high-water mark in the current transaction.
+
+ In order to know the current partition, it is helpful to initialize
+ the consumer to provide partition info via:
+
+ consumer.provide_partition_info()
+ """
+ max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
+
+ self.logger.debug("Setting high-water mark to: %s",
+ {partition: max_offset})
+
+ self.high_water_mark[partition] = max_offset
+
+ def __nonzero__(self):
+ """
+ Return whether any operations were marked in the transaction.
+ """
+ return bool(self.high_water_mark)
+
+ def __enter__(self):
+ """
+ Start a new transaction:
+
+ - Record the initial offsets for rollback
+ - Reset the high-water mark
+ """
+ self.initial_offsets = dict(self.consumer.offsets)
+ self.high_water_mark = dict()
+
+ self.logger.debug("Starting transaction at: %s", self.initial_offsets)
+
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """
+ End a transaction.
+
+ - If there was no exception, commit up to the current high-water mark.
+ - If there was an offset of range error, attempt to find the correct
+ initial offset.
+ - If there was any other error, roll back to the initial offsets.
+ """
+ if exc_type is None:
+ self.commit()
+ elif isinstance(exc_value, OffsetOutOfRangeError):
+ self.handle_out_of_range()
+ return True
+ else:
+ self.rollback()
+
+ def commit(self):
+ """
+ Commit this transaction:
+
+ - If the high-water mark has moved, commit up to and position the
+ consumer at the high-water mark.
+ - Otherwise, reset to the consumer to the initial offsets.
+ """
+ if self.high_water_mark:
+ self.logger.info("Committing transaction: %s", self.high_water_mark)
+ self.commit_partition_offsets(self.high_water_mark)
+ self.update_consumer_offsets(self.high_water_mark)
+ else:
+ self.update_consumer_offsets(self.initial_offsets)
+
+ def rollback(self):
+ """
+ Rollback this transaction:
+
+ - Position the consumer at the initial offsets.
+ """
+ self.logger.info("Rolling back transaction: %s", self.initial_offsets)
+ self.update_consumer_offsets(self.initial_offsets)
+
+ def commit_partition_offsets(self, partition_offsets):
+ """
+ Commit explicit partition/offset pairs.
+ """
+ self.logger.debug("Committing partition offsets: %s", partition_offsets)
+
+ commit_requests = [
+ OffsetCommitRequest(self.consumer.topic, partition, offset, None)
+ for partition, offset in partition_offsets.items()
+ ]
+ commit_responses = self.consumer.client.send_offset_commit_request(
+ self.consumer.group,
+ commit_requests,
+ )
+ for commit_response in commit_responses:
+ check_error(commit_response)
+
+ def update_consumer_offsets(self, partition_offsets):
+ """
+ Update consumer offsets to explicit positions.
+ """
+ self.logger.debug("Updating consumer offsets to: %s", partition_offsets)
+
+ for partition, offset in partition_offsets.items():
+ self.consumer.offsets[partition] = offset
+
+ # consumer keeps other offset states beyond its `offsets` dictionary,
+ # a relative seek with zero delta forces the consumer to reset to the
+ # current value of the `offsets` dictionary
+ self.consumer.seek(0, 1)
+
+ def handle_out_of_range(self):
+ """
+ Handle out of range condition by seeking to the beginning of valid
+ ranges.
+
+ This assumes that an out of range doesn't happen by seeking past the end
+ of valid ranges -- which is far less likely.
+ """
+ self.logger.info("Seeking beginning of partition on out of range error")
+ self.consumer.seek(0, 0)
diff --git a/test/test_transaction.py b/test/test_transaction.py
new file mode 100644
index 0000000..e3d13c8
--- /dev/null
+++ b/test/test_transaction.py
@@ -0,0 +1,117 @@
+"""
+KafkaTransaction tests.
+"""
+from unittest2 import TestCase
+
+from mock import MagicMock, patch
+
+from kafka.common import OffsetOutOfRangeError
+from kafka.transaction import KafkaTransaction
+
+
+class TestKafkaTransaction(TestCase):
+ """
+ KafkaTransaction tests.
+ """
+
+ def setUp(self):
+ self.client = MagicMock()
+ self.consumer = MagicMock()
+ self.topic = "topic"
+ self.group = "group"
+ self.partition = 0
+ self.consumer.topic = self.topic
+ self.consumer.group = self.group
+ self.consumer.client = self.client
+ self.consumer.offsets = {self.partition: 0}
+ self.transaction = KafkaTransaction(self.consumer)
+
+ def test_noop(self):
+ """
+ Should revert consumer after transaction with no mark() call.
+ """
+ with self.transaction:
+ # advance offset
+ self.consumer.offsets = {self.partition: 1}
+
+ # offset restored
+ self.assertEqual(self.consumer.offsets, {self.partition: 0})
+ # and seek called with relative zero delta
+ self.assertEqual(self.consumer.seek.call_count, 1)
+ self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+ def test_mark(self):
+ """
+ Should remain at marked location.
+ """
+ with self.transaction as transaction:
+ transaction.mark(self.partition, 0)
+ # advance offset
+ self.consumer.offsets = {self.partition: 1}
+
+ # offset sent to client
+ self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
+
+ # offset remains advanced
+ self.assertEqual(self.consumer.offsets, {self.partition: 1})
+
+ # and seek called with relative zero delta
+ self.assertEqual(self.consumer.seek.call_count, 1)
+ self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+ def test_mark_multiple(self):
+ """
+ Should remain at highest marked location.
+ """
+ with self.transaction as transaction:
+ transaction.mark(self.partition, 0)
+ transaction.mark(self.partition, 1)
+ transaction.mark(self.partition, 2)
+ # advance offset
+ self.consumer.offsets = {self.partition: 3}
+
+ # offset sent to client
+ self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
+
+ # offset remains advanced
+ self.assertEqual(self.consumer.offsets, {self.partition: 3})
+
+ # and seek called with relative zero delta
+ self.assertEqual(self.consumer.seek.call_count, 1)
+ self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+ def test_rollback(self):
+ """
+ Should rollback to beginning of transaction.
+ """
+ with self.assertRaises(Exception):
+ with self.transaction as transaction:
+ transaction.mark(self.partition, 0)
+ # advance offset
+ self.consumer.offsets = {self.partition: 1}
+
+ raise Exception("Intentional failure")
+
+ # offset rolled back (ignoring mark)
+ self.assertEqual(self.consumer.offsets, {self.partition: 0})
+
+ # and seek called with relative zero delta
+ self.assertEqual(self.consumer.seek.call_count, 1)
+ self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+ def test_out_of_range(self):
+ """
+ Should remain at beginning of range.
+ """
+ def _seek(offset, whence):
+ # seek must be called with 0, 0 to find the beginning of the range
+ self.assertEqual(offset, 0)
+ self.assertEqual(whence, 0)
+ # set offsets to something different
+ self.consumer.offsets = {self.partition: 100}
+
+ with patch.object(self.consumer, "seek", _seek):
+ with self.transaction:
+ raise OffsetOutOfRangeError()
+
+ self.assertEqual(self.consumer.offsets, {self.partition: 100})