diff options
-rw-r--r-- | kafka/client_async.py | 18 | ||||
-rw-r--r-- | kafka/conn.py | 2 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 4 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 29 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 16 | ||||
-rw-r--r-- | kafka/producer/sender.py | 4 | ||||
-rw-r--r-- | kafka/protocol/admin.py | 24 | ||||
-rw-r--r-- | kafka/protocol/commit.py | 104 | ||||
-rw-r--r-- | kafka/protocol/fetch.py | 12 | ||||
-rw-r--r-- | kafka/protocol/group.py | 48 | ||||
-rw-r--r-- | kafka/protocol/legacy.py | 16 | ||||
-rw-r--r-- | kafka/protocol/metadata.py | 12 | ||||
-rw-r--r-- | kafka/protocol/offset.py | 13 | ||||
-rw-r--r-- | kafka/protocol/produce.py | 14 | ||||
-rw-r--r-- | test/test_client.py | 20 | ||||
-rw-r--r-- | test/test_client_async.py | 8 | ||||
-rw-r--r-- | test/test_conn.py | 6 | ||||
-rw-r--r-- | test/test_consumer_group.py | 2 | ||||
-rw-r--r-- | test/test_coordinator.py | 67 | ||||
-rw-r--r-- | test/test_fetcher.py | 24 |
20 files changed, 279 insertions, 164 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index b77ead5..907ee0c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -113,7 +113,7 @@ class KafkaClient(object): time.sleep(next_at - now) self._last_bootstrap = time.time() - metadata_request = MetadataRequest([]) + metadata_request = MetadataRequest[0]([]) for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) bootstrap = BrokerConnection(host, port, afi, **self.config) @@ -299,7 +299,7 @@ class KafkaClient(object): # Every request gets a response, except one special case: expect_response = True - if isinstance(request, ProduceRequest) and request.required_acks == 0: + if isinstance(request, tuple(ProduceRequest)) and request.required_acks == 0: expect_response = False return self._conns[node_id].send(request, expect_response=expect_response) @@ -535,7 +535,7 @@ class KafkaClient(object): topics = [] if self._can_send_request(node_id): - request = MetadataRequest(topics) + request = MetadataRequest[0](topics) log.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request) future.add_callback(self.cluster.update_metadata) @@ -610,7 +610,7 @@ class KafkaClient(object): import socket from .protocol.admin import ListGroupsRequest from .protocol.commit import ( - OffsetFetchRequest_v0, GroupCoordinatorRequest) + OffsetFetchRequest, GroupCoordinatorRequest) from .protocol.metadata import MetadataRequest # Socket errors are logged as exceptions and can alarm users. Mute them @@ -623,10 +623,10 @@ class KafkaClient(object): log_filter = ConnFilter() test_cases = [ - ('0.9', ListGroupsRequest()), - ('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')), - ('0.8.1', OffsetFetchRequest_v0('kafka-python-default-group', [])), - ('0.8.0', MetadataRequest([])), + ('0.9', ListGroupsRequest[0]()), + ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest[0]([])), ] logging.getLogger('kafka.conn').addFilter(log_filter) @@ -634,7 +634,7 @@ class KafkaClient(object): connect(node_id) f = self.send(node_id, request) time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes - metadata = self.send(node_id, MetadataRequest([])) + metadata = self.send(node_id, MetadataRequest[0]([])) self.poll(future=f) self.poll(future=metadata) diff --git a/kafka/conn.py b/kafka/conn.py index dc7dd23..014b340 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -321,7 +321,7 @@ class BrokerConnection(object): # 0.8.2 quirk if (self.config['api_version'] == (0, 8, 2) and - ifr.response_type is GroupCoordinatorResponse and + ifr.response_type is GroupCoordinatorResponse[0] and ifr.correlation_id != 0 and recv_correlation_id == 0): log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse' diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c1f98eb..2883bd8 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -472,7 +472,7 @@ class Fetcher(six.Iterator): " wait for metadata refresh", partition) return Future().failure(Errors.LeaderNotAvailableError(partition)) - request = OffsetRequest( + request = OffsetRequest[0]( -1, [(partition.topic, [(partition.partition, timestamp, 1)])] ) # Client returns a future that only fails on network issues @@ -552,7 +552,7 @@ class Fetcher(six.Iterator): requests = {} for node_id, partition_data in six.iteritems(fetchable): - requests[node_id] = FetchRequest( + requests[node_id] = FetchRequest[0]( -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 3c7ea21..7ff7a04 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -8,8 +8,7 @@ import six import kafka.errors as Errors from kafka.future import Future -from kafka.protocol.commit import (GroupCoordinatorRequest, - OffsetCommitRequest_v2 as OffsetCommitRequest) +from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest) from .heartbeat import Heartbeat @@ -79,8 +78,8 @@ class BaseCoordinator(object): self.config[key] = configs[key] self._client = client - self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.group_id = self.config['group_id'] self.coordinator_id = None self.rejoin_needed = True @@ -269,7 +268,7 @@ class BaseCoordinator(object): # send a join group request to the coordinator log.info("(Re-)joining group %s", self.group_id) - request = JoinGroupRequest( + request = JoinGroupRequest[0]( self.group_id, self.config['session_timeout_ms'], self.member_id, @@ -324,7 +323,7 @@ class BaseCoordinator(object): elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately error = error_type(self.member_id) - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID log.debug("Attempt to join group %s failed due to unknown member id", self.group_id) future.failure(error) @@ -354,7 +353,7 @@ class BaseCoordinator(object): def _on_join_follower(self): # send follower's sync group with an empty assignment - request = SyncGroupRequest( + request = SyncGroupRequest[0]( self.group_id, self.generation, self.member_id, @@ -381,7 +380,7 @@ class BaseCoordinator(object): except Exception as e: return Future().failure(e) - request = SyncGroupRequest( + request = SyncGroupRequest[0]( self.group_id, self.generation, self.member_id, @@ -425,7 +424,7 @@ class BaseCoordinator(object): Errors.IllegalGenerationError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID future.failure(error) elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): @@ -450,7 +449,7 @@ class BaseCoordinator(object): log.debug("Sending group coordinator request for group %s to broker %s", self.group_id, node_id) - request = GroupCoordinatorRequest(self.group_id) + request = GroupCoordinatorRequest[0](self.group_id) future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_group_coordinator_response, future) @@ -514,14 +513,14 @@ class BaseCoordinator(object): if not self.coordinator_unknown() and self.generation > 0: # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. - request = LeaveGroupRequest(self.group_id, self.member_id) + request = LeaveGroupRequest[0](self.group_id, self.member_id) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) future.add_errback(log.error, "LeaveGroup request failed: %s") self._client.poll(future=future) - self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.rejoin_needed = True def _handle_leave_group_response(self, response): @@ -533,7 +532,7 @@ class BaseCoordinator(object): def _send_heartbeat_request(self): """Send a heartbeat request""" - request = HeartbeatRequest(self.group_id, self.generation, self.member_id) + request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id) log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) @@ -569,7 +568,7 @@ class BaseCoordinator(object): elif error_type is Errors.UnknownMemberIdError: log.warning("Heartbeat: local member_id was not recognized;" " this consumer needs to re-join") - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.rejoin_needed = True future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3ce7570..cd3d48a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -14,9 +14,7 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor from .protocol import ConsumerProtocol from .. import errors as Errors from ..future import Future -from ..protocol.commit import ( - OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0, - OffsetFetchRequest_v0, OffsetFetchRequest_v1) +from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest from ..structs import OffsetAndMetadata, TopicPartition from ..util import WeakMethod @@ -430,11 +428,11 @@ class ConsumerCoordinator(BaseCoordinator): offset_data[tp.topic][tp.partition] = offset if self.config['api_version'] >= (0, 9): - request = OffsetCommitRequest_v2( + request = OffsetCommitRequest[2]( self.group_id, self.generation, self.member_id, - OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME, + OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, [( topic, [( partition, @@ -444,7 +442,7 @@ class ConsumerCoordinator(BaseCoordinator): ) for topic, partitions in six.iteritems(offset_data)] ) elif self.config['api_version'] >= (0, 8, 2): - request = OffsetCommitRequest_v1( + request = OffsetCommitRequest[1]( self.group_id, -1, '', [( topic, [( @@ -456,7 +454,7 @@ class ConsumerCoordinator(BaseCoordinator): ) for topic, partitions in six.iteritems(offset_data)] ) elif self.config['api_version'] >= (0, 8, 1): - request = OffsetCommitRequest_v0( + request = OffsetCommitRequest[0]( self.group_id, [( topic, [( @@ -593,12 +591,12 @@ class ConsumerCoordinator(BaseCoordinator): topic_partitions[tp.topic].add(tp.partition) if self.config['api_version'] >= (0, 8, 2): - request = OffsetFetchRequest_v1( + request = OffsetFetchRequest[1]( self.group_id, list(topic_partitions.items()) ) else: - request = OffsetFetchRequest_v0( + request = OffsetFetchRequest[0]( self.group_id, list(topic_partitions.items()) ) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 3cafb26..2201261 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -12,8 +12,6 @@ from ..structs import TopicPartition from ..version import __version__ from ..protocol.produce import ProduceRequest - - log = logging.getLogger(__name__) @@ -258,7 +256,7 @@ class Sender(threading.Thread): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - return ProduceRequest( + return ProduceRequest[0]( required_acks=acks, timeout=timeout, topics=[(topic, list(partition_info.items())) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 56dd042..8c74613 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Bytes, Int16, Schema, String -class ListGroupsResponse(Struct): +class ListGroupsResponse_v0(Struct): + API_KEY = 16 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('groups', Array( @@ -11,14 +13,20 @@ class ListGroupsResponse(Struct): ) -class ListGroupsRequest(Struct): +class ListGroupsRequest_v0(Struct): API_KEY = 16 API_VERSION = 0 - RESPONSE_TYPE = ListGroupsResponse + RESPONSE_TYPE = ListGroupsResponse_v0 SCHEMA = Schema() -class DescribeGroupsResponse(Struct): +ListGroupsRequest = [ListGroupsRequest_v0] +ListGroupsResponse = [ListGroupsResponse_v0] + + +class DescribeGroupsResponse_v0(Struct): + API_KEY = 15 + API_VERSION = 0 SCHEMA = Schema( ('groups', Array( ('error_code', Int16), @@ -35,10 +43,14 @@ class DescribeGroupsResponse(Struct): ) -class DescribeGroupsRequest(Struct): +class DescribeGroupsRequest_v0(Struct): API_KEY = 15 API_VERSION = 0 - RESPONSE_TYPE = DescribeGroupsResponse + RESPONSE_TYPE = DescribeGroupsResponse_v0 SCHEMA = Schema( ('groups', Array(String('utf-8'))) ) + + +DescribeGroupsRequest = [DescribeGroupsRequest_v0] +DescribeGroupsResponse = [DescribeGroupsResponse_v0] diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index a32f8d3..90a3b76 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String -class OffsetCommitResponse(Struct): +class OffsetCommitResponse_v0(Struct): + API_KEY = 8 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -12,15 +14,36 @@ class OffsetCommitResponse(Struct): ) -class OffsetCommitRequest_v2(Struct): +class OffsetCommitResponse_v1(Struct): API_KEY = 8 - API_VERSION = 2 # added retention_time, dropped timestamp - RESPONSE_TYPE = OffsetCommitResponse + API_VERSION = 1 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetCommitResponse_v2(Struct): + API_KEY = 8 + API_VERSION = 2 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetCommitRequest_v0(Struct): + API_KEY = 8 + API_VERSION = 0 # Zookeeper-backed storage + RESPONSE_TYPE = OffsetCommitResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')), - ('consumer_group_generation_id', Int32), - ('consumer_id', String('utf-8')), - ('retention_time', Int64), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( @@ -28,14 +51,12 @@ class OffsetCommitRequest_v2(Struct): ('offset', Int64), ('metadata', String('utf-8')))))) ) - DEFAULT_GENERATION_ID = -1 - DEFAULT_RETENTION_TIME = -1 class OffsetCommitRequest_v1(Struct): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage - RESPONSE_TYPE = OffsetCommitResponse + RESPONSE_TYPE = OffsetCommitResponse_v1 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -50,12 +71,15 @@ class OffsetCommitRequest_v1(Struct): ) -class OffsetCommitRequest_v0(Struct): +class OffsetCommitRequest_v2(Struct): API_KEY = 8 - API_VERSION = 0 # Zookeeper-backed storage - RESPONSE_TYPE = OffsetCommitResponse + API_VERSION = 2 # added retention_time, dropped timestamp + RESPONSE_TYPE = OffsetCommitResponse_v2 SCHEMA = Schema( ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('retention_time', Int64), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( @@ -63,9 +87,19 @@ class OffsetCommitRequest_v0(Struct): ('offset', Int64), ('metadata', String('utf-8')))))) ) + DEFAULT_GENERATION_ID = -1 + DEFAULT_RETENTION_TIME = -1 -class OffsetFetchResponse(Struct): +OffsetCommitRequest = [OffsetCommitRequest_v0, OffsetCommitRequest_v1, + OffsetCommitRequest_v2] +OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1, + OffsetCommitResponse_v2] + + +class OffsetFetchResponse_v0(Struct): + API_KEY = 9 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -77,22 +111,24 @@ class OffsetFetchResponse(Struct): ) -class OffsetFetchRequest_v1(Struct): +class OffsetFetchResponse_v1(Struct): API_KEY = 9 - API_VERSION = 1 # kafka-backed storage - RESPONSE_TYPE = OffsetFetchResponse + API_VERSION = 1 SCHEMA = Schema( - ('consumer_group', String('utf-8')), ('topics', Array( ('topic', String('utf-8')), - ('partitions', Array(Int32)))) + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), + ('error_code', Int16))))) ) class OffsetFetchRequest_v0(Struct): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage - RESPONSE_TYPE = OffsetFetchResponse + RESPONSE_TYPE = OffsetFetchResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -101,7 +137,25 @@ class OffsetFetchRequest_v0(Struct): ) -class GroupCoordinatorResponse(Struct): +class OffsetFetchRequest_v1(Struct): + API_KEY = 9 + API_VERSION = 1 # kafka-backed storage + RESPONSE_TYPE = OffsetFetchResponse_v1 + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)))) + ) + + +OffsetFetchRequest = [OffsetFetchRequest_v0, OffsetFetchRequest_v1] +OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1] + + +class GroupCoordinatorResponse_v0(Struct): + API_KEY = 10 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('coordinator_id', Int32), @@ -110,10 +164,14 @@ class GroupCoordinatorResponse(Struct): ) -class GroupCoordinatorRequest(Struct): +class GroupCoordinatorRequest_v0(Struct): API_KEY = 10 API_VERSION = 0 - RESPONSE_TYPE = GroupCoordinatorResponse + RESPONSE_TYPE = GroupCoordinatorResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')) ) + + +GroupCoordinatorRequest = [GroupCoordinatorRequest_v0] +GroupCoordinatorResponse = [GroupCoordinatorResponse_v0] diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index e00c9ab..eeda4e7 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -3,7 +3,9 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String -class FetchResponse(Struct): +class FetchResponse_v0(Struct): + API_KEY = 1 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topics', String('utf-8')), @@ -15,10 +17,10 @@ class FetchResponse(Struct): ) -class FetchRequest(Struct): +class FetchRequest_v0(Struct): API_KEY = 1 API_VERSION = 0 - RESPONSE_TYPE = FetchResponse + RESPONSE_TYPE = FetchResponse_v0 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -30,3 +32,7 @@ class FetchRequest(Struct): ('offset', Int64), ('max_bytes', Int32))))) ) + + +FetchRequest = [FetchRequest_v0] +FetchResponse = [FetchResponse_v0] diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 72de005..97ae5f7 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Bytes, Int16, Int32, Schema, String -class JoinGroupResponse(Struct): +class JoinGroupResponse_v0(Struct): + API_KEY = 11 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('generation_id', Int32), @@ -15,10 +17,10 @@ class JoinGroupResponse(Struct): ) -class JoinGroupRequest(Struct): +class JoinGroupRequest_v0(Struct): API_KEY = 11 API_VERSION = 0 - RESPONSE_TYPE = JoinGroupResponse + RESPONSE_TYPE = JoinGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('session_timeout', Int32), @@ -31,6 +33,10 @@ class JoinGroupRequest(Struct): UNKNOWN_MEMBER_ID = '' +JoinGroupRequest = [JoinGroupRequest_v0] +JoinGroupResponse = [JoinGroupResponse_v0] + + class ProtocolMetadata(Struct): SCHEMA = Schema( ('version', Int16), @@ -39,17 +45,19 @@ class ProtocolMetadata(Struct): ) -class SyncGroupResponse(Struct): +class SyncGroupResponse_v0(Struct): + API_KEY = 14 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('member_assignment', Bytes) ) -class SyncGroupRequest(Struct): +class SyncGroupRequest_v0(Struct): API_KEY = 14 API_VERSION = 0 - RESPONSE_TYPE = SyncGroupResponse + RESPONSE_TYPE = SyncGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('generation_id', Int32), @@ -60,6 +68,10 @@ class SyncGroupRequest(Struct): ) +SyncGroupRequest = [SyncGroupRequest_v0] +SyncGroupResponse = [SyncGroupResponse_v0] + + class MemberAssignment(Struct): SCHEMA = Schema( ('version', Int16), @@ -70,16 +82,18 @@ class MemberAssignment(Struct): ) -class HeartbeatResponse(Struct): +class HeartbeatResponse_v0(Struct): + API_KEY = 12 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16) ) -class HeartbeatRequest(Struct): +class HeartbeatRequest_v0(Struct): API_KEY = 12 API_VERSION = 0 - RESPONSE_TYPE = HeartbeatResponse + RESPONSE_TYPE = HeartbeatResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('generation_id', Int32), @@ -87,17 +101,27 @@ class HeartbeatRequest(Struct): ) -class LeaveGroupResponse(Struct): +HeartbeatRequest = [HeartbeatRequest_v0] +HeartbeatResponse = [HeartbeatResponse_v0] + + +class LeaveGroupResponse_v0(Struct): + API_KEY = 13 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16) ) -class LeaveGroupRequest(Struct): +class LeaveGroupRequest_v0(Struct): API_KEY = 13 API_VERSION = 0 - RESPONSE_TYPE = LeaveGroupResponse + RESPONSE_TYPE = LeaveGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('member_id', String('utf-8')) ) + + +LeaveGroupRequest = [LeaveGroupRequest_v0] +LeaveGroupResponse = [LeaveGroupResponse_v0] diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index e4745f1..2eddf3b 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -136,7 +136,7 @@ class KafkaProtocol(object): if acks not in (1, 0, -1): raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks) - return kafka.protocol.produce.ProduceRequest( + return kafka.protocol.produce.ProduceRequest[0]( required_acks=acks, timeout=timeout, topics=[( @@ -180,7 +180,7 @@ class KafkaProtocol(object): Return: FetchRequest """ - return kafka.protocol.fetch.FetchRequest( + return kafka.protocol.fetch.FetchRequest[0]( replica_id=-1, max_wait_time=max_wait_time, min_bytes=min_bytes, @@ -212,7 +212,7 @@ class KafkaProtocol(object): @classmethod def encode_offset_request(cls, payloads=()): - return kafka.protocol.offset.OffsetRequest( + return kafka.protocol.offset.OffsetRequest[0]( replica_id=-1, topics=[( topic, @@ -250,7 +250,7 @@ class KafkaProtocol(object): if payloads is not None: topics = payloads - return kafka.protocol.metadata.MetadataRequest(topics) + return kafka.protocol.metadata.MetadataRequest[0](topics) @classmethod def decode_metadata_response(cls, response): @@ -297,7 +297,7 @@ class KafkaProtocol(object): group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequestPayload """ - return kafka.protocol.commit.OffsetCommitRequest_v0( + return kafka.protocol.commit.OffsetCommitRequest[0]( consumer_group=group, topics=[( topic, @@ -337,11 +337,11 @@ class KafkaProtocol(object): from_kafka: bool, default False, set True for Kafka-committed offsets """ if from_kafka: - request_class = kafka.protocol.commit.OffsetFetchRequest_v1 + version = 1 else: - request_class = kafka.protocol.commit.OffsetFetchRequest_v0 + version = 0 - return request_class( + return kafka.protocol.commit.OffsetFetchRequest[version]( consumer_group=group, topics=[( topic, diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 810f1b8..8063dda 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Int16, Int32, Schema, String -class MetadataResponse(Struct): +class MetadataResponse_v0(Struct): + API_KEY = 3 + API_VERSION = 0 SCHEMA = Schema( ('brokers', Array( ('node_id', Int32), @@ -20,10 +22,14 @@ class MetadataResponse(Struct): ) -class MetadataRequest(Struct): +class MetadataRequest_v0(Struct): API_KEY = 3 API_VERSION = 0 - RESPONSE_TYPE = MetadataResponse + RESPONSE_TYPE = MetadataResponse_v0 SCHEMA = Schema( ('topics', Array(String('utf-8'))) ) + + +MetadataRequest = [MetadataRequest_v0] +MetadataResponse = [MetadataResponse_v0] diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 606f1f1..57bf4ac 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -1,13 +1,16 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String + class OffsetResetStrategy(object): LATEST = -1 EARLIEST = -2 NONE = 0 -class OffsetResponse(Struct): +class OffsetResponse_v0(Struct): + API_KEY = 2 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -18,10 +21,10 @@ class OffsetResponse(Struct): ) -class OffsetRequest(Struct): +class OffsetRequest_v0(Struct): API_KEY = 2 API_VERSION = 0 - RESPONSE_TYPE = OffsetResponse + RESPONSE_TYPE = OffsetResponse_v0 SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( @@ -34,3 +37,7 @@ class OffsetRequest(Struct): DEFAULTS = { 'replica_id': -1 } + + +OffsetRequest = [OffsetRequest_v0] +OffsetResponse = [OffsetResponse_v0] diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index ef2f96e..5753f64 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -1,9 +1,11 @@ from .message import MessageSet from .struct import Struct -from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema +from .types import Int16, Int32, Int64, String, Array, Schema -class ProduceResponse(Struct): +class ProduceResponse_v0(Struct): + API_KEY = 0 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -14,10 +16,10 @@ class ProduceResponse(Struct): ) -class ProduceRequest(Struct): +class ProduceRequest_v0(Struct): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = ProduceResponse + RESPONSE_TYPE = ProduceResponse_v0 SCHEMA = Schema( ('required_acks', Int16), ('timeout', Int32), @@ -27,3 +29,7 @@ class ProduceRequest(Struct): ('partition', Int32), ('messages', MessageSet))))) ) + + +ProduceRequest = [ProduceRequest_v0] +ProduceResponse = [ProduceResponse_v0] diff --git a/test/test_client.py b/test/test_client.py index 42d7dbd..38235fd 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -137,7 +137,7 @@ class TestSimpleClient(unittest.TestCase): (NO_ERROR, 2, 0, [0, 1], [0, 1]) ]) ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) # client loads metadata at init client = SimpleClient(hosts=['broker_1:4567']) @@ -179,7 +179,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 1, -1, [], []), ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -209,7 +209,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 1, -1, [], []), ]), ] - decode_metadata_response.return_value = MetadataResponse(brokers, topics) + decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -237,7 +237,7 @@ class TestSimpleClient(unittest.TestCase): topics = [ (NO_LEADER, 'topic_no_partitions', []) ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -249,7 +249,7 @@ class TestSimpleClient(unittest.TestCase): (NO_ERROR, 0, 0, [0, 1], [0, 1]) ]) ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) # calling _get_leader_for_partition (from any broker aware request) # will try loading metadata again for the same topic @@ -275,7 +275,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 'topic_no_partitions', []), (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -304,7 +304,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 1, -1, [], []), ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) self.assertDictEqual( @@ -330,7 +330,7 @@ class TestSimpleClient(unittest.TestCase): (NO_ERROR, 1, 1, [1, 0], [1, 0]) ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) @@ -350,7 +350,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 1, -1, [], []), ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -375,7 +375,7 @@ class TestSimpleClient(unittest.TestCase): topics = [ (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) diff --git a/test/test_client_async.py b/test/test_client_async.py index eaac8e1..2cf348c 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -37,7 +37,7 @@ def conn(mocker): conn.return_value = conn conn.state = ConnectionStates.CONNECTED conn.send.return_value = Future().success( - MetadataResponse( + MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics conn.blacked_out.return_value = False @@ -51,7 +51,7 @@ def test_bootstrap_success(conn): cli = KafkaClient() conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config) conn.connect.assert_called_with() - conn.send.assert_called_once_with(MetadataRequest([])) + conn.send.assert_called_once_with(MetadataRequest[0]([])) assert cli._bootstrap_fails == 0 assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12), BrokerMetadata(1, 'bar', 34)]) @@ -230,12 +230,12 @@ def test_send(conn): conn.state = ConnectionStates.CONNECTED cli._maybe_connect(0) # ProduceRequest w/ 0 required_acks -> no response - request = ProduceRequest(0, 0, []) + request = ProduceRequest[0](0, 0, []) ret = cli.send(0, request) assert conn.send.called_with(request, expect_response=False) assert isinstance(ret, Future) - request = MetadataRequest([]) + request = MetadataRequest[0]([]) cli.send(0, request) assert conn.send.called_with(request, expect_response=True) diff --git a/test/test_conn.py b/test/test_conn.py index 5432ebd..a55e39b 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -111,7 +111,7 @@ def test_send_max_ifr(conn): def test_send_no_response(socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest([]) + req = MetadataRequest[0]([]) header = RequestHeader(req, client_id=conn.config['client_id']) payload_bytes = len(header.encode()) + len(req.encode()) third = payload_bytes // 3 @@ -128,7 +128,7 @@ def test_send_no_response(socket, conn): def test_send_response(socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest([]) + req = MetadataRequest[0]([]) header = RequestHeader(req, client_id=conn.config['client_id']) payload_bytes = len(header.encode()) + len(req.encode()) third = payload_bytes // 3 @@ -144,7 +144,7 @@ def test_send_response(socket, conn): def test_send_error(socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest([]) + req = MetadataRequest[0]([]) header = RequestHeader(req, client_id=conn.config['client_id']) try: error = ConnectionError diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index c02eddc..fe66d2b 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -146,7 +146,7 @@ def conn(mocker): conn.return_value = conn conn.state = ConnectionStates.CONNECTED conn.send.return_value = Future().success( - MetadataResponse( + MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics return conn diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 1dc7788..629b72f 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -16,9 +16,8 @@ from kafka.conn import ConnectionStates import kafka.errors as Errors from kafka.future import Future from kafka.protocol.commit import ( - OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2, - OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1, - OffsetFetchResponse) + OffsetCommitRequest, OffsetCommitResponse, + OffsetFetchRequest, OffsetFetchResponse) from kafka.protocol.metadata import MetadataResponse from kafka.util import WeakMethod @@ -29,7 +28,7 @@ def conn(mocker): conn.return_value = conn conn.state = ConnectionStates.CONNECTED conn.send.return_value = Future().success( - MetadataResponse( + MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics return conn @@ -98,7 +97,7 @@ def test_pattern_subscription(coordinator, api_version): assert coordinator._subscription.needs_partition_assignment is False cluster = coordinator._client.cluster - cluster.update_metadata(MetadataResponse( + cluster.update_metadata(MetadataResponse[0]( # brokers [(0, 'foo', 12), (1, 'bar', 34)], # topics @@ -428,9 +427,9 @@ def test_send_offset_commit_request_fail(patched_coord, offsets): @pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetCommitRequest_v0), - ((0, 8, 2), OffsetCommitRequest_v1), - ((0, 9), OffsetCommitRequest_v2)]) + ((0, 8, 1), OffsetCommitRequest[0]), + ((0, 8, 2), OffsetCommitRequest[1]), + ((0, 9), OffsetCommitRequest[2])]) def test_send_offset_commit_request_versions(patched_coord, offsets, api_version, req_type): # assuming fixture sets coordinator=0, least_loaded_node=1 @@ -460,36 +459,36 @@ def test_send_offset_commit_request_success(patched_coord, offsets): patched_coord._client.send.return_value = _f future = patched_coord._send_offset_commit_request(offsets) (node, request), _ = patched_coord._client.send.call_args - response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])]) + response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])]) _f.success(response) patched_coord._handle_offset_commit_response.assert_called_with( offsets, future, response) @pytest.mark.parametrize('response,error,dead,reassign', [ - (OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]), Errors.GroupAuthorizationFailedError, False, False), - (OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]), Errors.OffsetMetadataTooLargeError, False, False), - (OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]), Errors.InvalidCommitOffsetSizeError, False, False), - (OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]), Errors.GroupLoadInProgressError, False, False), - (OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]), Errors.GroupCoordinatorNotAvailableError, True, False), - (OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]), Errors.NotCoordinatorForGroupError, True, False), - (OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]), Errors.RequestTimedOutError, True, False), - (OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]), Errors.InvalidTopicError, False, False), - (OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]), Errors.TopicAuthorizationFailedError, False, False), ]) def test_handle_offset_commit_response(patched_coord, offsets, @@ -523,9 +522,9 @@ def test_send_offset_fetch_request_fail(patched_coord, partitions): @pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetFetchRequest_v0), - ((0, 8, 2), OffsetFetchRequest_v1), - ((0, 9), OffsetFetchRequest_v1)]) + ((0, 8, 1), OffsetFetchRequest[0]), + ((0, 8, 2), OffsetFetchRequest[1]), + ((0, 9), OffsetFetchRequest[1])]) def test_send_offset_fetch_request_versions(patched_coord, partitions, api_version, req_type): # assuming fixture sets coordinator=0, least_loaded_node=1 @@ -555,30 +554,30 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): patched_coord._client.send.return_value = _f future = patched_coord._send_offset_fetch_request(partitions) (node, request), _ = patched_coord._client.send.call_args - response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])]) + response = OffsetFetchResponse[0]([('foobar', [(0, 0), (1, 0)])]) _f.success(response) patched_coord._handle_offset_fetch_response.assert_called_with( future, response) @pytest.mark.parametrize('response,error,dead,reassign', [ - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]), # Errors.GroupAuthorizationFailedError, False, False), - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]), # Errors.RequestTimedOutError, True, False), - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]), # Errors.RebalanceInProgressError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), Errors.GroupLoadInProgressError, False, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), Errors.NotCoordinatorForGroupError, True, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), Errors.UnknownMemberIdError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), Errors.IllegalGenerationError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), Errors.TopicAuthorizationFailedError, False, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), None, False, False), ]) def test_handle_offset_fetch_response(patched_coord, offsets, diff --git a/test/test_fetcher.py b/test/test_fetcher.py index cdd324f..644adfa 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -34,17 +34,19 @@ def fetcher(client, subscription_state): def test_init_fetches(fetcher, mocker): fetch_requests = [ - FetchRequest(-1, fetcher.config['fetch_max_wait_ms'], - fetcher.config['fetch_min_bytes'], - [('foobar', [ - (0, 0, fetcher.config['max_partition_fetch_bytes']), - (1, 0, fetcher.config['max_partition_fetch_bytes']), - ])]), - FetchRequest(-1, fetcher.config['fetch_max_wait_ms'], - fetcher.config['fetch_min_bytes'], - [('foobar', [ - (2, 0, fetcher.config['max_partition_fetch_bytes']), - ])]) + FetchRequest[0]( + -1, fetcher.config['fetch_max_wait_ms'], + fetcher.config['fetch_min_bytes'], + [('foobar', [ + (0, 0, fetcher.config['max_partition_fetch_bytes']), + (1, 0, fetcher.config['max_partition_fetch_bytes']), + ])]), + FetchRequest[0]( + -1, fetcher.config['fetch_max_wait_ms'], + fetcher.config['fetch_min_bytes'], + [('foobar', [ + (2, 0, fetcher.config['max_partition_fetch_bytes']), + ])]) ] mocker.patch.object(fetcher, '_create_fetch_requests', |