From 92fefb06fa2475c0421dd065d0d15a659bad9d3c Mon Sep 17 00:00:00 2001 From: Alex Couture-Beil Date: Tue, 11 Mar 2014 18:09:52 -0700 Subject: Modified SimpleProducer to randomize the initial round robin ordering of partitions to prevent the first message from always being published to partition 0. --- kafka/producer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'kafka') diff --git a/kafka/producer.py b/kafka/producer.py index 12a2934..6359aa2 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import logging import time +import random from Queue import Empty from collections import defaultdict @@ -197,7 +198,9 @@ class SimpleProducer(Producer): if topic not in self.partition_cycles: if topic not in self.client.topic_partitions: self.client.load_metadata_for_topics(topic) - self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + randomly_ordered_partitions = self.client.topic_partitions[topic][:] + random.shuffle(randomly_ordered_partitions) + self.partition_cycles[topic] = cycle(randomly_ordered_partitions) return self.partition_cycles[topic].next() def send_messages(self, topic, *msg): -- cgit v1.2.1 From 4d0722aef2f7420f99b25f28e79fecc47b077d2d Mon Sep 17 00:00:00 2001 From: Alex Couture-Beil Date: Tue, 1 Apr 2014 18:07:24 -0700 Subject: Changed randomization to simply randomize the initial starting partition of the sorted list of partitions rather than completely randomizing the initial ordering before round-robin cycling the partitions --- kafka/producer.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'kafka') diff --git a/kafka/producer.py b/kafka/producer.py index 6359aa2..7a7c48f 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -198,9 +198,13 @@ class SimpleProducer(Producer): if topic not in self.partition_cycles: if topic not in self.client.topic_partitions: self.client.load_metadata_for_topics(topic) - 
randomly_ordered_partitions = self.client.topic_partitions[topic][:] - random.shuffle(randomly_ordered_partitions) - self.partition_cycles[topic] = cycle(randomly_ordered_partitions) + self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + + # Randomize the initial partition that is returned + num_partitions = len(self.client.topic_partitions[topic]) + for _ in xrange(random.randint(0, num_partitions-1)): + self.partition_cycles[topic].next() + return self.partition_cycles[topic].next() def send_messages(self, topic, *msg): -- cgit v1.2.1 From 1fd35a558f96df5a0400dbfd07b7adce97fa050e Mon Sep 17 00:00:00 2001 From: Alex Couture-Beil Date: Tue, 1 Apr 2014 18:24:39 -0700 Subject: added random_start param to SimpleProducer to enable/disable randomization of the initial partition messages are published to --- kafka/producer.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'kafka') diff --git a/kafka/producer.py b/kafka/producer.py index 7a7c48f..8f35963 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -181,14 +181,20 @@ class SimpleProducer(Producer): batch_send - If True, messages are send in batches batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout + random_start - If true, randomize the initial partition which + the first message block will be published to, otherwise + if false, the first message block will always publish + to partition 0 before cycling through each partition """ def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + random_start=False): self.partition_cycles = {} + self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, 
ack_timeout, batch_send, batch_send_every_n, @@ -201,9 +207,10 @@ class SimpleProducer(Producer): self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) # Randomize the initial partition that is returned - num_partitions = len(self.client.topic_partitions[topic]) - for _ in xrange(random.randint(0, num_partitions-1)): - self.partition_cycles[topic].next() + if self.random_start: + num_partitions = len(self.client.topic_partitions[topic]) + for _ in xrange(random.randint(0, num_partitions-1)): + self.partition_cycles[topic].next() return self.partition_cycles[topic].next() -- cgit v1.2.1 From ef4648a0840bf8a212de5c7129d38e34cb335acb Mon Sep 17 00:00:00 2001 From: maciejkula Date: Wed, 16 Apr 2014 17:05:45 +0100 Subject: Fix lack of timeout support in KafkaClient and KafkaConnection --- kafka/client.py | 2 +- kafka/conn.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'kafka') diff --git a/kafka/client.py b/kafka/client.py index 39c89ba..5dee7b7 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -48,7 +48,7 @@ class KafkaClient(object): host_key = (host, port) if host_key not in self.conns: - self.conns[host_key] = KafkaConnection(host, port) + self.conns[host_key] = KafkaConnection(host, port, timeout=self.timeout) return self.conns[host_key] diff --git a/kafka/conn.py b/kafka/conn.py index 4fdeb17..5dc3d5a 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -150,6 +150,6 @@ class KafkaConnection(local): """ self.close() self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((self.host, self.port)) self._sock.settimeout(self.timeout) + self._sock.connect((self.host, self.port)) self._dirty = False -- cgit v1.2.1