summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
commit90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch)
treeb22cef6b10fd167fb22b8318e1294f6137427f3b /kafka/client_async.py
parent452e7c2190b83f320f58e7f650302696dde458ed (diff)
downloadkafka-python-protocol_versions.tar.gz
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py18
1 files changed, 9 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b77ead5..907ee0c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -113,7 +113,7 @@ class KafkaClient(object):
time.sleep(next_at - now)
self._last_bootstrap = time.time()
- metadata_request = MetadataRequest([])
+ metadata_request = MetadataRequest[0]([])
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
bootstrap = BrokerConnection(host, port, afi, **self.config)
@@ -299,7 +299,7 @@ class KafkaClient(object):
# Every request gets a response, except one special case:
expect_response = True
- if isinstance(request, ProduceRequest) and request.required_acks == 0:
+ if isinstance(request, tuple(ProduceRequest)) and request.required_acks == 0:
expect_response = False
return self._conns[node_id].send(request, expect_response=expect_response)
@@ -535,7 +535,7 @@ class KafkaClient(object):
topics = []
if self._can_send_request(node_id):
- request = MetadataRequest(topics)
+ request = MetadataRequest[0](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future.add_callback(self.cluster.update_metadata)
@@ -610,7 +610,7 @@ class KafkaClient(object):
import socket
from .protocol.admin import ListGroupsRequest
from .protocol.commit import (
- OffsetFetchRequest_v0, GroupCoordinatorRequest)
+ OffsetFetchRequest, GroupCoordinatorRequest)
from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
@@ -623,10 +623,10 @@ class KafkaClient(object):
log_filter = ConnFilter()
test_cases = [
- ('0.9', ListGroupsRequest()),
- ('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')),
- ('0.8.1', OffsetFetchRequest_v0('kafka-python-default-group', [])),
- ('0.8.0', MetadataRequest([])),
+ ('0.9', ListGroupsRequest[0]()),
+ ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
+ ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
+ ('0.8.0', MetadataRequest[0]([])),
]
logging.getLogger('kafka.conn').addFilter(log_filter)
@@ -634,7 +634,7 @@ class KafkaClient(object):
connect(node_id)
f = self.send(node_id, request)
time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
- metadata = self.send(node_id, MetadataRequest([]))
+ metadata = self.send(node_id, MetadataRequest[0]([]))
self.poll(future=f)
self.poll(future=metadata)