diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-06-19 10:11:53 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-06-19 10:12:07 -0700 |
commit | 3a67f641683102425d08a9572cad4f358f251fa8 (patch) | |
tree | 7af6b9badd7ea438c12d7b3af1b4664c0ab0d932 | |
parent | bbbac3dc3678df069ef72ecfea62d435bc519a07 (diff) | |
download | kafka-python-3a67f641683102425d08a9572cad4f358f251fa8.tar.gz |
Add new FetchRequest v4/v5, MetadataRequest v3/v4
-rw-r--r-- | kafka/protocol/fetch.py | 94 | ||||
-rw-r--r-- | kafka/protocol/metadata.py | 63 |
2 files changed, 150 insertions, 7 deletions
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index b441e63..6a61300 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from .api import Request, Response from .message import MessageSet -from .types import Array, Int16, Int32, Int64, Schema, String +from .types import Array, Int8, Int16, Int32, Int64, Schema, String class FetchResponse_v0(Response): @@ -46,6 +46,45 @@ class FetchResponse_v3(Response): SCHEMA = FetchResponse_v2.SCHEMA +class FetchResponse_v4(Response): + API_KEY = 1 + API_VERSION = 4 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('message_set', MessageSet))))) + ) + + +class FetchResponse_v5(Response): + API_KEY = 1 + API_VERSION = 5 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('message_set', MessageSet))))) + ) + + class FetchRequest_v0(Request): API_KEY = 1 API_VERSION = 0 @@ -95,7 +134,52 @@ class FetchRequest_v3(Request): ) -FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2, - FetchRequest_v3] -FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2, - FetchResponse_v3] +class FetchRequest_v4(Request): + """Adds isolation_level field""" + API_KEY = 1 + API_VERSION = 4 + RESPONSE_TYPE = FetchResponse_v4 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('max_bytes', Int32))))) + ) + + +class FetchRequest_v5(Request): + """This may only be used in broker-broker api calls""" + API_KEY = 1 + API_VERSION = 5 + RESPONSE_TYPE = FetchResponse_v5 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))) + ) + + +FetchRequest = [ + FetchRequest_v0, FetchRequest_v1, FetchRequest_v2, + FetchRequest_v3, FetchRequest_v4, FetchRequest_v5 +] +FetchResponse = [ + FetchResponse_v0, FetchResponse_v1, FetchResponse_v2, + FetchResponse_v3, FetchResponse_v4, FetchResponse_v5 +] diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 907ec25..4db9acf 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -71,6 +71,37 @@ class MetadataResponse_v2(Response): ) +class MetadataResponse_v3(Response): + API_KEY = 3 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('brokers', Array( + ('node_id', Int32), + ('host', String('utf-8')), + ('port', Int32), + ('rack', String('utf-8')))), + ('cluster_id', String('utf-8')), # <-- Added cluster_id field in v2 + ('controller_id', Int32), + ('topics', Array( + ('error_code', Int16), + ('topic', String('utf-8')), + ('is_internal', Boolean), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader', Int32), + ('replicas', Array(Int32)), + ('isr', Array(Int32)))))) + ) + + +class MetadataResponse_v4(Response): + API_KEY = 3 + API_VERSION = 4 + SCHEMA = MetadataResponse_v3.SCHEMA + + class MetadataRequest_v0(Request): API_KEY = 3 API_VERSION = 0 @@ -95,8 +126,36 @@ class MetadataRequest_v2(Request): API_VERSION = 2 RESPONSE_TYPE = MetadataResponse_v2 SCHEMA = MetadataRequest_v1.SCHEMA + ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics + NO_TOPICS = None # Empty array (len 0) for topics returns no topics + + +class MetadataRequest_v3(Request): + API_KEY = 3 + API_VERSION = 3 + RESPONSE_TYPE = MetadataResponse_v3 + SCHEMA = MetadataRequest_v1.SCHEMA + ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics + NO_TOPICS = None # Empty array (len 0) for topics returns no topics + + +class MetadataRequest_v4(Request): + API_KEY = 3 + API_VERSION = 4 + RESPONSE_TYPE = MetadataResponse_v4 + SCHEMA = Schema( + ('topics', Array(String('utf-8'))), + ('allow_auto_topic_creation', Boolean) + ) + ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics + NO_TOPICS = None # Empty array (len 0) for topics returns no topics -MetadataRequest = [MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2] +MetadataRequest = [ + MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2, + MetadataRequest_v3, MetadataRequest_v4 +] MetadataResponse = [ - MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2] + MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2, + MetadataResponse_v3, MetadataResponse_v4 +] |