summaryrefslogtreecommitdiff
path: root/kafka/consumer/subscription_state.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/subscription_state.py')
-rw-r--r--kafka/consumer/subscription_state.py34
1 files changed, 11 insertions, 23 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index c60f192..9b52ffb 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -276,7 +276,7 @@ class SubscriptionState(object):
all_consumed = {}
for partition, state in six.iteritems(self.assignment):
if state.has_valid_position:
- all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
+ all_consumed[partition] = OffsetAndMetadata(state.position, '')
return all_consumed
def need_offset_reset(self, partition, offset_reset_strategy=None):
@@ -332,41 +332,29 @@ class SubscriptionState(object):
class TopicPartitionState(object):
def __init__(self):
self.committed = None # last committed position
- self.has_valid_position = False # whether we have valid consumed and fetched positions
+ self.has_valid_position = False # whether we have valid position
self.paused = False # whether this partition has been paused by the user
self.awaiting_reset = False # whether we are awaiting reset
self.reset_strategy = None # the reset strategy if awaitingReset is set
- self._consumed = None # offset exposed to the user
- self._fetched = None # current fetch position
+ self._position = None # offset exposed to the user
- def _set_fetched(self, offset):
- assert self.has_valid_position, 'Valid consumed/fetch position required'
- self._fetched = offset
+ def _set_position(self, offset):
+ assert self.has_valid_position, 'Valid position required'
+ self._position = offset
- def _get_fetched(self):
- return self._fetched
+ def _get_position(self):
+ return self._position
- fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
-
- def _set_consumed(self, offset):
- assert self.has_valid_position, 'Valid consumed/fetch position required'
- self._consumed = offset
-
- def _get_consumed(self):
- return self._consumed
-
- consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
+ position = property(_get_position, _set_position, None, "last position")
def await_reset(self, strategy):
self.awaiting_reset = True
self.reset_strategy = strategy
- self._consumed = None
- self._fetched = None
+ self._position = None
self.has_valid_position = False
def seek(self, offset):
- self._consumed = offset
- self._fetched = offset
+ self._position = offset
self.awaiting_reset = False
self.reset_strategy = None
self.has_valid_position = True