author     Taras Voinarovskyi <voyn1991@gmail.com>  2017-08-07 13:34:50 +0300
committer  GitHub <noreply@github.com>  2017-08-07 13:34:50 +0300
commit     8cf44847bc91a986c98494c4a23be31c368ef4dd (patch)
tree       faad0970119f52542b7a09b0db8abbc61166e694
parent     da25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
parent     55ded554f9f5b470eeb53500e455ecd87f4d8f87 (diff)
Merge pull request #1161 from dpkp/issue1036_offset_by_time
Added support for offsets_for_times, beginning_offsets and end_offsets APIs.
-rw-r--r--  kafka/conn.py                          3
-rw-r--r--  kafka/consumer/fetcher.py            241
-rw-r--r--  kafka/consumer/group.py              109
-rw-r--r--  kafka/protocol/offset.py               4
-rw-r--r--  kafka/structs.py                       3
-rw-r--r--  test/test_consumer_integration.py    134
-rw-r--r--  test/test_fetcher.py                 183
7 files changed, 609 insertions, 68 deletions
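
As a quick orientation before the diff, here is a minimal usage sketch of the three KafkaConsumer methods added by this merge. The broker address and topic name are placeholders, and the cluster is assumed to be running 0.10.1 or newer (older brokers raise UnsupportedVersionError):

    import time

    from kafka import KafkaConsumer, TopicPartition

    # Placeholder broker and topic; adjust for your environment.
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    tp = TopicPartition('my-topic', 0)

    # Earliest available offset, and the offset of the next message to be
    # written (last available offset + 1).
    beginning = consumer.beginning_offsets([tp])   # {tp: int}
    end = consumer.end_offsets([tp])               # {tp: int}

    # First offset whose timestamp is >= one hour ago (epoch milliseconds).
    # Returns {tp: OffsetAndTimestamp}, or {tp: None} if no such message exists.
    one_hour_ago = int(time.time() * 1000) - 60 * 60 * 1000
    offsets = consumer.offsets_for_times({tp: one_hour_ago})
    if offsets[tp] is not None:
        consumer.assign([tp])
        consumer.seek(tp, offsets[tp].offset)

Each call blocks for up to request_timeout_ms and raises KafkaTimeoutError if the lookup cannot be completed within that window.
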
diff --git a/kafka/conn.py b/kafka/conn.py
index ac8bb3d..61d63bf 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -19,6 +19,7 @@ from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
+from kafka.protocol.fetch import FetchRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -886,7 +887,7 @@ class BrokerConnection(object):
def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
- # in descending order. As soon as we find one that works, return it
+ # in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
((0, 11, 0), MetadataRequest[4]),
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8db89a1..c0d6075 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -14,9 +14,11 @@ from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
-from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.protocol.offset import (
+ OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
+)
from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition
+from kafka.structs import TopicPartition, OffsetAndTimestamp
log = logging.getLogger(__name__)
@@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
+ 'retry_backoff_ms': 100
}
def __init__(self, client, subscriptions, metrics, **configs):
@@ -180,6 +183,31 @@ class Fetcher(six.Iterator):
" offset %s", tp, committed)
self._subscriptions.seek(tp, committed)
+ def get_offsets_by_times(self, timestamps, timeout_ms):
+ offsets = self._retrieve_offsets(timestamps, timeout_ms)
+ for tp in timestamps:
+ if tp not in offsets:
+ offsets[tp] = None
+ else:
+ offset, timestamp = offsets[tp]
+ offsets[tp] = OffsetAndTimestamp(offset, timestamp)
+ return offsets
+
+ def beginning_offsets(self, partitions, timeout_ms):
+ return self.beginning_or_end_offset(
+ partitions, OffsetResetStrategy.EARLIEST, timeout_ms)
+
+ def end_offsets(self, partitions, timeout_ms):
+ return self.beginning_or_end_offset(
+ partitions, OffsetResetStrategy.LATEST, timeout_ms)
+
+ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
+ timestamps = dict([(tp, timestamp) for tp in partitions])
+ offsets = self._retrieve_offsets(timestamps, timeout_ms)
+ for tp in timestamps:
+ offsets[tp] = offsets[tp][0]
+ return offsets
+
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
@@ -199,40 +227,64 @@ class Fetcher(six.Iterator):
log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
- offset = self._offset(partition, timestamp)
+ offsets = self._retrieve_offsets({partition: timestamp})
+ if partition not in offsets:
+ raise NoOffsetForPartitionError(partition)
+ offset = offsets[partition][0]
# we might lose the assignment while fetching the offset,
# so check it is still active
if self._subscriptions.is_assigned(partition):
self._subscriptions.seek(partition, offset)
- def _offset(self, partition, timestamp):
- """Fetch a single offset before the given timestamp for the partition.
+ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
+ """Fetch offset for each partition passed in ``timestamps`` map.
- Blocks until offset is obtained, or a non-retriable exception is raised
+ Blocks until offsets are obtained, a non-retriable exception is raised
+ or ``timeout_ms`` passed.
Arguments:
- partition The partition that needs fetching offset.
- timestamp (int): timestamp for fetching offset. -1 for the latest
- available, -2 for the earliest available. Otherwise timestamp
- is treated as epoch seconds.
+ timestamps: {TopicPartition: int} dict with timestamps to fetch
+ offsets by. -1 for the latest available, -2 for the earliest
+ available. Otherwise timestamp is treated as epoch milliseconds.
Returns:
- int: message offset
+ {TopicPartition: (int, int)}: Mapping of partition to
+ retrieved offset and timestamp. If offset does not exist for
+ the provided timestamp, that partition will be missing from
+ this mapping.
"""
- while True:
- future = self._send_offset_request(partition, timestamp)
- self._client.poll(future=future)
+ if not timestamps:
+ return {}
+
+ start_time = time.time()
+ remaining_ms = timeout_ms
+ while remaining_ms > 0:
+ future = self._send_offset_requests(timestamps)
+ self._client.poll(future=future, timeout_ms=remaining_ms)
if future.succeeded():
return future.value
-
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
+ elapsed_ms = (time.time() - start_time) * 1000
+ remaining_ms = timeout_ms - elapsed_ms
+ if remaining_ms < 0:
+ break
+
if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
- self._client.poll(future=refresh_future, sleep=True)
+ self._client.poll(
+ future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+ else:
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+
+ elapsed_ms = (time.time() - start_time) * 1000
+ remaining_ms = timeout_ms - elapsed_ms
+
+ raise Errors.KafkaTimeoutError(
+ "Failed to get offsets by timestamps in %s ms" % timeout_ms)
def _raise_if_offset_out_of_range(self):
"""Check FetchResponses for offset out of range.
@@ -576,73 +628,140 @@ class Fetcher(six.Iterator):
return f.deserialize(topic, bytes_)
return f(bytes_)
- def _send_offset_request(self, partition, timestamp):
- """Fetch a single offset before the given timestamp for the partition.
+ def _send_offset_requests(self, timestamps):
+ """Fetch offsets for each partition in timestamps dict. This may send
+ requests to multiple nodes, based on which node is the leader for each partition.
Arguments:
- partition (TopicPartition): partition that needs fetching offset
- timestamp (int): timestamp for fetching offset
+ timestamps (dict): {TopicPartition: int} mapping of fetching
+ timestamps.
Returns:
- Future: resolves to the corresponding offset
+ Future: resolves to a mapping of retrieved offsets
"""
- node_id = self._client.cluster.leader_for_partition(partition)
- if node_id is None:
- log.debug("Partition %s is unknown for fetching offset,"
- " wait for metadata refresh", partition)
- return Future().failure(Errors.StaleMetadata(partition))
- elif node_id == -1:
- log.debug("Leader for partition %s unavailable for fetching offset,"
- " wait for metadata refresh", partition)
- return Future().failure(Errors.LeaderNotAvailableError(partition))
-
- request = OffsetRequest[0](
- -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
- )
+ timestamps_by_node = collections.defaultdict(dict)
+ for partition, timestamp in six.iteritems(timestamps):
+ node_id = self._client.cluster.leader_for_partition(partition)
+ if node_id is None:
+ self._client.add_topic(partition.topic)
+ log.debug("Partition %s is unknown for fetching offset,"
+ " wait for metadata refresh", partition)
+ return Future().failure(Errors.StaleMetadata(partition))
+ elif node_id == -1:
+ log.debug("Leader for partition %s unavailable for fetching "
+ "offset, wait for metadata refresh", partition)
+ return Future().failure(
+ Errors.LeaderNotAvailableError(partition))
+ else:
+ timestamps_by_node[node_id][partition] = timestamp
+
+ # Aggregate results until we have all
+ list_offsets_future = Future()
+ responses = []
+ node_count = len(timestamps_by_node)
+
+ def on_success(value):
+ responses.append(value)
+ if len(responses) == node_count:
+ offsets = {}
+ for r in responses:
+ offsets.update(r)
+ list_offsets_future.success(offsets)
+
+ def on_fail(err):
+ if not list_offsets_future.is_done:
+ list_offsets_future.failure(err)
+
+ for node_id, timestamps in six.iteritems(timestamps_by_node):
+ _f = self._send_offset_request(node_id, timestamps)
+ _f.add_callback(on_success)
+ _f.add_errback(on_fail)
+ return list_offsets_future
+
+ def _send_offset_request(self, node_id, timestamps):
+ by_topic = collections.defaultdict(list)
+ for tp, timestamp in six.iteritems(timestamps):
+ if self.config['api_version'] >= (0, 10, 1):
+ data = (tp.partition, timestamp)
+ else:
+ data = (tp.partition, timestamp, 1)
+ by_topic[tp.topic].append(data)
+
+ if self.config['api_version'] >= (0, 10, 1):
+ request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
+ else:
+ request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))
+
# Client returns a future that only fails on network issues
# so create a separate future and attach a callback to update it
# based on response error codes
future = Future()
+
_f = self._client.send(node_id, request)
- _f.add_callback(self._handle_offset_response, partition, future)
+ _f.add_callback(self._handle_offset_response, future)
_f.add_errback(lambda e: future.failure(e))
return future
- def _handle_offset_response(self, partition, future, response):
+ def _handle_offset_response(self, future, response):
"""Callback for the response of the list offset call above.
Arguments:
- partition (TopicPartition): The partition that was fetched
future (Future): the future to update based on response
response (OffsetResponse): response from the server
Raises:
AssertionError: if response does not match partition
"""
- topic, partition_info = response.topics[0]
- assert len(response.topics) == 1 and len(partition_info) == 1, (
- 'OffsetResponse should only be for a single topic-partition')
-
- part, error_code, offsets = partition_info[0]
- assert topic == partition.topic and part == partition.partition, (
- 'OffsetResponse partition does not match OffsetRequest partition')
-
- error_type = Errors.for_code(error_code)
- if error_type is Errors.NoError:
- assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
- offset = offsets[0]
- log.debug("Fetched offset %d for partition %s", offset, partition)
- future.success(offset)
- elif error_type in (Errors.NotLeaderForPartitionError,
- Errors.UnknownTopicOrPartitionError):
- log.debug("Attempt to fetch offsets for partition %s failed due"
- " to obsolete leadership information, retrying.",
- partition)
- future.failure(error_type(partition))
- else:
- log.warning("Attempt to fetch offsets for partition %s failed due to:"
- " %s", partition, error_type)
- future.failure(error_type(partition))
+ timestamp_offset_map = {}
+ for topic, part_data in response.topics:
+ for partition_info in part_data:
+ partition, error_code = partition_info[:2]
+ partition = TopicPartition(topic, partition)
+ error_type = Errors.for_code(error_code)
+ if error_type is Errors.NoError:
+ if response.API_VERSION == 0:
+ offsets = partition_info[2]
+ assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
+ if not offsets:
+ offset = UNKNOWN_OFFSET
+ else:
+ offset = offsets[0]
+ log.debug("Handling v0 ListOffsetResponse response for %s. "
+ "Fetched offset %s", partition, offset)
+ if offset != UNKNOWN_OFFSET:
+ timestamp_offset_map[partition] = (offset, None)
+ else:
+ timestamp, offset = partition_info[2:]
+ log.debug("Handling ListOffsetResponse response for %s. "
+ "Fetched offset %s, timestamp %s",
+ partition, offset, timestamp)
+ if offset != UNKNOWN_OFFSET:
+ timestamp_offset_map[partition] = (offset, timestamp)
+ elif error_type is Errors.UnsupportedForMessageFormatError:
+ # The message format on the broker side is before 0.10.0,
+ # we simply put None in the response.
+ log.debug("Cannot search by timestamp for partition %s because the"
+ " message format version is before 0.10.0", partition)
+ elif error_type is Errors.NotLeaderForPartitionError:
+ log.debug("Attempt to fetch offsets for partition %s failed due"
+ " to obsolete leadership information, retrying.",
+ partition)
+ future.failure(error_type(partition))
+ return
+ elif error_type is Errors.UnknownTopicOrPartitionError:
+ log.warn("Received unknown topic or partition error in ListOffset "
+ "request for partition %s. The topic/partition " +
+ "may not exist or the user may not have Describe access "
+ "to it.", partition)
+ future.failure(error_type(partition))
+ return
+ else:
+ log.warning("Attempt to fetch offsets for partition %s failed due to:"
+ " %s", partition, error_type)
+ future.failure(error_type(partition))
+ return
+ if not future.is_done:
+ future.success(timestamp_offset_map)
def _fetchable_partitions(self):
fetchable = self._subscriptions.fetchable_partitions()
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 6adb154..54a3711 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -6,7 +6,7 @@ import socket
import sys
import time
-from kafka.errors import KafkaConfigurationError
+from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
from kafka.vendor import six
@@ -861,6 +861,113 @@ class KafkaConsumer(six.Iterator):
metrics[k.group][k.name] = v.value()
return metrics
+ def offsets_for_times(self, timestamps):
+ """Look up the offsets for the given partitions by timestamp. The
+ returned offset for each partition is the earliest offset whose
+ timestamp is greater than or equal to the given timestamp in the
+ corresponding partition.
+
+ This is a blocking call. The consumer does not have to be assigned the
+ partitions.
+
+ If the message format version in a partition is before 0.10.0, i.e.
+ the messages do not have timestamps, ``None`` will be returned for that
+ partition. ``None`` will also be returned for the partition if there
+ are no messages in it.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ timestamps (dict): ``{TopicPartition: int}`` mapping from partition
+ to the timestamp to look up. Unit should be milliseconds since
+ beginning of the epoch (midnight Jan 1, 1970 (UTC))
+
+ Returns:
+ ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition
+ to the timestamp and offset of the first message with timestamp
+ greater than or equal to the target timestamp.
+
+ Raises:
+ ValueError: If the target timestamp is negative
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ for tp, ts in timestamps.items():
+ timestamps[tp] = int(ts)
+ if ts < 0:
+ raise ValueError(
+ "The target time for partition {} is {}. The target time "
+ "cannot be negative.".format(tp, ts))
+ return self._fetcher.get_offsets_by_times(
+ timestamps, self.config['request_timeout_ms'])
+
+ def beginning_offsets(self, partitions):
+ """Get the first offset for the given partitions.
+
+ This method does not change the current consumer position of the
+ partitions.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ partitions (list): List of TopicPartition instances to fetch
+ offsets for.
+
+ Returns:
+ ``{TopicPartition: int}``: The earliest available offsets for the
+ given partitions.
+
+ Raises:
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms.
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ offsets = self._fetcher.beginning_offsets(
+ partitions, self.config['request_timeout_ms'])
+ return offsets
+
+ def end_offsets(self, partitions):
+ """Get the last offset for the given partitions. The last offset of a
+ partition is the offset of the upcoming message, i.e. the offset of the
+ last available message + 1.
+
+ This method does not change the current consumer position of the
+ partitions.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ partitions (list): List of TopicPartition instances to fetch
+ offsets for.
+
+ Returns:
+ ``{TopicPartition: int}``: The end offsets for the given partitions.
+
+ Raises:
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ offsets = self._fetcher.end_offsets(
+ partitions, self.config['request_timeout_ms'])
+ return offsets
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 8353f8c..5179658 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import
from .api import Request, Response
from .types import Array, Int8, Int16, Int32, Int64, Schema, String
+UNKNOWN_OFFSET = -1
+
class OffsetResetStrategy(object):
LATEST = -1
@@ -91,7 +93,7 @@ class OffsetRequest_v2(Request):
RESPONSE_TYPE = OffsetResponse_v2
SCHEMA = Schema(
('replica_id', Int32),
- ('isolation_level', Int8),
+ ('isolation_level', Int8), # <- added isolation_level
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
diff --git a/kafka/structs.py b/kafka/structs.py
index 48321e7..62f36dd 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -74,6 +74,9 @@ PartitionMetadata = namedtuple("PartitionMetadata",
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
["offset", "metadata"])
+OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
+ ["offset", "timestamp"])
+
# Deprecated structs
OffsetAndMessage = namedtuple("OffsetAndMessage",
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 193a570..4b5e78a 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,16 +1,23 @@
import logging
import os
+import time
from six.moves import xrange
import six
from . import unittest
from kafka import (
- KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message
+ KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
+ create_gzip_message, KafkaProducer
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
-from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
-from kafka.structs import ProduceRequestPayload, TopicPartition
+from kafka.errors import (
+ ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
+ KafkaTimeoutError
+)
+from kafka.structs import (
+ ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
+)
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
@@ -88,6 +95,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
**configs)
return consumer
+ def kafka_producer(self, **configs):
+ brokers = '%s:%d' % (self.server.host, self.server.port)
+ producer = KafkaProducer(
+ bootstrap_servers=brokers, **configs)
+ return producer
+
def test_simple_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -624,3 +637,118 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
fetched_msgs = [next(consumer) for i in range(10)]
self.assertEqual(len(fetched_msgs), 10)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_offsets_for_time(self):
+ late_time = int(time.time()) * 1000
+ middle_time = late_time - 1000
+ early_time = late_time - 2000
+ tp = TopicPartition(self.topic, 0)
+
+ kafka_producer = self.kafka_producer()
+ early_msg = kafka_producer.send(
+ self.topic, partition=0, value=b"first",
+ timestamp_ms=early_time).get()
+ late_msg = kafka_producer.send(
+ self.topic, partition=0, value=b"last",
+ timestamp_ms=late_time).get()
+
+ consumer = self.kafka_consumer()
+ offsets = consumer.offsets_for_times({tp: early_time})
+ self.assertEqual(len(offsets), 1)
+ self.assertEqual(offsets[tp].offset, early_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, early_time)
+
+ offsets = consumer.offsets_for_times({tp: middle_time})
+ self.assertEqual(offsets[tp].offset, late_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, late_time)
+
+ offsets = consumer.offsets_for_times({tp: late_time})
+ self.assertEqual(offsets[tp].offset, late_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, late_time)
+
+ offsets = consumer.offsets_for_times({})
+ self.assertEqual(offsets, {})
+
+ # Out of bound timestamps check
+
+ offsets = consumer.offsets_for_times({tp: 0})
+ self.assertEqual(offsets[tp].offset, early_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, early_time)
+
+ offsets = consumer.offsets_for_times({tp: 9999999999999})
+ self.assertEqual(offsets[tp], None)
+
+ # Beginning/End offsets
+
+ offsets = consumer.beginning_offsets([tp])
+ self.assertEqual(offsets, {
+ tp: early_msg.offset,
+ })
+ offsets = consumer.end_offsets([tp])
+ self.assertEqual(offsets, {
+ tp: late_msg.offset + 1
+ })
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_offsets_search_many_partitions(self):
+ tp0 = TopicPartition(self.topic, 0)
+ tp1 = TopicPartition(self.topic, 1)
+
+ kafka_producer = self.kafka_producer()
+ send_time = int(time.time() * 1000)
+ p0msg = kafka_producer.send(
+ self.topic, partition=0, value=b"XXX",
+ timestamp_ms=send_time).get()
+ p1msg = kafka_producer.send(
+ self.topic, partition=1, value=b"XXX",
+ timestamp_ms=send_time).get()
+
+ consumer = self.kafka_consumer()
+ offsets = consumer.offsets_for_times({
+ tp0: send_time,
+ tp1: send_time
+ })
+
+ self.assertEqual(offsets, {
+ tp0: OffsetAndTimestamp(p0msg.offset, send_time),
+ tp1: OffsetAndTimestamp(p1msg.offset, send_time)
+ })
+
+ offsets = consumer.beginning_offsets([tp0, tp1])
+ self.assertEqual(offsets, {
+ tp0: p0msg.offset,
+ tp1: p1msg.offset
+ })
+
+ offsets = consumer.end_offsets([tp0, tp1])
+ self.assertEqual(offsets, {
+ tp0: p0msg.offset + 1,
+ tp1: p1msg.offset + 1
+ })
+
+ @kafka_versions('<0.10.1')
+ def test_kafka_consumer_offsets_for_time_old(self):
+ consumer = self.kafka_consumer()
+ tp = TopicPartition(self.topic, 0)
+
+ with self.assertRaises(UnsupportedVersionError):
+ consumer.offsets_for_times({tp: int(time.time())})
+
+ with self.assertRaises(UnsupportedVersionError):
+ consumer.beginning_offsets([tp])
+
+ with self.assertRaises(UnsupportedVersionError):
+ consumer.end_offsets([tp])
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_offsets_for_times_errors(self):
+ consumer = self.kafka_consumer()
+ tp = TopicPartition(self.topic, 0)
+ bad_tp = TopicPartition(self.topic, 100)
+
+ with self.assertRaises(ValueError):
+ consumer.offsets_for_times({tp: -1})
+
+ with self.assertRaises(KafkaTimeoutError):
+ consumer.offsets_for_times({bad_tp: 0})
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index dcfba78..0562ec5 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -3,12 +3,21 @@ from __future__ import absolute_import
import pytest
+import itertools
+from collections import OrderedDict
+
from kafka.client_async import KafkaClient
-from kafka.consumer.fetcher import Fetcher
+from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.offset import OffsetResponse
from kafka.structs import TopicPartition
+from kafka.future import Future
+from kafka.errors import (
+ StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
+ UnknownTopicOrPartitionError
+)
@pytest.fixture
@@ -101,3 +110,175 @@ def test_update_fetch_positions(fetcher, mocker):
fetcher.update_fetch_positions([partition])
assert fetcher._reset_offset.call_count == 0
fetcher._subscriptions.seek.assert_called_with(partition, 123)
+
+
+def test__reset_offset(fetcher, mocker):
+ tp = TopicPartition("topic", 0)
+ fetcher._subscriptions.subscribe(topics="topic")
+ fetcher._subscriptions.assign_from_subscribed([tp])
+ fetcher._subscriptions.need_offset_reset(tp)
+ mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
+
+ mocked.return_value = {}
+ with pytest.raises(NoOffsetForPartitionError):
+ fetcher._reset_offset(tp)
+
+ mocked.return_value = {tp: (1001, None)}
+ fetcher._reset_offset(tp)
+ assert not fetcher._subscriptions.assignment[tp].awaiting_reset
+ assert fetcher._subscriptions.assignment[tp].position == 1001
+
+
+def test__send_offset_requests(fetcher, mocker):
+ tp = TopicPartition("topic_send_offset", 1)
+ mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
+ send_futures = []
+
+ def send_side_effect(*args, **kw):
+ f = Future()
+ send_futures.append(f)
+ return f
+ mocked_send.side_effect = send_side_effect
+
+ mocked_leader = mocker.patch.object(
+ fetcher._client.cluster, "leader_for_partition")
+ # First we report unavailable leader 2 times different ways and later
+ # always as available
+ mocked_leader.side_effect = itertools.chain(
+ [None, -1], itertools.cycle([0]))
+
+ # Leader == None
+ fut = fetcher._send_offset_requests({tp: 0})
+ assert fut.failed()
+ assert isinstance(fut.exception, StaleMetadata)
+ assert not mocked_send.called
+
+ # Leader == -1
+ fut = fetcher._send_offset_requests({tp: 0})
+ assert fut.failed()
+ assert isinstance(fut.exception, LeaderNotAvailableError)
+ assert not mocked_send.called
+
+ # Leader == 0, send failed
+ fut = fetcher._send_offset_requests({tp: 0})
+ assert not fut.is_done
+ assert mocked_send.called
+ # Check that we bound the futures correctly to chain failure
+ send_futures.pop().failure(NotLeaderForPartitionError(tp))
+ assert fut.failed()
+ assert isinstance(fut.exception, NotLeaderForPartitionError)
+
+ # Leader == 0, send success
+ fut = fetcher._send_offset_requests({tp: 0})
+ assert not fut.is_done
+ assert mocked_send.called
+ # Check that we bound the futures correctly to chain success
+ send_futures.pop().success({tp: (10, 10000)})
+ assert fut.succeeded()
+ assert fut.value == {tp: (10, 10000)}
+
+
+def test__send_offset_requests_multiple_nodes(fetcher, mocker):
+ tp1 = TopicPartition("topic_send_offset", 1)
+ tp2 = TopicPartition("topic_send_offset", 2)
+ tp3 = TopicPartition("topic_send_offset", 3)
+ tp4 = TopicPartition("topic_send_offset", 4)
+ mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
+ send_futures = []
+
+ def send_side_effect(node_id, timestamps):
+ f = Future()
+ send_futures.append((node_id, timestamps, f))
+ return f
+ mocked_send.side_effect = send_side_effect
+
+ mocked_leader = mocker.patch.object(
+ fetcher._client.cluster, "leader_for_partition")
+ mocked_leader.side_effect = itertools.cycle([0, 1])
+
+ # -- All node succeeded case
+ tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
+ fut = fetcher._send_offset_requests(tss)
+ assert not fut.is_done
+ assert mocked_send.call_count == 2
+
+ req_by_node = {}
+ second_future = None
+ for node, timestamps, f in send_futures:
+ req_by_node[node] = timestamps
+ if node == 0:
+ # Say tp3 does not have any messages so it's missing
+ f.success({tp1: (11, 1001)})
+ else:
+ second_future = f
+ assert req_by_node == {
+ 0: {tp1: 0, tp3: 0},
+ 1: {tp2: 0, tp4: 0}
+ }
+
+ # We only resolved 1 future so far, so result future is not yet ready
+ assert not fut.is_done
+ second_future.success({tp2: (12, 1002), tp4: (14, 1004)})
+ assert fut.succeeded()
+ assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}
+
+ # -- First succeeded second not
+ del send_futures[:]
+ fut = fetcher._send_offset_requests(tss)
+ assert len(send_futures) == 2
+ send_futures[0][2].success({tp1: (11, 1001)})
+ send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
+ assert fut.failed()
+ assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+ # -- First fails second succeeded
+ del send_futures[:]
+ fut = fetcher._send_offset_requests(tss)
+ assert len(send_futures) == 2
+ send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
+ send_futures[1][2].success({tp1: (11, 1001)})
+ assert fut.failed()
+ assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+
+def test__handle_offset_response(fetcher, mocker):
+ # Broker returns UnsupportedForMessageFormatError, will omit partition
+ fut = Future()
+ res = OffsetResponse[1]([
+ ("topic", [(0, 43, -1, -1)]),
+ ("topic", [(1, 0, 1000, 9999)])
+ ])
+ fetcher._handle_offset_response(fut, res)
+ assert fut.succeeded()
+ assert fut.value == {TopicPartition("topic", 1): (9999, 1000)}
+
+ # Broker returns NotLeaderForPartitionError
+ fut = Future()
+ res = OffsetResponse[1]([
+ ("topic", [(0, 6, -1, -1)]),
+ ])
+ fetcher._handle_offset_response(fut, res)
+ assert fut.failed()
+ assert isinstance(fut.exception, NotLeaderForPartitionError)
+
+ # Broker returns UnknownTopicOrPartitionError
+ fut = Future()
+ res = OffsetResponse[1]([
+ ("topic", [(0, 3, -1, -1)]),
+ ])
+ fetcher._handle_offset_response(fut, res)
+ assert fut.failed()
+ assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+ # Broker returns many errors and 1 result
+ # Will fail on 1st error and return
+ fut = Future()
+ res = OffsetResponse[1]([
+ ("topic", [(0, 43, -1, -1)]),
+ ("topic", [(1, 6, -1, -1)]),
+ ("topic", [(2, 3, -1, -1)]),
+ ("topic", [(3, 0, 1000, 9999)])
+ ])
+ fetcher._handle_offset_response(fut, res)
+ assert fut.failed()
+ assert isinstance(fut.exception, NotLeaderForPartitionError)