summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/conn.py14
1 files changed, 13 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index e13913f..d713b56 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -16,6 +16,7 @@ import kafka.common as Errors
from kafka.common import ConnectionError
from kafka.future import Future
from kafka.protocol.api import RequestHeader
+from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -44,6 +45,7 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': 32768,
'send_buffer_bytes': 131072,
+ 'api_version': (0, 8, 2), # default to most restrictive
}
def __init__(self, host, port, **configs):
@@ -278,7 +280,17 @@ class BrokerConnection(object):
# verify send/recv correlation ids match
recv_correlation_id = Int32.decode(read_buffer)
- if ifr.correlation_id != recv_correlation_id:
+
+ # 0.8.2 quirk
+ if (self.config['api_version'] == (0, 8, 2) and
+ ifr.response_type is GroupCoordinatorResponse and
+ recv_correlation_id == 0):
+ raise Errors.KafkaError(
+ 'Kafka 0.8.2 quirk -- try creating a topic first')
+
+ elif ifr.correlation_id != recv_correlation_id:
+
+
error = Errors.CorrelationIdError(
'Correlation ids do not match: sent %d, recv %d'
% (ifr.correlation_id, recv_correlation_id))