summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile57
-rw-r--r--kafka/consumer/fetcher.py15
-rw-r--r--kafka/consumer/group.py29
-rw-r--r--kafka/coordinator/base.py23
-rw-r--r--kafka/coordinator/consumer.py28
-rw-r--r--requirements-dev.txt15
-rw-r--r--test/test_coordinator.py9
7 files changed, 150 insertions, 26 deletions
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..73c3ecf
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,57 @@
+# Some simple testing tasks (sorry, UNIX only).
+
+FLAGS=
+KAFKA_VERSION=0.11.0.1
+SCALA_VERSION=2.12
+
+setup:
+ pip install -r requirements-dev.txt
+ pip install -Ue .
+
+servers/$(KAFKA_VERSION)/kafka-bin:
+ KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) ./build_integration.sh
+
+build-integration: servers/$(KAFKA_VERSION)/kafka-bin
+
+# Test and produce coverage using tox. This is the same as is run on Travis
+test36: build-integration
+ KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) tox -e py36 -- $(FLAGS)
+
+test27: build-integration
+ KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) tox -e py27 -- $(FLAGS)
+
+# Test using py.test directly if you want to use local python. Useful for other
+# platforms that require manual installation for C libraries, ie. Windows.
+test-local: build-integration
+ py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF kafka test
+
+cov-local: build-integration
+ py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
+ --cov-config=.covrc --cov-report html kafka test
+ @echo "open file://`pwd`/htmlcov/index.html"
+
+# Check the readme for syntax errors, which can lead to invalid formatting on
+# PyPi homepage (https://pypi.python.org/pypi/kafka-python)
+check-readme:
+ python setup.py check -rms
+
+clean:
+ rm -rf `find . -name __pycache__`
+ rm -f `find . -type f -name '*.py[co]' `
+ rm -f `find . -type f -name '*~' `
+ rm -f `find . -type f -name '.*~' `
+ rm -f `find . -type f -name '@*' `
+ rm -f `find . -type f -name '#*#' `
+ rm -f `find . -type f -name '*.orig' `
+ rm -f `find . -type f -name '*.rej' `
+ rm -f .coverage
+ rm -rf htmlcov
+ rm -rf docs/_build/
+ rm -rf cover
+ rm -rf dist
+
+doc:
+ make -C docs html
+ @echo "open file://`pwd`/docs/_build/html/index.html"
+
+.PHONY: all test36 test27 test-local cov-local clean doc
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index dd90c2e..d3ee26e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -134,6 +134,18 @@ class Fetcher(six.Iterator):
self._clean_done_fetch_futures()
return futures
+ def reset_offsets_if_needed(self, partitions):
+ """Lookup and set offsets for any partitions which are awaiting an
+ explicit reset.
+
+ Arguments:
+ partitions (set of TopicPartitions): the partitions to reset
+ """
+ for tp in partitions:
+ # TODO: If there are several offsets to reset, we could submit offset requests in parallel
+ if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
+ self._reset_offset(tp)
+
def _clean_done_fetch_futures(self):
while True:
if not self._fetch_futures:
@@ -168,9 +180,6 @@ class Fetcher(six.Iterator):
" update", tp)
continue
- # TODO: If there are several offsets to reset,
- # we could submit offset requests in parallel
- # for now, each call to _reset_offset will block
if self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
elif self._subscriptions.assignment[tp].committed is None:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a83d5da..cbfd720 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -585,12 +585,11 @@ class KafkaConsumer(six.Iterator):
dict: Map of topic to list of records (may be empty).
"""
if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_active_group()
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -835,6 +834,8 @@ class KafkaConsumer(six.Iterator):
Returns:
set: {topic, ...}
"""
+ if self._subscription.subscription is None:
+ return None
return self._subscription.subscription.copy()
def unsubscribe(self):
@@ -988,26 +989,34 @@ class KafkaConsumer(six.Iterator):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
- if (self.config['api_version'] >= (0, 8, 1) and
- self.config['group_id'] is not None):
+ # Lookup any positions for partitions which are awaiting reset (which may be the
+ # case if the user called seekToBeginning or seekToEnd. We do this check first to
+ # avoid an unnecessary lookup of committed offsets (which typically occurs when
+ # the user is manually assigning partitions and managing their own offsets).
+ self._fetcher.reset_offsets_if_needed(partitions)
- # Refresh commits for all assigned partitions
- self._coordinator.refresh_committed_offsets_if_needed()
+ if not self._subscription.has_all_fetch_positions():
+ # if we still don't have offsets for all partitions, then we should either seek
+ # to the last committed position or reset using the auto reset policy
+ if (self.config['api_version'] >= (0, 8, 1) and
+ self.config['group_id'] is not None):
+ # first refresh commits for all assigned partitions
+ self._coordinator.refresh_committed_offsets_if_needed()
- # Then, do any offset lookups in case some positions are not known
- self._fetcher.update_fetch_positions(partitions)
+ # Then, do any offset lookups in case some positions are not known
+ self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:
if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
self._coordinator.ensure_active_group()
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
# Fetch offsets for any subscribed partitions that we arent tracking yet
if not self._subscription.has_all_fetch_positions():
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index af0936c..53b3e1d 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -88,6 +88,7 @@ class BaseCoordinator(object):
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = self.config['group_id']
self.coordinator_id = None
+ self._find_coordinator_future = None
self.rejoin_needed = True
self.rejoining = False
self.heartbeat = Heartbeat(**self.config)
@@ -195,12 +196,11 @@ class BaseCoordinator(object):
return False
- def ensure_coordinator_known(self):
+ def ensure_coordinator_ready(self):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
while self.coordinator_unknown():
-
# Prior to 0.8.2 there was no group coordinator
# so we will just pick a node at random and treat
# it as the "coordinator"
@@ -210,7 +210,7 @@ class BaseCoordinator(object):
self._client.ready(self.coordinator_id)
continue
- future = self._send_group_coordinator_request()
+ future = self.lookup_coordinator()
self._client.poll(future=future)
if future.failed():
@@ -224,6 +224,16 @@ class BaseCoordinator(object):
else:
raise future.exception # pylint: disable-msg=raising-bad-type
+ def _reset_find_coordinator_future(self, result):
+ self._find_coordinator_future = None
+
+ def lookup_coordinator(self):
+ if self._find_coordinator_future is None:
+ self._find_coordinator_future = self._send_group_coordinator_request()
+
+ self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+ return self._find_coordinator_future
+
def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)
@@ -234,6 +244,11 @@ class BaseCoordinator(object):
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
+ # always ensure that the coordinator is ready because we may have been
+ # disconnected when sending heartbeats and does not necessarily require
+ # us to rejoin the group.
+ self.ensure_coordinator_ready()
+
if not self.need_rejoin():
return
@@ -242,7 +257,7 @@ class BaseCoordinator(object):
self.rejoining = True
while self.need_rejoin():
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
# ensure that there are no pending requests to the coordinator.
# This is important in particular to avoid resending a pending
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 84c62df..0328837 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -315,7 +315,7 @@ class ConsumerCoordinator(BaseCoordinator):
return {}
while True:
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
# contact coordinator to fetch committed offsets
future = self._send_offset_fetch_request(partitions)
@@ -353,9 +353,29 @@ class ConsumerCoordinator(BaseCoordinator):
response will be either an Exception or a OffsetCommitResponse
struct. This callback can be used to trigger custom actions when
a commit request completes.
- Returns:
- Future: indicating whether the commit was successful or not
"""
+ if not self.coordinator_unknown():
+ self._do_commit_offsets_async(offsets, callback)
+ else:
+ # we don't know the current coordinator, so try to find it and then
+ # send the commit or fail (we don't want recursive retries which can
+ # cause offset commits to arrive out of order). Note that there may
+ # be multiple offset commits chained to the same coordinator lookup
+ # request. This is fine because the listeners will be invoked in the
+ # same order that they were added. Note also that BaseCoordinator
+ # prevents multiple concurrent coordinator lookup requests.
+ future = self.lookup_coordinator()
+ future.add_callback(self._do_commit_offsets_async, offsets, callback)
+ if callback:
+ future.add_errback(callback)
+
+ # ensure the commit has a chance to be transmitted (without blocking on
+ # its completion). Note that commits are treated as heartbeats by the
+ # coordinator, so there is no need to explicitly allow heartbeats
+ # through delayed task execution.
+ self._client.poll() # no wakeup if we add that feature
+
+ def _do_commit_offsets_async(self, offsets, callback=None):
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
@@ -386,7 +406,7 @@ class ConsumerCoordinator(BaseCoordinator):
return
while True:
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
future = self._send_offset_commit_request(offsets)
self._client.poll(future=future)
diff --git a/requirements-dev.txt b/requirements-dev.txt
new file mode 100644
index 0000000..dd56df6
--- /dev/null
+++ b/requirements-dev.txt
@@ -0,0 +1,15 @@
+flake8==3.4.1
+pytest==3.2.2
+pytest-cov==2.5.1
+pytest-catchlog==1.2.2
+docker-py==1.10.6
+coveralls==1.2.0
+Sphinx==1.6.4
+lz4==0.10.1
+xxhash==1.0.1
+python-snappy==0.5.1
+tox==2.9.1
+pytest-pylint==0.7.1
+# pytest-sugar==0.9.0
+pytest-mock==1.6.3
+sphinx-rtd-theme==0.2.4
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 4115c03..aea2662 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -234,7 +234,7 @@ def test_fetch_committed_offsets(mocker, coordinator):
assert coordinator._client.poll.call_count == 0
# general case -- send offset fetch request, get successful future
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_fetch_request',
return_value=Future().success('foobar'))
partitions = [TopicPartition('foobar', 0)]
@@ -295,16 +295,15 @@ def offsets():
def test_commit_offsets_async(mocker, coordinator, offsets):
mocker.patch.object(coordinator._client, 'poll')
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
- ret = coordinator.commit_offsets_async(offsets)
- assert isinstance(ret, Future)
+ coordinator.commit_offsets_async(offsets)
assert coordinator._send_offset_commit_request.call_count == 1
def test_commit_offsets_sync(mocker, coordinator, offsets):
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
cli = coordinator._client