diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-03-23 05:58:55 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-23 05:58:55 -0700 |
commit | b62006aeb86258b4b1ef2735bebb1fe99459b82d (patch) | |
tree | 9a5cab083163b8e5d952d1bb0f3bb7141ffb746a /kafka | |
parent | 204388b0928c02a339eb84b376c74851eb074e69 (diff) | |
download | kafka-python-b62006aeb86258b4b1ef2735bebb1fe99459b82d.tar.gz |
Change SimpleProducer to use async_send (async is reserved in py37) (#1454)
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/producer/base.py | 38 | ||||
-rw-r--r-- | kafka/producer/keyed.py | 2 | ||||
-rw-r--r-- | kafka/producer/simple.py | 2 |
3 files changed, 25 insertions, 17 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index c038bd3..e8d6c3d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -226,7 +226,7 @@ class Producer(object): Arguments: client (kafka.SimpleClient): instance to use for broker - communications. If async=True, the background thread will use + communications. If async_send=True, the background thread will use :meth:`client.copy`, which is expected to return a thread-safe object. codec (kafka.protocol.ALL_CODECS): compression codec to use. @@ -238,11 +238,11 @@ class Producer(object): sync_fail_on_error (bool, optional): whether sync producer should raise exceptions (True), or just return errors (False), defaults to True. - async (bool, optional): send message using a background thread, + async_send (bool, optional): send message using a background thread, defaults to False. - batch_send_every_n (int, optional): If async is True, messages are + batch_send_every_n (int, optional): If async_send is True, messages are sent in batches of this size, defaults to 20. - batch_send_every_t (int or float, optional): If async is True, + batch_send_every_t (int or float, optional): If async_send is True, messages are sent immediately after this timeout in seconds, even if there are fewer than batch_send_every_n, defaults to 20. async_retry_limit (int, optional): number of retries for failed messages @@ -268,8 +268,10 @@ class Producer(object): defaults to 30. Deprecated Arguments: + async (bool, optional): send message using a background thread, + defaults to False. Deprecated, use 'async_send' batch_send (bool, optional): If True, messages are sent by a background - thread in batches, defaults to False. Deprecated, use 'async' + thread in batches, defaults to False. Deprecated, use 'async_send' """ ACK_NOT_REQUIRED = 0 # No ack is required ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log @@ -282,8 +284,8 @@ class Producer(object): codec=None, codec_compresslevel=None, sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, - async=False, - batch_send=False, # deprecated, use async + async_send=False, + batch_send=False, # deprecated, use async_send batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, async_retry_limit=ASYNC_RETRY_LIMIT, @@ -292,15 +294,21 @@ class Producer(object): async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, - async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): + async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS, + **kwargs): + + # async renamed async_send for python3.7 support + if 'async' in kwargs: + log.warning('Deprecated async option found -- use async_send') + async_send = kwargs['async'] - if async: + if async_send: assert batch_send_every_n > 0 assert batch_send_every_t > 0 assert async_queue_maxsize >= 0 self.client = client - self.async = async + self.async_send = async_send self.req_acks = req_acks self.ack_timeout = ack_timeout self.stopped = False @@ -313,7 +321,7 @@ class Producer(object): self.codec = codec self.codec_compresslevel = codec_compresslevel - if self.async: + if self.async_send: # Messages are sent through this queue self.queue = Queue(async_queue_maxsize) self.async_queue_put_timeout = async_queue_put_timeout @@ -400,7 +408,7 @@ class Producer(object): if key is not None and not isinstance(key, six.binary_type): raise TypeError("the key must be type bytes") - if self.async: + if self.async_send: for idx, m in enumerate(msg): try: item = (TopicPartition(topic, partition), m, key) @@ -435,7 +443,7 @@ class Producer(object): log.warning('timeout argument to stop() is deprecated - ' 'it will be removed in future release') - if not self.async: + if not self.async_send: log.warning('producer.stop() called, but producer is not async') return @@ -443,7 +451,7 @@ class Producer(object): log.warning('producer.stop() called, but producer is already stopped') return - if self.async: + if self.async_send: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.thread_stop_event.set() self.thread.join() @@ -471,5 +479,5 @@ class Producer(object): self.stopped = True def __del__(self): - if self.async and not self.stopped: + if self.async_send and not self.stopped: self.stop() diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 8de3ad8..62bb733 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -46,4 +46,4 @@ class KeyedProducer(Producer): return self.send_messages(topic, key, msg) def __repr__(self): - return '<KeyedProducer batch=%s>' % self.async + return '<KeyedProducer batch=%s>' % self.async_send diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 589363c..91e0abc 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -51,4 +51,4 @@ class SimpleProducer(Producer): ) def __repr__(self): - return '<SimpleProducer batch=%s>' % self.async + return '<SimpleProducer batch=%s>' % self.async_send |