summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/group.py2
-rw-r--r--kafka/coordinator/abstract.py8
-rw-r--r--kafka/coordinator/consumer.py4
3 files changed, 6 insertions, 8 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b7093f3..0814983 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -154,7 +154,7 @@ class KafkaConsumer(object):
self._fetcher = Fetcher(
self._client, self._subscription, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self.config['group_id'], self._subscription,
+ self._client, self._subscription,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index ea5cb97..6790bb1 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -46,18 +46,16 @@ class AbstractCoordinator(object):
"""
DEFAULT_CONFIG = {
+ 'group_id': 'kafka-python-default-group',
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
}
- def __init__(self, client, group_id, **configs):
+ def __init__(self, client, **configs):
if not client:
raise Errors.IllegalStateError('a client is required to use'
' Group Coordinator')
- if not group_id:
- raise Errors.IllegalStateError('a group_id is required to use'
- ' Group Coordinator')
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -67,7 +65,7 @@ class AbstractCoordinator(object):
self._client = client
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
- self.group_id = group_id
+ self.group_id = self.config['group_id']
self.coordinator_id = None
self.rejoin_needed = True
self.needs_join_prepare = True
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index dd3eea0..3d5669e 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -47,6 +47,7 @@ class ConsumerProtocol(object):
class ConsumerCoordinator(AbstractCoordinator):
"""This class manages the coordination process with the consumer coordinator."""
DEFAULT_CONFIG = {
+ 'group_id': 'kafka-python-default-group',
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, error: True,
@@ -56,9 +57,8 @@ class ConsumerCoordinator(AbstractCoordinator):
'retry_backoff_ms': 100,
}
- def __init__(self, client, group_id, subscription, **configs):
"""Initialize the coordination manager."""
- super(ConsumerCoordinator, self).__init__(client, group_id, **configs)
+ super(ConsumerCoordinator, self).__init__(client, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs: