diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:06:35 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:06:35 -0800 |
commit | 9b07bfb5298f961b965ee4a295b0bceb52803852 (patch) | |
tree | c3d239365f2e3621ee5a3d9c25bdc017dd3352c1 | |
parent | 5c45ec13f3e59d9c398f2d3035c762ca13589885 (diff) | |
download | kafka-python-9b07bfb5298f961b965ee4a295b0bceb52803852.tar.gz |
Check for 0.8.2 GroupCoordinator quirk in BrokerConnection
-rw-r--r-- | kafka/conn.py | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index e13913f..d713b56 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -16,6 +16,7 @@ import kafka.common as Errors from kafka.common import ConnectionError from kafka.future import Future from kafka.protocol.api import RequestHeader +from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -44,6 +45,7 @@ class BrokerConnection(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': 32768, 'send_buffer_bytes': 131072, + 'api_version': (0, 8, 2), # default to most restrictive } def __init__(self, host, port, **configs): @@ -278,7 +280,17 @@ class BrokerConnection(object): # verify send/recv correlation ids match recv_correlation_id = Int32.decode(read_buffer) - if ifr.correlation_id != recv_correlation_id: + + # 0.8.2 quirk + if (self.config['api_version'] == (0, 8, 2) and + ifr.response_type is GroupCoordinatorResponse and + recv_correlation_id == 0): + raise Errors.KafkaError( + 'Kafka 0.8.2 quirk -- try creating a topic first') + + elif ifr.correlation_id != recv_correlation_id: + + error = Errors.CorrelationIdError( 'Correlation ids do not match: sent %d, recv %d' % (ifr.correlation_id, recv_correlation_id)) |