author     Dana Powers <dana.powers@rd.io>    2015-06-08 17:23:56 -0700
committer  Dana Powers <dana.powers@rd.io>    2015-06-08 18:59:23 -0700
commit     c0fb0de7c4ced45dae4e09cdc660ebc741e11af3 (patch)
tree       baec77f5972b283fb12ee038fd72dd4b41557b00 /kafka/producer/simple.py
parent     a9ddf155e7f9c94be3c4b7508946158aa1900466 (diff)
download   kafka-python-c0fb0de7c4ced45dae4e09cdc660ebc741e11af3.tar.gz
Don't maintain all producer args / kwargs in subclass __init__ and docstrings -- just refer to the super class (Producer)
Diffstat (limited to 'kafka/producer/simple.py')
-rw-r--r--  kafka/producer/simple.py  |  62
 1 file changed, 14 insertions(+), 48 deletions(-)
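
The patch drops SimpleProducer's explicit argument list and forwards everything to Producer via *args / **kwargs, popping only the subclass-specific random_start flag. Construction is unchanged for callers; the sketch below is a hedged usage example, assuming the top-level KafkaClient / SimpleProducer exports of that era of the library, with a placeholder broker address and topic (req_acks and ACK_AFTER_LOCAL_WRITE come from the removed signature shown in the diff):

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient('localhost:9092')   # placeholder broker address

    # random_start is consumed by SimpleProducer itself; every other keyword
    # (req_acks, ack_timeout, async/batch settings, ...) passes straight
    # through **kwargs to Producer.__init__ as before.
    producer = SimpleProducer(
        client,
        random_start=False,
        req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
    )
    producer.send_messages(b'my-topic', b'first message', b'second message')

One effect of the delegation is that the subclass docstring no longer duplicates defaults that could drift out of sync with Producer.
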
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 280a02e..13e60d9 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -1,68 +1,34 @@
from __future__ import absolute_import
+from itertools import cycle
import logging
import random
import six
-from itertools import cycle
-
from six.moves import xrange
-from .base import (
- Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
- ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
-)
+from .base import Producer
+
log = logging.getLogger(__name__)
class SimpleProducer(Producer):
- """
- A simple, round-robin producer. Each message goes to exactly one partition
-
- Arguments:
- client: The Kafka client instance to use
-
- Keyword Arguments:
- async: If True, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- req_acks: A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout: Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send: If True, messages are send in batches
- batch_send_every_n: If set, messages are send in batches of this size
- batch_send_every_t: If set, messages are send after this timeout
- random_start: If true, randomize the initial partition which the
+ """A simple, round-robin producer.
+
+ See Producer class for Base Arguments
+
+ Additional Arguments:
+ random_start (bool, optional): randomize the initial partition which
the first message block will be published to, otherwise
if false, the first message block will always publish
- to partition 0 before cycling through each partition
+ to partition 0 before cycling through each partition,
+ defaults to True.
"""
- def __init__(self, client, async=False,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
- codec=None,
- batch_send=False,
- batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=True,
- async_retry_limit=ASYNC_RETRY_LIMIT,
- async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
- async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
- async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
- async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
+ def __init__(self, *args, **kwargs):
self.partition_cycles = {}
- self.random_start = random_start
- super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
- codec, async, batch_send,
- batch_send_every_n,
- batch_send_every_t,
- async_retry_limit,
- async_retry_backoff_ms,
- async_retry_on_timeouts,
- async_queue_maxsize,
- async_queue_put_timeout)
+ self.random_start = kwargs.pop('random_start', True)
+ super(SimpleProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic):
if topic not in self.partition_cycles:
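
The hunk is truncated at the start of _next_partition. Independent of the patch itself, the random_start behaviour described in the new docstring can be illustrated with a small standalone sketch; build_partition_cycle is a hypothetical helper written for this example, not part of kafka-python:

    from itertools import cycle
    import random

    def build_partition_cycle(partition_ids, random_start=True):
        """Round-robin iterator over partition ids, optionally starting at a
        random position (illustrative helper only)."""
        partition_iter = cycle(partition_ids)
        if random_start:
            # Skip a random number of partitions so the first message block
            # does not always land on partition_ids[0].
            for _ in range(random.randint(0, len(partition_ids) - 1)):
                next(partition_iter)
        return partition_iter

    # Example: three partitions, randomized starting point.
    partitions = build_partition_cycle([0, 1, 2])
    print([next(partitions) for _ in range(5)])  # e.g. [1, 2, 0, 1, 2]
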