summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py18
1 files changed, 9 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b77ead5..907ee0c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -113,7 +113,7 @@ class KafkaClient(object):
time.sleep(next_at - now)
self._last_bootstrap = time.time()
- metadata_request = MetadataRequest([])
+ metadata_request = MetadataRequest[0]([])
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
bootstrap = BrokerConnection(host, port, afi, **self.config)
@@ -299,7 +299,7 @@ class KafkaClient(object):
# Every request gets a response, except one special case:
expect_response = True
- if isinstance(request, ProduceRequest) and request.required_acks == 0:
+ if isinstance(request, tuple(ProduceRequest)) and request.required_acks == 0:
expect_response = False
return self._conns[node_id].send(request, expect_response=expect_response)
@@ -535,7 +535,7 @@ class KafkaClient(object):
topics = []
if self._can_send_request(node_id):
- request = MetadataRequest(topics)
+ request = MetadataRequest[0](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future.add_callback(self.cluster.update_metadata)
@@ -610,7 +610,7 @@ class KafkaClient(object):
import socket
from .protocol.admin import ListGroupsRequest
from .protocol.commit import (
- OffsetFetchRequest_v0, GroupCoordinatorRequest)
+ OffsetFetchRequest, GroupCoordinatorRequest)
from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
@@ -623,10 +623,10 @@ class KafkaClient(object):
log_filter = ConnFilter()
test_cases = [
- ('0.9', ListGroupsRequest()),
- ('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')),
- ('0.8.1', OffsetFetchRequest_v0('kafka-python-default-group', [])),
- ('0.8.0', MetadataRequest([])),
+ ('0.9', ListGroupsRequest[0]()),
+ ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
+ ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
+ ('0.8.0', MetadataRequest[0]([])),
]
logging.getLogger('kafka.conn').addFilter(log_filter)
@@ -634,7 +634,7 @@ class KafkaClient(object):
connect(node_id)
f = self.send(node_id, request)
time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
- metadata = self.send(node_id, MetadataRequest([]))
+ metadata = self.send(node_id, MetadataRequest[0]([]))
self.poll(future=f)
self.poll(future=metadata)