summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-03 16:06:35 -0800
committerDana Powers <dana.powers@rd.io>2016-01-03 16:06:35 -0800
commit9b07bfb5298f961b965ee4a295b0bceb52803852 (patch)
treec3d239365f2e3621ee5a3d9c25bdc017dd3352c1
parent5c45ec13f3e59d9c398f2d3035c762ca13589885 (diff)
downloadkafka-python-9b07bfb5298f961b965ee4a295b0bceb52803852.tar.gz
Check for 0.8.2 GroupCoordinator quirk in BrokerConnection
-rw-r--r--kafka/conn.py14
1 files changed, 13 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index e13913f..d713b56 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -16,6 +16,7 @@ import kafka.common as Errors
from kafka.common import ConnectionError
from kafka.future import Future
from kafka.protocol.api import RequestHeader
+from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -44,6 +45,7 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': 32768,
'send_buffer_bytes': 131072,
+ 'api_version': (0, 8, 2), # default to most restrictive
}
def __init__(self, host, port, **configs):
@@ -278,7 +280,17 @@ class BrokerConnection(object):
# verify send/recv correlation ids match
recv_correlation_id = Int32.decode(read_buffer)
- if ifr.correlation_id != recv_correlation_id:
+
+ # 0.8.2 quirk
+ if (self.config['api_version'] == (0, 8, 2) and
+ ifr.response_type is GroupCoordinatorResponse and
+ recv_correlation_id == 0):
+ raise Errors.KafkaError(
+ 'Kafka 0.8.2 quirk -- try creating a topic first')
+
+ elif ifr.correlation_id != recv_correlation_id:
+
+
error = Errors.CorrelationIdError(
'Correlation ids do not match: sent %d, recv %d'
% (ifr.correlation_id, recv_correlation_id))