From 63e08e643b5d02aacbccab21b72b34ff1798a23a Mon Sep 17 00:00:00 2001 From: Jesse Myers Date: Sat, 30 Aug 2014 20:25:28 -0700 Subject: Add transaction context manager. --- kafka/transaction.py | 169 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 kafka/transaction.py (limited to 'kafka') diff --git a/kafka/transaction.py b/kafka/transaction.py new file mode 100644 index 0000000..10c2ebd --- /dev/null +++ b/kafka/transaction.py @@ -0,0 +1,169 @@ +""" +Transactional commit and rollback semantics for consumer. +""" +from logging import getLogger + +from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError + + +class KafkaTransaction(object): + """ + Provides transactional commit/rollback semantics around a `SimpleConsumer`. + + Usage assumes that `auto_commit` is disabled, that messages are consumed in + batches, and that the consuming process will record its own successful + processing of each message. Both the commit and rollback operations respect + a "high-water mark" to ensure that last unsuccessfully processed message + will be retried. + + Example: + + consumer = SimpleConsumer(client, group, topic, auto_commit=False) + consumer.provide_partition_info() + + while some_condition: + with KafkaTransaction(consumer) as transaction: + messages = consumer.get_messages(count, block=False) + + for partition, message in messages: + if can_process(message.value): + transaction.mark(partition, message.offset) + else: + break + + if not transaction: + sleep(delay) + + + These semantics allow for deferred message processing (e.g. if `can_process` + compares message time to clock time) and for repeated processing of the last + unsuccessful message (until some external error is resolved). 
+ """ + + def __init__(self, consumer): + """ + :param consumer: an instance of `SimpleConsumer` + """ + self.consumer = consumer + self.initial_offsets = None + self.high_water_mark = None + self.logger = getLogger("kafka.transaction") + + def mark(self, partition, offset): + """ + Set the high-water mark in the current transaction. + + In order to know the current partition, it is helpful to initialize + the consumer to provide partition info via: + + consumer.provide_partition_info() + """ + max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) + + self.logger.debug("Setting high-water mark to: %s", + {partition: max_offset}) + + self.high_water_mark[partition] = max_offset + + def __nonzero__(self): + """ + Return whether any operations were marked in the transaction. + """ + return bool(self.high_water_mark) + + def __enter__(self): + """ + Start a new transaction: + + - Record the initial offsets for rollback + - Reset the high-water mark + """ + self.initial_offsets = dict(self.consumer.offsets) + self.high_water_mark = dict() + + self.logger.debug("Starting transaction at: %s", self.initial_offsets) + + return self + + def __exit__(self, exc_type, exc_value, traceback): + """ + End a transaction. + + - If there was no exception, commit up to the current high-water mark. + - If there was an offset of range error, attempt to find the correct + initial offset. + - If there was any other error, roll back to the initial offsets. + """ + if exc_type is None: + self.commit() + elif isinstance(exc_value, OffsetOutOfRangeError): + self.handle_out_of_range() + return True + else: + self.rollback() + + def commit(self): + """ + Commit this transaction: + + - If the high-water mark has moved, commit up to and position the + consumer at the high-water mark. + - Otherwise, reset to the consumer to the initial offsets. 
+ """ + if self.high_water_mark: + self.logger.info("Committing transaction: %s", self.high_water_mark) + self.commit_partition_offsets(self.high_water_mark) + self.update_consumer_offsets(self.high_water_mark) + else: + self.update_consumer_offsets(self.initial_offsets) + + def rollback(self): + """ + Rollback this transaction: + + - Position the consumer at the initial offsets. + """ + self.logger.info("Rolling back transaction: %s", self.initial_offsets) + self.update_consumer_offsets(self.initial_offsets) + + def commit_partition_offsets(self, partition_offsets): + """ + Commit explicit partition/offset pairs. + """ + self.logger.debug("Committing partition offsets: %s", partition_offsets) + + commit_requests = [ + OffsetCommitRequest(self.consumer.topic, partition, offset, None) + for partition, offset in partition_offsets.items() + ] + commit_responses = self.consumer.client.send_offset_commit_request( + self.consumer.group, + commit_requests, + ) + for commit_response in commit_responses: + check_error(commit_response) + + def update_consumer_offsets(self, partition_offsets): + """ + Update consumer offsets to explicit positions. + """ + self.logger.debug("Updating consumer offsets to: %s", partition_offsets) + + for partition, offset in partition_offsets.items(): + self.consumer.offsets[partition] = offset + + # consumer keeps other offset states beyond its `offsets` dictionary, + # a relative seek with zero delta forces the consumer to reset to the + # current value of the `offsets` dictionary + self.consumer.seek(0, 1) + + def handle_out_of_range(self): + """ + Handle out of range condition by seeking to the beginning of valid + ranges. + + This assumes that an out of range doesn't happen by seeking past the end + of valid ranges -- which is far less likely. 
+ """ + self.logger.info("Seeking beginning of partition on out of range error") + self.consumer.seek(0, 0) -- cgit v1.2.1 From d65bc5b70dfa381f650dc4c1e136680b8f6c9649 Mon Sep 17 00:00:00 2001 From: Jesse Myers Date: Sat, 30 Aug 2014 20:43:18 -0700 Subject: Improve documentation in example --- kafka/transaction.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'kafka') diff --git a/kafka/transaction.py b/kafka/transaction.py index 10c2ebd..0dfe9d4 100644 --- a/kafka/transaction.py +++ b/kafka/transaction.py @@ -20,13 +20,14 @@ class KafkaTransaction(object): consumer = SimpleConsumer(client, group, topic, auto_commit=False) consumer.provide_partition_info() + consumer.fetch_last_known_offsets() while some_condition: with KafkaTransaction(consumer) as transaction: messages = consumer.get_messages(count, block=False) for partition, message in messages: - if can_process(message.value): + if can_process(message): transaction.mark(partition, message.offset) else: break -- cgit v1.2.1 From 20db3b5f7684df1a63fe5eb7def0478ffeff5d3d Mon Sep 17 00:00:00 2001 From: Jesse Myers Date: Wed, 3 Sep 2014 08:06:56 -0700 Subject: Rename KafkaTransaction to OffsetCommitContext for clarity. --- kafka/context.py | 170 +++++++++++++++++++++++++++++++++++++++++++++++++++ kafka/transaction.py | 170 --------------------------------------------------- 2 files changed, 170 insertions(+), 170 deletions(-) create mode 100644 kafka/context.py delete mode 100644 kafka/transaction.py (limited to 'kafka') diff --git a/kafka/context.py b/kafka/context.py new file mode 100644 index 0000000..98ed7b3 --- /dev/null +++ b/kafka/context.py @@ -0,0 +1,170 @@ +""" +Context manager to commit/rollback consumer offsets. +""" +from logging import getLogger + +from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError + + +class OffsetCommitContext(object): + """ + Provides commit/rollback semantics around a `SimpleConsumer`. 
+ + Usage assumes that `auto_commit` is disabled, that messages are consumed in + batches, and that the consuming process will record its own successful + processing of each message. Both the commit and rollback operations respect + a "high-water mark" to ensure that the last unsuccessfully processed message + will be retried. + + Example: + + consumer = SimpleConsumer(client, group, topic, auto_commit=False) + consumer.provide_partition_info() + consumer.fetch_last_known_offsets() + + while some_condition: + with OffsetCommitContext(consumer) as context: + messages = consumer.get_messages(count, block=False) + + for partition, message in messages: + if can_process(message): + context.mark(partition, message.offset) + else: + break + + if not context: + sleep(delay) + + + These semantics allow for deferred message processing (e.g. if `can_process` + compares message time to clock time) and for repeated processing of the last + unsuccessful message (until some external error is resolved). + """ + + def __init__(self, consumer): + """ + :param consumer: an instance of `SimpleConsumer` + """ + self.consumer = consumer + self.initial_offsets = None + self.high_water_mark = None + self.logger = getLogger("kafka.context") + + def mark(self, partition, offset): + """ + Set the high-water mark in the current context. + + In order to know the current partition, it is helpful to initialize + the consumer to provide partition info via: + + consumer.provide_partition_info() + """ + max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) + + self.logger.debug("Setting high-water mark to: %s", + {partition: max_offset}) + + self.high_water_mark[partition] = max_offset + + def __nonzero__(self): + """ + Return whether any operations were marked in the context. 
+ """ + return bool(self.high_water_mark) + + def __enter__(self): + """ + Start a new context: + + - Record the initial offsets for rollback + - Reset the high-water mark + """ + self.initial_offsets = dict(self.consumer.offsets) + self.high_water_mark = dict() + + self.logger.debug("Starting context at: %s", self.initial_offsets) + + return self + + def __exit__(self, exc_type, exc_value, traceback): + """ + End a context. + + - If there was no exception, commit up to the current high-water mark. + - If there was an offset out of range error, attempt to find the correct + initial offset. + - If there was any other error, roll back to the initial offsets. + """ + if exc_type is None: + self.commit() + elif isinstance(exc_value, OffsetOutOfRangeError): + self.handle_out_of_range() + return True + else: + self.rollback() + + def commit(self): + """ + Commit this context's offsets: + + - If the high-water mark has moved, commit up to and position the + consumer at the high-water mark. + - Otherwise, reset the consumer to the initial offsets. + """ + if self.high_water_mark: + self.logger.info("Committing offsets: %s", self.high_water_mark) + self.commit_partition_offsets(self.high_water_mark) + self.update_consumer_offsets(self.high_water_mark) + else: + self.update_consumer_offsets(self.initial_offsets) + + def rollback(self): + """ + Rollback this context: + + - Position the consumer at the initial offsets. + """ + self.logger.info("Rolling back context: %s", self.initial_offsets) + self.update_consumer_offsets(self.initial_offsets) + + def commit_partition_offsets(self, partition_offsets): + """ + Commit explicit partition/offset pairs. 
+ """ + self.logger.debug("Committing partition offsets: %s", partition_offsets) + + commit_requests = [ + OffsetCommitRequest(self.consumer.topic, partition, offset, None) + for partition, offset in partition_offsets.items() + ] + commit_responses = self.consumer.client.send_offset_commit_request( + self.consumer.group, + commit_requests, + ) + for commit_response in commit_responses: + check_error(commit_response) + + def update_consumer_offsets(self, partition_offsets): + """ + Update consumer offsets to explicit positions. + """ + self.logger.debug("Updating consumer offsets to: %s", partition_offsets) + + for partition, offset in partition_offsets.items(): + self.consumer.offsets[partition] = offset + + # consumer keeps other offset states beyond its `offsets` dictionary, + # a relative seek with zero delta forces the consumer to reset to the + # current value of the `offsets` dictionary + self.consumer.seek(0, 1) + + def handle_out_of_range(self): + """ + Handle out of range condition by seeking to the beginning of valid + ranges. + + This assumes that an out of range doesn't happen by seeking past the end + of valid ranges -- which is far less likely. + """ + self.logger.info("Seeking beginning of partition on out of range error") + self.consumer.seek(0, 0) diff --git a/kafka/transaction.py b/kafka/transaction.py deleted file mode 100644 index 0dfe9d4..0000000 --- a/kafka/transaction.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -Transactional commit and rollback semantics for consumer. -""" -from logging import getLogger - -from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError - - -class KafkaTransaction(object): - """ - Provides transactional commit/rollback semantics around a `SimpleConsumer`. - - Usage assumes that `auto_commit` is disabled, that messages are consumed in - batches, and that the consuming process will record its own successful - processing of each message. 
Both the commit and rollback operations respect - a "high-water mark" to ensure that last unsuccessfully processed message - will be retried. - - Example: - - consumer = SimpleConsumer(client, group, topic, auto_commit=False) - consumer.provide_partition_info() - consumer.fetch_last_known_offsets() - - while some_condition: - with KafkaTransaction(consumer) as transaction: - messages = consumer.get_messages(count, block=False) - - for partition, message in messages: - if can_process(message): - transaction.mark(partition, message.offset) - else: - break - - if not transaction: - sleep(delay) - - - These semantics allow for deferred message processing (e.g. if `can_process` - compares message time to clock time) and for repeated processing of the last - unsuccessful message (until some external error is resolved). - """ - - def __init__(self, consumer): - """ - :param consumer: an instance of `SimpleConsumer` - """ - self.consumer = consumer - self.initial_offsets = None - self.high_water_mark = None - self.logger = getLogger("kafka.transaction") - - def mark(self, partition, offset): - """ - Set the high-water mark in the current transaction. - - In order to know the current partition, it is helpful to initialize - the consumer to provide partition info via: - - consumer.provide_partition_info() - """ - max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) - - self.logger.debug("Setting high-water mark to: %s", - {partition: max_offset}) - - self.high_water_mark[partition] = max_offset - - def __nonzero__(self): - """ - Return whether any operations were marked in the transaction. 
- """ - return bool(self.high_water_mark) - - def __enter__(self): - """ - Start a new transaction: - - - Record the initial offsets for rollback - - Reset the high-water mark - """ - self.initial_offsets = dict(self.consumer.offsets) - self.high_water_mark = dict() - - self.logger.debug("Starting transaction at: %s", self.initial_offsets) - - return self - - def __exit__(self, exc_type, exc_value, traceback): - """ - End a transaction. - - - If there was no exception, commit up to the current high-water mark. - - If there was an offset of range error, attempt to find the correct - initial offset. - - If there was any other error, roll back to the initial offsets. - """ - if exc_type is None: - self.commit() - elif isinstance(exc_value, OffsetOutOfRangeError): - self.handle_out_of_range() - return True - else: - self.rollback() - - def commit(self): - """ - Commit this transaction: - - - If the high-water mark has moved, commit up to and position the - consumer at the high-water mark. - - Otherwise, reset to the consumer to the initial offsets. - """ - if self.high_water_mark: - self.logger.info("Committing transaction: %s", self.high_water_mark) - self.commit_partition_offsets(self.high_water_mark) - self.update_consumer_offsets(self.high_water_mark) - else: - self.update_consumer_offsets(self.initial_offsets) - - def rollback(self): - """ - Rollback this transaction: - - - Position the consumer at the initial offsets. - """ - self.logger.info("Rolling back transaction: %s", self.initial_offsets) - self.update_consumer_offsets(self.initial_offsets) - - def commit_partition_offsets(self, partition_offsets): - """ - Commit explicit partition/offset pairs. 
- """ - self.logger.debug("Committing partition offsets: %s", partition_offsets) - - commit_requests = [ - OffsetCommitRequest(self.consumer.topic, partition, offset, None) - for partition, offset in partition_offsets.items() - ] - commit_responses = self.consumer.client.send_offset_commit_request( - self.consumer.group, - commit_requests, - ) - for commit_response in commit_responses: - check_error(commit_response) - - def update_consumer_offsets(self, partition_offsets): - """ - Update consumer offsets to explicit positions. - """ - self.logger.debug("Updating consumer offsets to: %s", partition_offsets) - - for partition, offset in partition_offsets.items(): - self.consumer.offsets[partition] = offset - - # consumer keeps other offset states beyond its `offsets` dictionary, - # a relative seek with zero delta forces the consumer to reset to the - # current value of the `offsets` dictionary - self.consumer.seek(0, 1) - - def handle_out_of_range(self): - """ - Handle out of range condition by seeking to the beginning of valid - ranges. - - This assumes that an out of range doesn't happen by seeking past the end - of valid ranges -- which is far less likely. - """ - self.logger.info("Seeking beginning of partition on out of range error") - self.consumer.seek(0, 0) -- cgit v1.2.1