summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-06 16:08:01 -0700
committerDana Powers <dana.powers@rd.io>2015-06-06 16:14:49 -0700
commit868115c703afc4403adc8d9481bf31d2c15064dd (patch)
tree335b0090cd382226b9b9f2edb39752b52b9c2234
parent3a4ceef3816e9ddfcb86099f691f72f1f60921cd (diff)
downloadkafka-python-868115c703afc4403adc8d9481bf31d2c15064dd.tar.gz
Raise an error if we attempt to group duplicate topic-partition payloads
- previously this would simply drop one of the payloads
-rw-r--r--kafka/client.py2
-rw-r--r--kafka/util.py3
-rw-r--r--test/test_util.py6
3 files changed, 10 insertions, 1 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 7ea09d7..4302ce5 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -423,6 +423,8 @@ class KafkaClient(object):
Arguments:
payloads (list of ProduceRequest): produce requests to send to kafka
+ ProduceRequest payloads must not contain duplicates for any
+ topic-partition.
acks (int, optional): how many acks the servers should receive from replica
brokers before responding to the request. If it is 0, the server
will not send any response. If it is 1, the server will wait
diff --git a/kafka/util.py b/kafka/util.py
index 78c3607..6d9d307 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -82,6 +82,9 @@ def relative_unpack(fmt, data, cur):
def group_by_topic_and_partition(tuples):
out = collections.defaultdict(dict)
for t in tuples:
+ assert t.topic not in out or t.partition not in out[t.topic], \
+ 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
+ t.topic, t.partition)
out[t.topic][t.partition] = t
return out
diff --git a/test/test_util.py b/test/test_util.py
index 6a8f45b..ea3783e 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -108,7 +108,6 @@ class UtilTest(unittest.TestCase):
l = [
t("a", 1),
- t("a", 1),
t("a", 2),
t("a", 3),
t("b", 3),
@@ -124,3 +123,8 @@ class UtilTest(unittest.TestCase):
3: t("b", 3),
}
})
+
+ # should not be able to group duplicate topic-partitions
+ t1 = t("a", 1)
+ with self.assertRaises(AssertionError):
+ kafka.util.group_by_topic_and_partition([t1, t1])