diff options
-rw-r--r-- | kafka/conn.py | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index e13913f..d713b56 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -16,6 +16,7 @@ import kafka.common as Errors from kafka.common import ConnectionError from kafka.future import Future from kafka.protocol.api import RequestHeader +from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -44,6 +45,7 @@ class BrokerConnection(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': 32768, 'send_buffer_bytes': 131072, + 'api_version': (0, 8, 2), # default to most restrictive } def __init__(self, host, port, **configs): @@ -278,7 +280,17 @@ class BrokerConnection(object): # verify send/recv correlation ids match recv_correlation_id = Int32.decode(read_buffer) - if ifr.correlation_id != recv_correlation_id: + + # 0.8.2 quirk + if (self.config['api_version'] == (0, 8, 2) and + ifr.response_type is GroupCoordinatorResponse and + recv_correlation_id == 0): + raise Errors.KafkaError( + 'Kafka 0.8.2 quirk -- try creating a topic first') + + elif ifr.correlation_id != recv_correlation_id: + + error = Errors.CorrelationIdError( 'Correlation ids do not match: sent %d, recv %d' % (ifr.correlation_id, recv_correlation_id)) |