From 9b07bfb5298f961b965ee4a295b0bceb52803852 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 3 Jan 2016 16:06:35 -0800 Subject: Check for 0.8.2 GroupCoordinator quirk in BrokerConnection --- kafka/conn.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'kafka/conn.py') diff --git a/kafka/conn.py b/kafka/conn.py index e13913f..d713b56 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -16,6 +16,7 @@ import kafka.common as Errors from kafka.common import ConnectionError from kafka.future import Future from kafka.protocol.api import RequestHeader +from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -44,6 +45,7 @@ class BrokerConnection(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': 32768, 'send_buffer_bytes': 131072, + 'api_version': (0, 8, 2), # default to most restrictive } def __init__(self, host, port, **configs): @@ -278,7 +280,17 @@ class BrokerConnection(object): # verify send/recv correlation ids match recv_correlation_id = Int32.decode(read_buffer) - if ifr.correlation_id != recv_correlation_id: + + # 0.8.2 quirk + if (self.config['api_version'] == (0, 8, 2) and + ifr.response_type is GroupCoordinatorResponse and + recv_correlation_id == 0): + raise Errors.KafkaError( + 'Kafka 0.8.2 quirk -- try creating a topic first') + + elif ifr.correlation_id != recv_correlation_id: + + error = Errors.CorrelationIdError( 'Correlation ids do not match: sent %d, recv %d' % (ifr.correlation_id, recv_correlation_id)) -- cgit v1.2.1