diff options
-rw-r--r-- | kafka/consumer/group.py | 2 | ||||
-rw-r--r-- | kafka/coordinator/abstract.py | 8 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 4 |
3 files changed, 6 insertions, 8 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index b7093f3..0814983 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -154,7 +154,7 @@ class KafkaConsumer(object): self._fetcher = Fetcher( self._client, self._subscription, **self.config) self._coordinator = ConsumerCoordinator( - self._client, self.config['group_id'], self._subscription, + self._client, self._subscription, assignors=self.config['partition_assignment_strategy'], **self.config) self._closed = False diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py index ea5cb97..6790bb1 100644 --- a/kafka/coordinator/abstract.py +++ b/kafka/coordinator/abstract.py @@ -46,18 +46,16 @@ class AbstractCoordinator(object): """ DEFAULT_CONFIG = { + 'group_id': 'kafka-python-default-group', 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, } - def __init__(self, client, group_id, **configs): + def __init__(self, client, **configs): if not client: raise Errors.IllegalStateError('a client is required to use' ' Group Coordinator') - if not group_id: - raise Errors.IllegalStateError('a group_id is required to use' - ' Group Coordinator') self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -67,7 +65,7 @@ class AbstractCoordinator(object): self._client = client self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID - self.group_id = group_id + self.group_id = self.config['group_id'] self.coordinator_id = None self.rejoin_needed = True self.needs_join_prepare = True diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index dd3eea0..3d5669e 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -47,6 +47,7 @@ class ConsumerProtocol(object): class ConsumerCoordinator(AbstractCoordinator): """This class manages the coordination process with the consumer coordinator.""" DEFAULT_CONFIG = { + 'group_id': 'kafka-python-default-group', 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': lambda offsets, error: True, @@ -56,9 +57,8 @@ class ConsumerCoordinator(AbstractCoordinator): 'retry_backoff_ms': 100, } - def __init__(self, client, group_id, subscription, **configs): """Initialize the coordination manager.""" - super(ConsumerCoordinator, self).__init__(client, group_id, **configs) + super(ConsumerCoordinator, self).__init__(client, **configs) self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: |