diff options
Diffstat (limited to 'kafka/consumer/subscription_state.py')
-rw-r--r-- | kafka/consumer/subscription_state.py | 34 |
1 files changed, 11 insertions, 23 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index c60f192..9b52ffb 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -276,7 +276,7 @@ class SubscriptionState(object): all_consumed = {} for partition, state in six.iteritems(self.assignment): if state.has_valid_position: - all_consumed[partition] = OffsetAndMetadata(state.consumed, '') + all_consumed[partition] = OffsetAndMetadata(state.position, '') return all_consumed def need_offset_reset(self, partition, offset_reset_strategy=None): @@ -332,41 +332,29 @@ class SubscriptionState(object): class TopicPartitionState(object): def __init__(self): self.committed = None # last committed position - self.has_valid_position = False # whether we have valid consumed and fetched positions + self.has_valid_position = False # whether we have valid position self.paused = False # whether this partition has been paused by the user self.awaiting_reset = False # whether we are awaiting reset self.reset_strategy = None # the reset strategy if awaitingReset is set - self._consumed = None # offset exposed to the user - self._fetched = None # current fetch position + self._position = None # offset exposed to the user - def _set_fetched(self, offset): - assert self.has_valid_position, 'Valid consumed/fetch position required' - self._fetched = offset + def _set_position(self, offset): + assert self.has_valid_position, 'Valid position required' + self._position = offset - def _get_fetched(self): - return self._fetched + def _get_position(self): + return self._position - fetched = property(_get_fetched, _set_fetched, None, "current fetch position") - - def _set_consumed(self, offset): - assert self.has_valid_position, 'Valid consumed/fetch position required' - self._consumed = offset - - def _get_consumed(self): - return self._consumed - - consumed = property(_get_consumed, _set_consumed, None, "last consumed position") + position = property(_get_position, _set_position, None, "last position") def await_reset(self, strategy): self.awaiting_reset = True self.reset_strategy = strategy - self._consumed = None - self._fetched = None + self._position = None self.has_valid_position = False def seek(self, offset): - self._consumed = offset - self._fetched = offset + self._position = offset self.awaiting_reset = False self.reset_strategy = None self.has_valid_position = True |