summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- kafka/client_async.py | 4
-rw-r--r-- kafka/consumer/group.py | 6
-rw-r--r-- kafka/coordinator/assignors/range.py | 77
-rw-r--r-- kafka/coordinator/assignors/roundrobin.py | 18
-rw-r--r-- kafka/coordinator/base.py | 12
-rw-r--r-- kafka/coordinator/consumer.py | 20
-rw-r--r-- kafka/coordinator/protocol.py | 2
-rw-r--r-- kafka/util.py | 37
-rw-r--r-- test/test_assignors.py | 58
-rw-r--r-- test/test_coordinator.py | 21
10 files changed, 235 insertions, 20 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index cb8152a..973ece0 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -99,6 +99,10 @@ class KafkaClient(object):
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = os.pipe()
+    def __del__(self):
+        """Close both ends of the wake-up pipe when the client is collected.
+
+        The pipe is only created after the bootstrap call has run, so an
+        instance whose construction failed earlier may not own the
+        descriptors yet. Each end is looked up defensively and closed
+        independently so that a failure on one cannot leak the other.
+        """
+        for attr in ('_wake_r', '_wake_w'):
+            fd = getattr(self, attr, None)
+            if fd is None:
+                continue
+            try:
+                os.close(fd)
+            except OSError:
+                # Best effort: the descriptor is already closed or invalid,
+                # and an exception cannot usefully propagate out of __del__.
+                pass
+
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 30abe00..cf77df3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -10,6 +10,7 @@ from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
+from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.protocol.offset import OffsetResetStrategy
from kafka.version import __version__
@@ -98,7 +99,8 @@ class KafkaConsumer(six.Iterator):
brokers or partitions. Default: 300000
partition_assignment_strategy (list): List of objects to use to
distribute partition ownership amongst consumer instances when
- group management is used. Default: [RoundRobinPartitionAssignor]
+ group management is used.
+ Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
Kafka's group management feature. Heartbeats are used to ensure
@@ -148,7 +150,7 @@ class KafkaConsumer(six.Iterator):
'auto_commit_interval_ms': 5000,
'check_crcs': True,
'metadata_max_age_ms': 5 * 60 * 1000,
- 'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+ 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'heartbeat_interval_ms': 3000,
'session_timeout_ms': 30000,
'send_buffer_bytes': None,
diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py
new file mode 100644
index 0000000..e4a7e33
--- /dev/null
+++ b/kafka/coordinator/assignors/range.py
@@ -0,0 +1,77 @@
+import collections
+import logging
+
+import six
+
+from .abstract import AbstractPartitionAssignor
+from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+
+log = logging.getLogger(__name__)
+
+
+class RangePartitionAssignor(AbstractPartitionAssignor):
+    """
+    The range assignor works on a per-topic basis. For each topic, we lay out
+    the available partitions in numeric order and the consumers in
+    lexicographic order. We then divide the number of partitions by the total
+    number of consumers to determine the number of partitions to assign to each
+    consumer. If it does not evenly divide, then the first few consumers will
+    have one extra partition.
+
+    For example, suppose there are two consumers C0 and C1, two topics t0 and
+    t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
+    t0p2, t1p0, t1p1, and t1p2.
+
+    The assignment will be:
+        C0: [t0p0, t0p1, t1p0, t1p1]
+        C1: [t0p2, t1p2]
+    """
+    name = 'range'
+    version = 0
+
+    @classmethod
+    def assign(cls, cluster, member_metadata):
+        """Compute the per-topic range assignment for every group member.
+
+        Arguments:
+            cluster: object providing partitions_for_topic(topic), which
+                returns the partition ids of the topic, or None when no
+                metadata is available (the topic is then skipped with a
+                warning).
+            member_metadata (dict): {member_id: metadata}, where each
+                metadata exposes the subscribed topics as `.subscription`.
+
+        Returns:
+            dict: {member_id: ConsumerProtocolMemberAssignment} with one entry
+                per key of member_metadata; the assignment lists
+                (topic, [partition, ...]) pairs sorted by topic.
+        """
+        consumers_per_topic = collections.defaultdict(list)
+        for member, metadata in six.iteritems(member_metadata):
+            for topic in metadata.subscription:
+                consumers_per_topic[topic].append(member)
+
+        # construct {member_id: {topic: [partition, ...]}}
+        assignment = collections.defaultdict(dict)
+
+        for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
+            partitions = cluster.partitions_for_topic(topic)
+            if partitions is None:
+                log.warning('No partition metadata for topic %s', topic)
+                continue
+            partitions = sorted(partitions)
+            consumers_for_topic.sort()
+
+            partitions_per_consumer, consumers_with_extra = divmod(
+                len(partitions), len(consumers_for_topic))
+
+            for i, member in enumerate(consumers_for_topic):
+                # The first `consumers_with_extra` members each own one extra
+                # partition, so every later range starts that much further on.
+                start = partitions_per_consumer * i + min(i, consumers_with_extra)
+                length = partitions_per_consumer
+                if i < consumers_with_extra:
+                    length += 1
+                assignment[member][topic] = partitions[start:start + length]
+
+        protocol_assignment = {}
+        for member_id in member_metadata:
+            # `assignment` is a defaultdict, so members that received nothing
+            # still get an (empty) assignment.
+            protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
+                cls.version,
+                sorted(assignment[member_id].items()),
+                b'')
+        return protocol_assignment
+
+    @classmethod
+    def metadata(cls, topics):
+        """Build the subscription metadata a member sends for `topics`."""
+        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
+
+    @classmethod
+    def on_assignment(cls, assignment):
+        """Hook called with the received assignment; this assignor ignores it."""
+        pass
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
index d7cd884..3fd3fd6 100644
--- a/kafka/coordinator/assignors/roundrobin.py
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -12,6 +12,22 @@ log = logging.getLogger(__name__)
class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
+ """
+ The roundrobin assignor lays out all the available partitions and all the
+ available consumers. It then proceeds to do a roundrobin assignment from
+ partition to consumer. If the subscriptions of all consumer instances are
+ identical, then the partitions will be uniformly distributed. (i.e., the
+ partition ownership counts will be within a delta of exactly one across all
+ consumers.)
+
+ For example, suppose there are two consumers C0 and C1, two topics t0 and
+ t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
+ t0p2, t1p0, t1p1, and t1p2.
+
+ The assignment will be:
+ C0: [t0p0, t0p2, t1p1]
+ C1: [t0p1, t1p0, t1p2]
+ """
name = 'roundrobin'
version = 0
@@ -50,7 +66,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
for member_id in member_metadata:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
cls.version,
- assignment[member_id].items(),
+ sorted(assignment[member_id].items()),
b'')
return protocol_assignment
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 6efdfd0..c49c38b 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -2,6 +2,7 @@ import abc
import copy
import logging
import time
+import weakref
import six
@@ -85,9 +86,12 @@ class BaseCoordinator(object):
self.rejoin_needed = True
self.needs_join_prepare = True
self.heartbeat = Heartbeat(**self.config)
- self.heartbeat_task = HeartbeatTask(self)
+ self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
#self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+    def __del__(self):
+        """Unschedule the heartbeat task when the coordinator is collected.
+
+        heartbeat_task is looked up defensively because __del__ also runs for
+        instances whose __init__ raised before the task was created.
+        """
+        heartbeat_task = getattr(self, 'heartbeat_task', None)
+        if heartbeat_task is not None:
+            heartbeat_task.disable()
+
@abc.abstractmethod
def protocol_type(self):
"""
@@ -572,6 +576,12 @@ class HeartbeatTask(object):
self._client = coordinator._client
self._request_in_flight = False
+    def disable(self):
+        """Ask the client to unschedule this task, tolerating a KeyError."""
+        try:
+            self._client.unschedule(self)
+        except KeyError:
+            # presumably raised when the task is not currently scheduled --
+            # in that case there is nothing left to cancel
+            return
+
def reset(self):
# start or restart the heartbeat task to be executed at the next chance
self._heartbeat.reset_session_timeout()
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index a393d7e..d63d052 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -4,19 +4,20 @@ import copy
import collections
import logging
import time
+import weakref
import six
from .base import BaseCoordinator
+from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
-from .protocol import (
- ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
- ConsumerProtocol)
+from .protocol import ConsumerProtocol
from ..common import OffsetAndMetadata, TopicPartition
from ..future import Future
from ..protocol.commit import (
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..util import WeakMethod
import kafka.common as Errors
@@ -30,7 +31,7 @@ class ConsumerCoordinator(BaseCoordinator):
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, response: True,
- 'assignors': (RoundRobinPartitionAssignor,),
+ 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
@@ -54,7 +55,7 @@ class ConsumerCoordinator(BaseCoordinator):
trigger custom actions when a commit request completes.
assignors (list): List of objects to use to distribute partition
ownership amongst consumer instances when group management is
- used. Default: [RoundRobinPartitionAssignor]
+ used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
Kafka's group management feature. Heartbeats are used to ensure
@@ -82,7 +83,7 @@ class ConsumerCoordinator(BaseCoordinator):
self._partitions_per_topic = {}
self._cluster = client.cluster
self._cluster.request_update()
- self._cluster.add_listener(self._handle_metadata_update)
+ self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
self._auto_commit_task = None
if self.config['enable_auto_commit']:
@@ -94,13 +95,18 @@ class ConsumerCoordinator(BaseCoordinator):
log.warning('group_id is None: disabling auto-commit.')
else:
interval = self.config['auto_commit_interval_ms'] / 1000.0
- self._auto_commit_task = AutoCommitTask(self, interval)
+ self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
# metrics=None,
# metric_group_prefix=None,
# metric_tags=None,
# self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+    def __del__(self):
+        """Release scheduled work and listeners when the coordinator is collected.
+
+        Attributes are looked up defensively because __del__ also runs for
+        instances whose __init__ raised part-way through.
+        """
+        auto_commit_task = getattr(self, '_auto_commit_task', None)
+        if auto_commit_task:
+            auto_commit_task.disable()
+        cluster = getattr(self, '_cluster', None)
+        if cluster is not None:
+            # An equal WeakMethod identifies the listener registered in __init__.
+            cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+        # BaseCoordinator.__del__ disables the heartbeat task; defining __del__
+        # here without chaining up would skip that cleanup entirely.
+        super(ConsumerCoordinator, self).__del__()
+
def protocol_type(self):
return ConsumerProtocol.PROTOCOL_TYPE
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
index 9af7225..9e37397 100644
--- a/kafka/coordinator/protocol.py
+++ b/kafka/coordinator/protocol.py
@@ -28,6 +28,6 @@ class ConsumerProtocolMemberAssignment(Struct):
class ConsumerProtocol(object):
PROTOCOL_TYPE = 'consumer'
- ASSIGNMENT_STRATEGIES = ('roundrobin',)
+ ASSIGNMENT_STRATEGIES = ('range', 'roundrobin')
METADATA = ConsumerProtocolMemberMetadata
ASSIGNMENT = ConsumerProtocolMemberAssignment
diff --git a/kafka/util.py b/kafka/util.py
index c6e77fa..7a11910 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -3,6 +3,7 @@ import collections
import struct
import sys
from threading import Thread, Event
+import weakref
import six
@@ -151,3 +152,39 @@ class ReentrantTimer(object):
def __del__(self):
self.stop()
+
+
+class WeakMethod(object):
+    """
+    Callable that weakly references a method and the object it is bound to. It
+    is based on http://stackoverflow.com/a/24287465.
+
+    Two instances compare equal, and hash alike, when they wrap the same
+    function bound to the same object, so a freshly built WeakMethod can be
+    used to look up or remove one that was stored earlier.
+
+    Arguments:
+
+        object_dot_method: A bound instance method (i.e. 'object.method').
+    """
+    def __init__(self, object_dot_method):
+        # Bound methods expose __self__/__func__; im_self/im_func are the
+        # older spellings kept as a fallback.
+        try:
+            self.target = weakref.ref(object_dot_method.__self__)
+        except AttributeError:
+            self.target = weakref.ref(object_dot_method.im_self)
+        self._target_id = id(self.target())
+        try:
+            self.method = weakref.ref(object_dot_method.__func__)
+        except AttributeError:
+            self.method = weakref.ref(object_dot_method.im_func)
+        self._method_id = id(self.method())
+
+    def __call__(self, *args, **kwargs):
+        """
+        Calls the method on target with args and kwargs.
+        """
+        return self.method()(self.target(), *args, **kwargs)
+
+    def __hash__(self):
+        # Hash the identities that __eq__ compares. Hashing the weak
+        # references themselves delegates to the referents, which fails for
+        # unhashable targets and for referents that are already gone.
+        return hash((self._target_id, self._method_id))
+
+    def __eq__(self, other):
+        if not isinstance(other, WeakMethod):
+            return False
+        return self._target_id == other._target_id and self._method_id == other._method_id
+
+    def __ne__(self, other):
+        # Python 2 does not derive != from __eq__.
+        return not self.__eq__(other)
diff --git a/test/test_assignors.py b/test/test_assignors.py
new file mode 100644
index 0000000..e2a1d4f
--- /dev/null
+++ b/test/test_assignors.py
@@ -0,0 +1,58 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import pytest
+
+from kafka.coordinator.assignors.range import RangePartitionAssignor
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.protocol import (
+ ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
+
+
+@pytest.fixture
+def cluster(mocker):
+    """Mock cluster that reports partitions 0, 1 and 2 for any topic."""
+    mock_cluster = mocker.MagicMock()
+    mock_cluster.partitions_for_topic.return_value = set([0, 1, 2])
+    return mock_cluster
+
+
+def test_assignor_roundrobin(cluster):
+    """Round-robin over t0 and t1 alternates partitions between C0 and C1."""
+    assignor = RoundRobinPartitionAssignor
+
+    member_metadata = dict(
+        (member_id, assignor.metadata(set(['t0', 't1'])))
+        for member_id in ('C0', 'C1'))
+
+    actual = assignor.assign(cluster, member_metadata)
+    version = assignor.version
+    expected = {
+        'C0': ConsumerProtocolMemberAssignment(
+            version, [('t0', [0, 2]), ('t1', [1])], b''),
+        'C1': ConsumerProtocolMemberAssignment(
+            version, [('t0', [1]), ('t1', [0, 2])], b''),
+    }
+    assert actual == expected
+    assert set(actual) == set(expected)
+    # The wire encoding must agree too, not just the in-memory structs.
+    for member_id, member_assignment in actual.items():
+        assert member_assignment.encode() == expected[member_id].encode()
+
+
+def test_assignor_range(cluster):
+    """Range assignment gives C0 the extra partition of both t0 and t1."""
+    assignor = RangePartitionAssignor
+
+    member_metadata = dict(
+        (member_id, assignor.metadata(set(['t0', 't1'])))
+        for member_id in ('C0', 'C1'))
+
+    actual = assignor.assign(cluster, member_metadata)
+    version = assignor.version
+    expected = {
+        'C0': ConsumerProtocolMemberAssignment(
+            version, [('t0', [0, 1]), ('t1', [0, 1])], b''),
+        'C1': ConsumerProtocolMemberAssignment(
+            version, [('t0', [2]), ('t1', [2])], b''),
+    }
+    assert actual == expected
+    assert set(actual) == set(expected)
+    # The wire encoding must agree too, not just the in-memory structs.
+    for member_id, member_assignment in actual.items():
+        assert member_assignment.encode() == expected[member_id].encode()
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 80d2de2..e0906c7 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -7,6 +7,7 @@ from kafka.client_async import KafkaClient
from kafka.common import TopicPartition, OffsetAndMetadata
from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
+from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
@@ -18,6 +19,7 @@ from kafka.protocol.commit import (
OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
OffsetFetchResponse)
from kafka.protocol.metadata import MetadataResponse
+from kafka.util import WeakMethod
import kafka.common as Errors
@@ -45,7 +47,7 @@ def test_init(conn):
# metadata update on init
assert cli.cluster._need_update is True
- assert coordinator._handle_metadata_update in cli.cluster._listeners
+ assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
@@ -72,13 +74,16 @@ def test_group_protocols(coordinator):
assert False, 'Exception not raised when expected'
coordinator._subscription.subscribe(topics=['foobar'])
- assert coordinator.group_protocols() == [(
- 'roundrobin',
- ConsumerProtocolMemberMetadata(
+ assert coordinator.group_protocols() == [
+ ('range', ConsumerProtocolMemberMetadata(
+ RangePartitionAssignor.version,
+ ['foobar'],
+ b'')),
+ ('roundrobin', ConsumerProtocolMemberMetadata(
RoundRobinPartitionAssignor.version,
['foobar'],
- b'')
- )]
+ b'')),
+ ]
@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)])
@@ -113,8 +118,8 @@ def test_pattern_subscription(coordinator, api_version):
def test_lookup_assignor(coordinator):
- assignor = coordinator._lookup_assignor('roundrobin')
- assert assignor is RoundRobinPartitionAssignor
+ assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor
+ assert coordinator._lookup_assignor('range') is RangePartitionAssignor
assert coordinator._lookup_assignor('foobar') is None