summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/context.py (renamed from kafka/transaction.py)32
1 files changed, 16 insertions, 16 deletions
diff --git a/kafka/transaction.py b/kafka/context.py
index 0dfe9d4..98ed7b3 100644
--- a/kafka/transaction.py
+++ b/kafka/context.py
@@ -1,14 +1,14 @@
"""
-Transactional commit and rollback semantics for consumer.
+Context manager to commit/rollback consumer offsets.
"""
from logging import getLogger
from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError
-class KafkaTransaction(object):
+class OffsetCommitContext(object):
"""
- Provides transactional commit/rollback semantics around a `SimpleConsumer`.
+ Provides commit/rollback semantics around a `SimpleConsumer`.
Usage assumes that `auto_commit` is disabled, that messages are consumed in
batches, and that the consuming process will record its own successful
@@ -23,16 +23,16 @@ class KafkaTransaction(object):
consumer.fetch_last_known_offsets()
while some_condition:
- with KafkaTransaction(consumer) as transaction:
+ with OffsetCommitContext(consumer) as context:
messages = consumer.get_messages(count, block=False)
for partition, message in messages:
if can_process(message):
- transaction.mark(partition, message.offset)
+ context.mark(partition, message.offset)
else:
break
- if not transaction:
+ if not context:
sleep(delay)
@@ -48,11 +48,11 @@ class KafkaTransaction(object):
self.consumer = consumer
self.initial_offsets = None
self.high_water_mark = None
- self.logger = getLogger("kafka.transaction")
+ self.logger = getLogger("kafka.context")
def mark(self, partition, offset):
"""
- Set the high-water mark in the current transaction.
+ Set the high-water mark in the current context.
In order to know the current partition, it is helpful to initialize
the consumer to provide partition info via:
@@ -68,13 +68,13 @@ class KafkaTransaction(object):
def __nonzero__(self):
"""
- Return whether any operations were marked in the transaction.
+ Return whether any operations were marked in the context.
"""
return bool(self.high_water_mark)
def __enter__(self):
"""
- Start a new transaction:
+ Start a new context:
- Record the initial offsets for rollback
- Reset the high-water mark
@@ -82,13 +82,13 @@ class KafkaTransaction(object):
self.initial_offsets = dict(self.consumer.offsets)
self.high_water_mark = dict()
- self.logger.debug("Starting transaction at: %s", self.initial_offsets)
+ self.logger.debug("Starting context at: %s", self.initial_offsets)
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
- End a transaction.
+ End a context.
- If there was no exception, commit up to the current high-water mark.
- If there was an offset of range error, attempt to find the correct
@@ -105,14 +105,14 @@ class KafkaTransaction(object):
def commit(self):
"""
- Commit this transaction:
+ Commit this context's offsets:
- If the high-water mark has moved, commit up to and position the
consumer at the high-water mark.
- Otherwise, reset to the consumer to the initial offsets.
"""
if self.high_water_mark:
- self.logger.info("Committing transaction: %s", self.high_water_mark)
+ self.logger.info("Committing offsets: %s", self.high_water_mark)
self.commit_partition_offsets(self.high_water_mark)
self.update_consumer_offsets(self.high_water_mark)
else:
@@ -120,11 +120,11 @@ class KafkaTransaction(object):
def rollback(self):
"""
- Rollback this transaction:
+ Rollback this context:
- Position the consumer at the initial offsets.
"""
- self.logger.info("Rolling back transaction: %s", self.initial_offsets)
+ self.logger.info("Rolling back context: %s", self.initial_offsets)
self.update_consumer_offsets(self.initial_offsets)
def commit_partition_offsets(self, partition_offsets):