diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-08 17:23:56 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-08 18:59:23 -0700 |
commit | c0fb0de7c4ced45dae4e09cdc660ebc741e11af3 (patch) | |
tree | baec77f5972b283fb12ee038fd72dd4b41557b00 | |
parent | a9ddf155e7f9c94be3c4b7508946158aa1900466 (diff) | |
download | kafka-python-c0fb0de7c4ced45dae4e09cdc660ebc741e11af3.tar.gz |
Dont maintain all producer args / kwargs in subclass __init__ and docstrings -- just refer to super class (Producer)
-rw-r--r-- | kafka/producer/keyed.py | 53 | ||||
-rw-r--r-- | kafka/producer/simple.py | 62 |
2 files changed, 24 insertions, 91 deletions
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 5fe9b12..a5a26c9 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -3,14 +3,10 @@ from __future__ import absolute_import import logging import warnings -from kafka.partitioner import HashedPartitioner -from kafka.util import kafka_bytestring +from .base import Producer +from ..partitioner import HashedPartitioner +from ..util import kafka_bytestring -from .base import ( - Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT, - ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS -) log = logging.getLogger(__name__) @@ -19,46 +15,17 @@ class KeyedProducer(Producer): """ A producer which distributes messages to partitions based on the key - Arguments: - client: The kafka client instance + See Producer class for Arguments - Keyword Arguments: + Additional Arguments: partitioner: A partitioner class that will be used to get the partition - to send the message to. Must be derived from Partitioner - async: If True, the messages are sent asynchronously via another - thread (process). We will not wait for a response to these - ack_timeout: Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send: If True, messages are send in batches - batch_send_every_n: If set, messages are send in batches of this size - batch_send_every_t: If set, messages are send after this timeout + to send the message to. Must be derived from Partitioner. + Defaults to HashedPartitioner. """ - def __init__(self, client, partitioner=None, async=False, - req_acks=Producer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, - codec=None, - batch_send=False, - batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - async_retry_limit=ASYNC_RETRY_LIMIT, - async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, - async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, - async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, - async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): - if not partitioner: - partitioner = HashedPartitioner - self.partitioner_class = partitioner + def __init__(self, *args, **kwargs): + self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner) self.partitioners = {} - - super(KeyedProducer, self).__init__(client, req_acks, ack_timeout, - codec, async, batch_send, - batch_send_every_n, - batch_send_every_t, - async_retry_limit, - async_retry_backoff_ms, - async_retry_on_timeouts, - async_queue_maxsize, - async_queue_put_timeout) + super(KeyedProducer, self).__init__(*args, **kwargs) def _next_partition(self, topic, key): if topic not in self.partitioners: diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 280a02e..13e60d9 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -1,68 +1,34 @@ from __future__ import absolute_import +from itertools import cycle import logging import random import six -from itertools import cycle - from six.moves import xrange -from .base import ( - Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT, - ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS -) +from .base import Producer + log = logging.getLogger(__name__) class SimpleProducer(Producer): - """ - A simple, round-robin producer. Each message goes to exactly one partition - - Arguments: - client: The Kafka client instance to use - - Keyword Arguments: - async: If True, the messages are sent asynchronously via another - thread (process). We will not wait for a response to these - req_acks: A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout: Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send: If True, messages are send in batches - batch_send_every_n: If set, messages are send in batches of this size - batch_send_every_t: If set, messages are send after this timeout - random_start: If true, randomize the initial partition which the + """A simple, round-robin producer. + + See Producer class for Base Arguments + + Additional Arguments: + random_start (bool, optional): randomize the initial partition which the first message block will be published to, otherwise if false, the first message block will always publish - to partition 0 before cycling through each partition + to partition 0 before cycling through each partition, + defaults to True. """ - def __init__(self, client, async=False, - req_acks=Producer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, - codec=None, - batch_send=False, - batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - random_start=True, - async_retry_limit=ASYNC_RETRY_LIMIT, - async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, - async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, - async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, - async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): + def __init__(self, *args, **kwargs): self.partition_cycles = {} - self.random_start = random_start - super(SimpleProducer, self).__init__(client, req_acks, ack_timeout, - codec, async, batch_send, - batch_send_every_n, - batch_send_every_t, - async_retry_limit, - async_retry_backoff_ms, - async_retry_on_timeouts, - async_queue_maxsize, - async_queue_put_timeout) + self.random_start = kwargs.pop('random_start', True) + super(SimpleProducer, self).__init__(*args, **kwargs) def _next_partition(self, topic): if topic not in self.partition_cycles: |