diff options
author | Dana Powers <dana.powers@rd.io> | 2015-11-29 10:00:50 +0800 |
---|---|---|
committer | Zack Dever <zack.dever@rd.io> | 2015-12-04 11:25:40 -0800 |
commit | 058567912e8d82c1da5e5ead9e30be532573a173 (patch) | |
tree | eb5cdd9d7c25729441cfa097772ca0623e4cbfe0 | |
parent | a85e09df89a43de5b659a0fa4ed35bec37c60e04 (diff) | |
download | kafka-python-058567912e8d82c1da5e5ead9e30be532573a173.tar.gz |
Add simple BrokerConnection class; add request.RESPONSE_TYPE class vars
-rw-r--r-- | kafka/conn.py | 36 | ||||
-rw-r--r-- | kafka/protocol/commit.py | 42 | ||||
-rw-r--r-- | kafka/protocol/fetch.py | 24 | ||||
-rw-r--r-- | kafka/protocol/metadata.py | 17 | ||||
-rw-r--r-- | kafka/protocol/offset.py | 23 | ||||
-rw-r--r-- | kafka/protocol/produce.py | 17 |
6 files changed, 101 insertions, 58 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 9514e48..0602d70 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -8,6 +8,8 @@ from threading import local import six from kafka.common import ConnectionError +from kafka.protocol.api import RequestHeader +from kafka.protocol.types import Int32 log = logging.getLogger(__name__) @@ -16,6 +18,40 @@ DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 DEFAULT_KAFKA_PORT = 9092 +class BrokerConnection(local): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + super(BrokerConnection, self).__init__() + self.host = host + self.port = port + self.timeout = timeout + self._sock = socket.create_connection((host, port), timeout) + self.fd = self._sock.makefile(mode='+') + self.correlation_id = 0 + + def close(self): + self.fd.close() + self._sock.close() + + def send(self, request): + self.correlation_id += 1 + header = RequestHeader(request, correlation_id=self.correlation_id) + message = b''.join([header.encode(), request.encode()]) + size = Int32.encode(len(message)) + self.fd.write(size) + self.fd.write(message) + self.fd.flush() + + size = Int32.decode(self.fd) + correlation_id = Int32.decode(self.fd) + return request.RESPONSE_TYPE.decode(self.fd) + + def __getnewargs__(self): + return (self.host, self.port, self.timeout) + + def __repr__(self): + return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) + + def collect_hosts(hosts, randomize=True): """ Collects a comma-separated set of hosts (host:port) and optionally diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 5ba0227..2955de1 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -2,9 +2,20 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String +class OffsetCommitResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + class OffsetCommitRequest_v2(Struct): API_KEY = 8 API_VERSION = 2 # added retention_time, dropped timestamp + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -22,6 +33,7 @@ class OffsetCommitRequest_v2(Struct): class OffsetCommitRequest_v1(Struct): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -39,6 +51,7 @@ class OffsetCommitRequest_v1(Struct): class OffsetCommitRequest_v0(Struct): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -50,12 +63,14 @@ class OffsetCommitRequest_v0(Struct): ) -class OffsetCommitResponse(Struct): +class OffsetFetchResponse(Struct): SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), ('error_code', Int16))))) ) @@ -63,6 +78,7 @@ class OffsetCommitResponse(Struct): class OffsetFetchRequest_v1(Struct): API_KEY = 9 API_VERSION = 1 # kafka-backed storage + RESPONSE_TYPE = OffsetFetchResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -74,6 +90,7 @@ class OffsetFetchRequest_v1(Struct): class OffsetFetchRequest_v0(Struct): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage + RESPONSE_TYPE = OffsetFetchResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -82,30 +99,19 @@ class OffsetFetchRequest_v0(Struct): ) -class OffsetFetchResponse(Struct): +class GroupCoordinatorResponse(Struct): SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('metadata', String('utf-8')), - ('error_code', Int16))))) + ('error_code', Int16), + ('coordinator_id', Int32), + ('host', String('utf-8')), + ('port', Int32) ) class GroupCoordinatorRequest(Struct): API_KEY = 10 API_VERSION = 0 + RESPONSE_TYPE = GroupCoordinatorResponse SCHEMA = Schema( ('consumer_group', String('utf-8')) ) - - -class GroupCoordinatorResponse(Struct): - SCHEMA = Schema( - ('error_code', Int16), - ('coordinator_id', Int32), - ('host', String('utf-8')), - ('port', Int32) - ) diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index c6d60cc..e00c9ab 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -3,9 +3,22 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String +class FetchResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('message_set', MessageSet))))) + ) + + class FetchRequest(Struct): API_KEY = 1 API_VERSION = 0 + RESPONSE_TYPE = FetchResponse SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -17,14 +30,3 @@ class FetchRequest(Struct): ('offset', Int64), ('max_bytes', Int32))))) ) - -class FetchResponse(Struct): - SCHEMA = Schema( - ('topics', Array( - ('topics', String('utf-8')), - ('partitions', Array( - ('partition', Int32), - ('error_code', Int16), - ('highwater_offset', Int64), - ('message_set', MessageSet))))) - ) diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index b35e7ef..810f1b8 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -2,14 +2,6 @@ from .struct import Struct from .types import Array, Int16, Int32, Schema, String -class MetadataRequest(Struct): - API_KEY = 3 - API_VERSION = 0 - SCHEMA = Schema( - ('topics', Array(String('utf-8'))) - ) - - class MetadataResponse(Struct): SCHEMA = Schema( ('brokers', Array( @@ -26,3 +18,12 @@ class MetadataResponse(Struct): ('replicas', Array(Int32)), ('isr', Array(Int32)))))) ) + + +class MetadataRequest(Struct): + API_KEY = 3 + API_VERSION = 0 + RESPONSE_TYPE = MetadataResponse + SCHEMA = Schema( + ('topics', Array(String('utf-8'))) + ) diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 942bdbf..776de39 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -2,31 +2,30 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String -class OffsetRequest(Struct): - API_KEY = 2 - API_VERSION = 0 +class OffsetResponse(Struct): SCHEMA = Schema( - ('replica_id', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('time', Int64), - ('max_offsets', Int32))))) + ('error_code', Int16), + ('offsets', Array(Int64)))))) ) - DEFAULTS = { - 'replica_id': -1 - } -class OffsetResponse(Struct): +class OffsetRequest(Struct): API_KEY = 2 API_VERSION = 0 + RESPONSE_TYPE = OffsetResponse SCHEMA = Schema( + ('replica_id', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('error_code', Int16), - ('offsets', Array(Int64)))))) + ('time', Int64), + ('max_offsets', Int32))))) ) + DEFAULTS = { + 'replica_id': -1 + } diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 532a702..ef2f96e 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -3,28 +3,27 @@ from .struct import Struct from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema -class ProduceRequest(Struct): - API_KEY = 0 - API_VERSION = 0 +class ProduceResponse(Struct): SCHEMA = Schema( - ('required_acks', Int16), - ('timeout', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('messages', MessageSet))))) + ('error_code', Int16), + ('offset', Int64))))) ) -class ProduceResponse(Struct): +class ProduceRequest(Struct): API_KEY = 0 API_VERSION = 0 + RESPONSE_TYPE = ProduceResponse SCHEMA = Schema( + ('required_acks', Int16), + ('timeout', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('error_code', Int16), - ('offset', Int64))))) + ('messages', MessageSet))))) ) |