diff options
Diffstat (limited to 'kafka/protocol/fetch.py')
-rw-r--r-- | kafka/protocol/fetch.py | 182 |
1 files changed, 180 insertions, 2 deletions
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index dd3f648..f367848 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -94,6 +94,72 @@ class FetchResponse_v6(Response): SCHEMA = FetchResponse_v5.SCHEMA +class FetchResponse_v7(Response): + """ + Add error_code and session_id to response + """ + API_KEY = 1 + API_VERSION = 7 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('session_id', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('message_set', Bytes))))) + ) + + +class FetchResponse_v8(Response): + API_KEY = 1 + API_VERSION = 8 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v9(Response): + API_KEY = 1 + API_VERSION = 9 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v10(Response): + API_KEY = 1 + API_VERSION = 10 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v11(Response): + API_KEY = 1 + API_VERSION = 11 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('session_id', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('preferred_read_replica', Int32), + ('message_set', Bytes))))) + ) + + class FetchRequest_v0(Request): API_KEY = 1 API_VERSION = 0 @@ -196,13 +262,125 @@ class FetchRequest_v6(Request): SCHEMA = FetchRequest_v5.SCHEMA +class FetchRequest_v7(Request): + """ + Add incremental fetch requests + """ + API_KEY = 1 + API_VERSION = 7 + RESPONSE_TYPE = FetchResponse_v7 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)) + )), + ) + + +class FetchRequest_v8(Request): + """ + bump used to indicate that on quota violation brokers send out responses before throttling. + """ + API_KEY = 1 + API_VERSION = 8 + RESPONSE_TYPE = FetchResponse_v8 + SCHEMA = FetchRequest_v7.SCHEMA + + +class FetchRequest_v9(Request): + """ + adds the current leader epoch (see KIP-320) + """ + API_KEY = 1 + API_VERSION = 9 + RESPONSE_TYPE = FetchResponse_v9 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)), + )), + ) + + +class FetchRequest_v10(Request): + """ + bumped up to indicate ZStandard capability. (see KIP-110) + """ + API_KEY = 1 + API_VERSION = 10 + RESPONSE_TYPE = FetchResponse_v10 + SCHEMA = FetchRequest_v9.SCHEMA + + +class FetchRequest_v11(Request): + """ + added rack ID to support read from followers (KIP-392) + """ + API_KEY = 1 + API_VERSION = 11 + RESPONSE_TYPE = FetchResponse_v11 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)) + )), + ('rack_id', String('utf-8')), + ) + + FetchRequest = [ FetchRequest_v0, FetchRequest_v1, FetchRequest_v2, FetchRequest_v3, FetchRequest_v4, FetchRequest_v5, - FetchRequest_v6 + FetchRequest_v6, FetchRequest_v7, FetchRequest_v8, + FetchRequest_v9, FetchRequest_v10, FetchRequest_v11, ] FetchResponse = [ FetchResponse_v0, FetchResponse_v1, FetchResponse_v2, FetchResponse_v3, FetchResponse_v4, FetchResponse_v5, - FetchResponse_v6 + FetchResponse_v6, FetchResponse_v7, FetchResponse_v8, + FetchResponse_v9, FetchResponse_v10, FetchResponse_v11, ] |