summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  kafka/producer/keyed.py   53
-rw-r--r--  kafka/producer/simple.py  62
2 files changed, 24 insertions, 91 deletions
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 5fe9b12..a5a26c9 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -3,14 +3,10 @@ from __future__ import absolute_import
import logging
import warnings
-from kafka.partitioner import HashedPartitioner
-from kafka.util import kafka_bytestring
+from .base import Producer
+from ..partitioner import HashedPartitioner
+from ..util import kafka_bytestring
-from .base import (
- Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
- ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
-)
log = logging.getLogger(__name__)
@@ -19,46 +15,17 @@ class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key
- Arguments:
- client: The kafka client instance
+ See Producer class for Arguments
- Keyword Arguments:
+ Additional Arguments:
partitioner: A partitioner class that will be used to get the partition
- to send the message to. Must be derived from Partitioner
- async: If True, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- ack_timeout: Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send: If True, messages are send in batches
- batch_send_every_n: If set, messages are send in batches of this size
- batch_send_every_t: If set, messages are send after this timeout
+ to send the message to. Must be derived from Partitioner.
+ Defaults to HashedPartitioner.
"""
- def __init__(self, client, partitioner=None, async=False,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
- codec=None,
- batch_send=False,
- batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- async_retry_limit=ASYNC_RETRY_LIMIT,
- async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
- async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
- async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
- async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
- if not partitioner:
- partitioner = HashedPartitioner
- self.partitioner_class = partitioner
+ def __init__(self, *args, **kwargs):
+ self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
self.partitioners = {}
-
- super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
- codec, async, batch_send,
- batch_send_every_n,
- batch_send_every_t,
- async_retry_limit,
- async_retry_backoff_ms,
- async_retry_on_timeouts,
- async_queue_maxsize,
- async_queue_put_timeout)
+ super(KeyedProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic, key):
if topic not in self.partitioners:
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 280a02e..13e60d9 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -1,68 +1,34 @@
from __future__ import absolute_import
+from itertools import cycle
import logging
import random
import six
-from itertools import cycle
-
from six.moves import xrange
-from .base import (
- Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
- ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
-)
+from .base import Producer
+
log = logging.getLogger(__name__)
class SimpleProducer(Producer):
- """
- A simple, round-robin producer. Each message goes to exactly one partition
-
- Arguments:
- client: The Kafka client instance to use
-
- Keyword Arguments:
- async: If True, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- req_acks: A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout: Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send: If True, messages are send in batches
- batch_send_every_n: If set, messages are send in batches of this size
- batch_send_every_t: If set, messages are send after this timeout
- random_start: If true, randomize the initial partition which the
+ """A simple, round-robin producer.
+
+ See Producer class for Base Arguments
+
+ Additional Arguments:
+ random_start (bool, optional): randomize the initial partition which
the first message block will be published to, otherwise
if false, the first message block will always publish
- to partition 0 before cycling through each partition
+ to partition 0 before cycling through each partition,
+ defaults to True.
"""
- def __init__(self, client, async=False,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
- codec=None,
- batch_send=False,
- batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=True,
- async_retry_limit=ASYNC_RETRY_LIMIT,
- async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
- async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
- async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
- async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
+ def __init__(self, *args, **kwargs):
self.partition_cycles = {}
- self.random_start = random_start
- super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
- codec, async, batch_send,
- batch_send_every_n,
- batch_send_every_t,
- async_retry_limit,
- async_retry_backoff_ms,
- async_retry_on_timeouts,
- async_queue_maxsize,
- async_queue_put_timeout)
+ self.random_start = kwargs.pop('random_start', True)
+ super(SimpleProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic):
if topic not in self.partition_cycles: