diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-06 16:08:01 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-06 16:14:49 -0700 |
commit | 868115c703afc4403adc8d9481bf31d2c15064dd (patch) | |
tree | 335b0090cd382226b9b9f2edb39752b52b9c2234 | |
parent | 3a4ceef3816e9ddfcb86099f691f72f1f60921cd (diff) | |
download | kafka-python-868115c703afc4403adc8d9481bf31d2c15064dd.tar.gz |
Raise an error if we attempt to group duplicate topic-partition payloads
- previously this would simply drop one of the payloads
-rw-r--r-- | kafka/client.py | 2 | ||||
-rw-r--r-- | kafka/util.py | 3 | ||||
-rw-r--r-- | test/test_util.py | 6 |
3 files changed, 10 insertions, 1 deletion
diff --git a/kafka/client.py b/kafka/client.py index 7ea09d7..4302ce5 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -423,6 +423,8 @@ class KafkaClient(object): Arguments: payloads (list of ProduceRequest): produce requests to send to kafka + ProduceRequest payloads must not contain duplicates for any + topic-partition. acks (int, optional): how many acks the servers should receive from replica brokers before responding to the request. If it is 0, the server will not send any response. If it is 1, the server will wait diff --git a/kafka/util.py b/kafka/util.py index 78c3607..6d9d307 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -82,6 +82,9 @@ def relative_unpack(fmt, data, cur): def group_by_topic_and_partition(tuples): out = collections.defaultdict(dict) for t in tuples: + assert t.topic not in out or t.partition not in out[t.topic], \ + 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__, + t.topic, t.partition) out[t.topic][t.partition] = t return out diff --git a/test/test_util.py b/test/test_util.py index 6a8f45b..ea3783e 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -108,7 +108,6 @@ class UtilTest(unittest.TestCase): l = [ t("a", 1), - t("a", 1), t("a", 2), t("a", 3), t("b", 3), @@ -124,3 +123,8 @@ class UtilTest(unittest.TestCase): 3: t("b", 3), } }) + + # should not be able to group duplicate topic-partitions + t1 = t("a", 1) + with self.assertRaises(AssertionError): + kafka.util.group_by_topic_and_partition([t1, t1]) |