summaryrefslogtreecommitdiff
path: root/kafka/coordinator
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-03 16:03:30 -0800
committerDana Powers <dana.powers@rd.io>2016-01-03 16:03:30 -0800
commit5c45ec13f3e59d9c398f2d3035c762ca13589885 (patch)
treefe2ddf4e841aae1a352c91bc229be7be8a3c49db /kafka/coordinator
parentfae1a227b1eb67fda2264d81c36cdbe39b49e057 (diff)
downloadkafka-python-5c45ec13f3e59d9c398f2d3035c762ca13589885.tar.gz
Check api_version in ConsumerCoordinator
- Full group support in 0.9 - Kafka-storage offsets w/ GroupCoordinator in 0.8.2 - Zookeeper-storage offsets in 0.8.1 - Assign all partitions locally if < 0.9
Diffstat (limited to 'kafka/coordinator')
-rw-r--r--kafka/coordinator/consumer.py126
1 files changed, 94 insertions, 32 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 99d62f2..673cbaf 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -9,7 +9,9 @@ from .base import BaseCoordinator
import kafka.common as Errors
from kafka.common import OffsetAndMetadata, TopicPartition
from kafka.future import Future
-from kafka.protocol.commit import OffsetCommitRequest_v2, OffsetFetchRequest_v1
+from kafka.protocol.commit import (
+ OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
+ OffsetFetchRequest_v0, OffsetFetchRequest_v1)
from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
@@ -55,6 +57,7 @@ class ConsumerCoordinator(BaseCoordinator):
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
+ 'api_version': (0, 9),
}
def __init__(self, client, subscription, **configs):
@@ -99,14 +102,16 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription = subscription
self._partitions_per_topic = {}
self._auto_commit_task = None
- assert self.config['assignors'], 'Coordinator require assignors'
+ if self.config['api_version'] >= (0, 9):
+ assert self.config['assignors'], 'Coordinator require assignors'
self._cluster.request_update()
self._cluster.add_listener(self._handle_metadata_update)
- if self.config['enable_auto_commit']:
- interval = self.config['auto_commit_interval_ms'] / 1000.0
- self._auto_commit_task = AutoCommitTask(self, interval)
+ if self.config['api_version'] >= (0, 8, 1):
+ if self.config['enable_auto_commit']:
+ interval = self.config['auto_commit_interval_ms'] / 1000.0
+ self._auto_commit_task = AutoCommitTask(self, interval)
# metrics=None,
# metric_group_prefix=None,
@@ -143,7 +148,17 @@ class ConsumerCoordinator(BaseCoordinator):
# check if there are any changes to the metadata which should trigger a rebalance
if self._subscription_metadata_changed():
- self._subscription.mark_for_reassignment()
+ if self.config['api_version'] >= (0, 9):
+ self._subscription.mark_for_reassignment()
+
+ # If we haven't got group coordinator support,
+ # just assign all partitions locally
+ else:
+ self._subscription.assign_from_subscribed([
+ TopicPartition(topic, partition)
+ for topic in self._subscription.subscription
+ for partition in self._partitions_per_topic[topic]
+ ])
def _subscription_metadata_changed(self):
if not self._subscription.partitions_auto_assigned():
@@ -273,7 +288,8 @@ class ConsumerCoordinator(BaseCoordinator):
dict: {TopicPartition: OffsetAndMetadata}
"""
while True:
- self.ensure_coordinator_known()
+ if self.config['api_version'] >= (0, 8, 2):
+ self.ensure_coordinator_known()
# contact coordinator to fetch committed offsets
future = self._send_offset_fetch_request(partitions)
@@ -331,7 +347,8 @@ class ConsumerCoordinator(BaseCoordinator):
return
while True:
- self.ensure_coordinator_known()
+ if self.config['api_version'] >= (0, 8, 2):
+ self.ensure_coordinator_known()
future = self._send_offset_commit_request(offsets)
self._client.poll(future=future)
@@ -345,6 +362,8 @@ class ConsumerCoordinator(BaseCoordinator):
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def _maybe_auto_commit_offsets_sync(self):
+ if self.config['api_version'] < (0, 8, 1):
+ return
if self.config['enable_auto_commit']:
# disable periodic commits prior to committing synchronously. note that they will
# be re-enabled after a rebalance completes
@@ -379,8 +398,12 @@ class ConsumerCoordinator(BaseCoordinator):
Returns:
Future: indicating whether the commit was successful or not
"""
- if self.coordinator_unknown():
- return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+ if self.config['api_version'] >= (0, 8, 2):
+ if self.coordinator_unknown():
+ return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+ node_id = self.coordinator_id
+ else:
+ node_id = self._client.least_loaded_node()
if not offsets:
return Future().failure(None)
@@ -390,25 +413,49 @@ class ConsumerCoordinator(BaseCoordinator):
for tp, offset in six.iteritems(offsets):
offset_data[tp.topic][tp.partition] = offset
- request = OffsetCommitRequest_v2(
- self.group_id,
- self.generation,
- self.member_id,
- OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
- [(
- topic, [(
- partition,
- offset.offset,
- offset.metadata
- ) for partition, offset in six.iteritems(partitions)]
- ) for topic, partitions in six.iteritems(offset_data)]
- )
+ if self.config['api_version'] >= (0, 9):
+ request = OffsetCommitRequest_v2(
+ self.group_id,
+ self.generation,
+ self.member_id,
+ OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
+ [(
+ topic, [(
+ partition,
+ offset.offset,
+ offset.metadata
+ ) for partition, offset in six.iteritems(partitions)]
+ ) for topic, partitions in six.iteritems(offset_data)]
+ )
+ elif self.config['api_version'] >= (0, 8, 2):
+ request = OffsetCommitRequest_v1(
+ self.group_id, -1, '',
+ [(
+ topic, [(
+ partition,
+ offset.offset,
+ -1,
+ offset.metadata
+ ) for partition, offset in six.iteritems(partitions)]
+ ) for topic, partitions in six.iteritems(offset_data)]
+ )
+ elif self.config['api_version'] >= (0, 8, 1):
+ request = OffsetCommitRequest_v0(
+ self.group_id,
+ [(
+ topic, [(
+ partition,
+ offset.offset,
+ offset.metadata
+ ) for partition, offset in six.iteritems(partitions)]
+ ) for topic, partitions in six.iteritems(offset_data)]
+ )
log.debug("Sending offset-commit request with %s to %s",
- offsets, self.coordinator_id)
+ offsets, node_id)
future = Future()
- _f = self._client.send(self.coordinator_id, request)
+ _f = self._client.send(node_id, request)
_f.add_callback(self._handle_offset_commit_response, offsets, future)
_f.add_errback(self._failed_request, future)
return future
@@ -495,22 +542,33 @@ class ConsumerCoordinator(BaseCoordinator):
Returns:
Future: resolves to dict of offsets: {TopicPartition: int}
"""
- if self.coordinator_unknown():
- return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+ if self.config['api_version'] >= (0, 8, 2):
+ if self.coordinator_unknown():
+ return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+ node_id = self.coordinator_id
+ else:
+ node_id = self._client.least_loaded_node()
log.debug("Fetching committed offsets for partitions: %s", partitions)
# construct the request
topic_partitions = collections.defaultdict(set)
for tp in partitions:
topic_partitions[tp.topic].add(tp.partition)
- request = OffsetFetchRequest_v1(
- self.group_id,
- list(topic_partitions.items())
- )
+
+ if self.config['api_version'] >= (0, 8, 2):
+ request = OffsetFetchRequest_v1(
+ self.group_id,
+ list(topic_partitions.items())
+ )
+ else:
+ request = OffsetFetchRequest_v0(
+ self.group_id,
+ list(topic_partitions.items())
+ )
# send the request with a callback
future = Future()
- _f = self._client.send(self.coordinator_id, request)
+ _f = self._client.send(node_id, request)
_f.add_callback(self._handle_offset_fetch_response, future)
_f.add_errback(self._failed_request, future)
return future
@@ -536,6 +594,10 @@ class ConsumerCoordinator(BaseCoordinator):
# need to re-join group
self._subscription.mark_for_reassignment()
future.failure(error)
+ elif error_type is Errors.UnknownTopicOrPartitionError:
+ log.warning("OffsetFetchRequest -- unknown topic %s",
+ topic)
+ continue
else:
log.error("Unknown error fetching offsets for %s: %s",
tp, error)