diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
commit | 90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch) | |
tree | b22cef6b10fd167fb22b8318e1294f6137427f3b /kafka/client_async.py | |
parent | 452e7c2190b83f320f58e7f650302696dde458ed (diff) | |
download | kafka-python-protocol_versions.tar.gz |
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index b77ead5..907ee0c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -113,7 +113,7 @@ class KafkaClient(object): time.sleep(next_at - now) self._last_bootstrap = time.time() - metadata_request = MetadataRequest([]) + metadata_request = MetadataRequest[0]([]) for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) bootstrap = BrokerConnection(host, port, afi, **self.config) @@ -299,7 +299,7 @@ class KafkaClient(object): # Every request gets a response, except one special case: expect_response = True - if isinstance(request, ProduceRequest) and request.required_acks == 0: + if isinstance(request, tuple(ProduceRequest)) and request.required_acks == 0: expect_response = False return self._conns[node_id].send(request, expect_response=expect_response) @@ -535,7 +535,7 @@ class KafkaClient(object): topics = [] if self._can_send_request(node_id): - request = MetadataRequest(topics) + request = MetadataRequest[0](topics) log.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request) future.add_callback(self.cluster.update_metadata) @@ -610,7 +610,7 @@ class KafkaClient(object): import socket from .protocol.admin import ListGroupsRequest from .protocol.commit import ( - OffsetFetchRequest_v0, GroupCoordinatorRequest) + OffsetFetchRequest, GroupCoordinatorRequest) from .protocol.metadata import MetadataRequest # Socket errors are logged as exceptions and can alarm users. Mute them @@ -623,10 +623,10 @@ class KafkaClient(object): log_filter = ConnFilter() test_cases = [ - ('0.9', ListGroupsRequest()), - ('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')), - ('0.8.1', OffsetFetchRequest_v0('kafka-python-default-group', [])), - ('0.8.0', MetadataRequest([])), + ('0.9', ListGroupsRequest[0]()), + ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest[0]([])), ] logging.getLogger('kafka.conn').addFilter(log_filter) @@ -634,7 +634,7 @@ class KafkaClient(object): connect(node_id) f = self.send(node_id, request) time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes - metadata = self.send(node_id, MetadataRequest([])) + metadata = self.send(node_id, MetadataRequest[0]([])) self.poll(future=f) self.poll(future=metadata) |