diff options
author | Jesse Myers <jesse@locationlabs.com> | 2014-09-03 08:06:56 -0700 |
---|---|---|
committer | Jesse Myers <jesse@locationlabs.com> | 2014-09-03 08:06:56 -0700 |
commit | 20db3b5f7684df1a63fe5eb7def0478ffeff5d3d (patch) | |
tree | 2ead1639c4c47a5e94f426254d3a0ef914078621 | |
parent | d65bc5b70dfa381f650dc4c1e136680b8f6c9649 (diff) | |
download | kafka-python-20db3b5f7684df1a63fe5eb7def0478ffeff5d3d.tar.gz |
Rename KafkaTransaction to OffsetCommitContext for clarity.
-rw-r--r-- | kafka/context.py (renamed from kafka/transaction.py) | 32 | ||||
-rw-r--r-- | test/test_context.py (renamed from test/test_transaction.py) | 40 |
2 files changed, 36 insertions, 36 deletions
diff --git a/kafka/transaction.py b/kafka/context.py index 0dfe9d4..98ed7b3 100644 --- a/kafka/transaction.py +++ b/kafka/context.py @@ -1,14 +1,14 @@ """ -Transactional commit and rollback semantics for consumer. +Context manager to commit/rollback consumer offsets. """ from logging import getLogger from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError -class KafkaTransaction(object): +class OffsetCommitContext(object): """ - Provides transactional commit/rollback semantics around a `SimpleConsumer`. + Provides commit/rollback semantics around a `SimpleConsumer`. Usage assumes that `auto_commit` is disabled, that messages are consumed in batches, and that the consuming process will record its own successful @@ -23,16 +23,16 @@ class KafkaTransaction(object): consumer.fetch_last_known_offsets() while some_condition: - with KafkaTransaction(consumer) as transaction: + with OffsetCommitContext(consumer) as context: messages = consumer.get_messages(count, block=False) for partition, message in messages: if can_process(message): - transaction.mark(partition, message.offset) + context.mark(partition, message.offset) else: break - if not transaction: + if not context: sleep(delay) @@ -48,11 +48,11 @@ class KafkaTransaction(object): self.consumer = consumer self.initial_offsets = None self.high_water_mark = None - self.logger = getLogger("kafka.transaction") + self.logger = getLogger("kafka.context") def mark(self, partition, offset): """ - Set the high-water mark in the current transaction. + Set the high-water mark in the current context. In order to know the current partition, it is helpful to initialize the consumer to provide partition info via: @@ -68,13 +68,13 @@ class KafkaTransaction(object): def __nonzero__(self): """ - Return whether any operations were marked in the transaction. + Return whether any operations were marked in the context. """ return bool(self.high_water_mark) def __enter__(self): """ - Start a new transaction: + Start a new context: - Record the initial offsets for rollback - Reset the high-water mark @@ -82,13 +82,13 @@ class KafkaTransaction(object): self.initial_offsets = dict(self.consumer.offsets) self.high_water_mark = dict() - self.logger.debug("Starting transaction at: %s", self.initial_offsets) + self.logger.debug("Starting context at: %s", self.initial_offsets) return self def __exit__(self, exc_type, exc_value, traceback): """ - End a transaction. + End a context. - If there was no exception, commit up to the current high-water mark. - If there was an offset of range error, attempt to find the correct @@ -105,14 +105,14 @@ class KafkaTransaction(object): def commit(self): """ - Commit this transaction: + Commit this context's offsets: - If the high-water mark has moved, commit up to and position the consumer at the high-water mark. - Otherwise, reset to the consumer to the initial offsets. """ if self.high_water_mark: - self.logger.info("Committing transaction: %s", self.high_water_mark) + self.logger.info("Committing offsets: %s", self.high_water_mark) self.commit_partition_offsets(self.high_water_mark) self.update_consumer_offsets(self.high_water_mark) else: @@ -120,11 +120,11 @@ class KafkaTransaction(object): def rollback(self): """ - Rollback this transaction: + Rollback this context: - Position the consumer at the initial offsets. """ - self.logger.info("Rolling back transaction: %s", self.initial_offsets) + self.logger.info("Rolling back context: %s", self.initial_offsets) self.update_consumer_offsets(self.initial_offsets) def commit_partition_offsets(self, partition_offsets): diff --git a/test/test_transaction.py b/test/test_context.py index e3d13c8..8c3fd85 100644 --- a/test/test_transaction.py +++ b/test/test_context.py @@ -1,17 +1,17 @@ """ -KafkaTransaction tests. +OffsetCommitContext tests. """ from unittest2 import TestCase from mock import MagicMock, patch from kafka.common import OffsetOutOfRangeError -from kafka.transaction import KafkaTransaction +from kafka.context import OffsetCommitContext -class TestKafkaTransaction(TestCase): +class TestOffsetCommitContext(TestCase): """ - KafkaTransaction tests. + OffsetCommitContext tests. """ def setUp(self): @@ -24,13 +24,13 @@ class TestKafkaTransaction(TestCase): self.consumer.group = self.group self.consumer.client = self.client self.consumer.offsets = {self.partition: 0} - self.transaction = KafkaTransaction(self.consumer) + self.context = OffsetCommitContext(self.consumer) def test_noop(self): """ - Should revert consumer after transaction with no mark() call. + Should revert consumer after context exit with no mark() call. """ - with self.transaction: + with self.context: # advance offset self.consumer.offsets = {self.partition: 1} @@ -42,10 +42,10 @@ class TestKafkaTransaction(TestCase): def test_mark(self): """ - Should remain at marked location. + Should remain at marked location ater context exit. """ - with self.transaction as transaction: - transaction.mark(self.partition, 0) + with self.context as context: + context.mark(self.partition, 0) # advance offset self.consumer.offsets = {self.partition: 1} @@ -61,12 +61,12 @@ class TestKafkaTransaction(TestCase): def test_mark_multiple(self): """ - Should remain at highest marked location. + Should remain at highest marked location after context exit. """ - with self.transaction as transaction: - transaction.mark(self.partition, 0) - transaction.mark(self.partition, 1) - transaction.mark(self.partition, 2) + with self.context as context: + context.mark(self.partition, 0) + context.mark(self.partition, 1) + context.mark(self.partition, 2) # advance offset self.consumer.offsets = {self.partition: 3} @@ -82,11 +82,11 @@ class TestKafkaTransaction(TestCase): def test_rollback(self): """ - Should rollback to beginning of transaction. + Should rollback to initial offsets on context exit with exception. """ with self.assertRaises(Exception): - with self.transaction as transaction: - transaction.mark(self.partition, 0) + with self.context as context: + context.mark(self.partition, 0) # advance offset self.consumer.offsets = {self.partition: 1} @@ -101,7 +101,7 @@ class TestKafkaTransaction(TestCase): def test_out_of_range(self): """ - Should remain at beginning of range. + Should reset to beginning of valid offsets on `OffsetOutOfRangeError` """ def _seek(offset, whence): # seek must be called with 0, 0 to find the beginning of the range @@ -111,7 +111,7 @@ class TestKafkaTransaction(TestCase): self.consumer.offsets = {self.partition: 100} with patch.object(self.consumer, "seek", _seek): - with self.transaction: + with self.context: raise OffsetOutOfRangeError() self.assertEqual(self.consumer.offsets, {self.partition: 100}) |