summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-06-10 02:39:05 -0700
committerDana Powers <dana.powers@gmail.com>2015-06-10 02:39:05 -0700
commit8e3cd1ca2801ed13522d177305898c29a3ebfd9b (patch)
tree5b2dc7ff857007e6ecf660d1f9b31ee393a16e06
parent4c9a3c6b9dac952154cdab2e11892bff240f9c91 (diff)
parent66b6b4aa6ee7c4461a4e43b2512e76ba3f04230f (diff)
downloadkafka-python-8e3cd1ca2801ed13522d177305898c29a3ebfd9b.tar.gz
Merge pull request #403 from dpkp/client_request_response_ordering
Client request response ordering
-rw-r--r--kafka/client.py30
-rw-r--r--test/test_client_integration.py32
2 files changed, 51 insertions, 11 deletions
diff --git a/kafka/client.py b/kafka/client.py
index da86175..1bd8587 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -138,7 +138,8 @@ class KafkaClient(object):
Arguments:
payloads: list of object-like entities with a topic (str) and
- partition (int) attribute
+ partition (int) attribute; payloads with duplicate topic-partitions
+ are not supported.
encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
@@ -152,6 +153,10 @@ class KafkaClient(object):
List of response objects in the same order as the supplied payloads
"""
+ # encoders / decoders do not maintain ordering currently
+ # so we need to keep this so we can rebuild order before returning
+ original_ordering = [(p.topic, p.partition) for p in payloads]
+
# Group the requests by topic+partition
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
@@ -165,7 +170,7 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
# and collect the responses and errors
- responses_by_broker = collections.defaultdict(list)
+ responses = {}
broker_failures = []
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
@@ -184,7 +189,8 @@ class KafkaClient(object):
'to server %s: %s', requestId, broker, e)
for payload in payloads:
- responses_by_broker[broker].append(FailedPayloadsError(payload))
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
# No exception, try to get response
else:
@@ -196,7 +202,8 @@ class KafkaClient(object):
log.debug('Request %s does not expect a response '
'(skipping conn.recv)', requestId)
for payload in payloads:
- responses_by_broker[broker].append(None)
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = None
continue
try:
@@ -208,12 +215,17 @@ class KafkaClient(object):
requestId, broker, e)
for payload in payloads:
- responses_by_broker[broker].append(FailedPayloadsError(payload))
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
else:
+ _resps = []
for payload_response in decoder_fn(response):
- responses_by_broker[broker].append(payload_response)
- log.debug('Response %s: %s', requestId, responses_by_broker[broker])
+ topic_partition = (payload_response.topic,
+ payload_response.partition)
+ responses[topic_partition] = payload_response
+ _resps.append(payload_response)
+ log.debug('Response %s: %s', requestId, _resps)
# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request
@@ -223,9 +235,7 @@ class KafkaClient(object):
self.reset_all_metadata()
# Return responses in the same order as provided
- responses_by_payload = [responses_by_broker[broker].pop(0)
- for broker in brokers_for_payloads]
- return responses_by_payload
+ return [responses[tp] for tp in original_ordering]
def __repr__(self):
return '<KafkaClient client_id=%s>' % (self.client_id)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 585123b..a6ea8f7 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -2,8 +2,9 @@ import os
from kafka.common import (
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
- KafkaTimeoutError
+ KafkaTimeoutError, ProduceRequest
)
+from kafka.protocol import create_message
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
@@ -49,6 +50,35 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
+ @kafka_versions('all')
+ def test_send_produce_request_maintains_request_response_order(self):
+
+ self.client.ensure_topic_exists(b'foo', timeout=1)
+ self.client.ensure_topic_exists(b'bar', timeout=1)
+
+ requests = [
+ ProduceRequest(
+ b'foo', 0,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'bar', 1,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'foo', 1,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'bar', 0,
+ [create_message(b'a'), create_message(b'b')]),
+ ]
+
+ responses = self.client.send_produce_request(requests)
+ while len(responses):
+ request = requests.pop()
+ response = responses.pop()
+ self.assertEqual(request.topic, response.topic)
+ self.assertEqual(request.partition, response.partition)
+
+
####################
# Offset Tests #
####################