summaryrefslogtreecommitdiff
path: root/kafka/producer
diff options
context:
space:
mode:
authortrbs <trbs@trbs.net>2015-09-12 18:22:50 +0200
committertrbs <trbs@trbs.net>2015-09-12 18:22:50 +0200
commit4d516fbd496096ae30191e13b7c2e378654b3759 (patch)
treee5e3177168dc1a4439d1d3e4ce4a6d21c47853e8 /kafka/producer
parentb525e1a8d63e4fcb0ede43c05739bc84c85cc79c (diff)
downloadkafka-python-4d516fbd496096ae30191e13b7c2e378654b3759.tar.gz
allow to specify compression level for codecs which support this
Diffstat (limited to 'kafka/producer')
-rw-r--r--kafka/producer/base.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 3c826cd..7b7b7c1 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -47,7 +47,8 @@ SYNC_FAIL_ON_ERROR_DEFAULT = True
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
- stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
+ stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
+ codec_compresslevel=None):
"""Private method to manage producing messages asynchronously
Listens on the queue for a specified number of messages or until
@@ -123,7 +124,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Send collected requests upstream
for topic_partition, msg in msgset.items():
- messages = create_message_set(msg, codec, key)
+ messages = create_message_set(msg, codec, key, codec_compresslevel)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
tuple(messages))
@@ -267,6 +268,7 @@ class Producer(object):
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
+ codec_compresslevel=None,
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
batch_send=False, # deprecated, use async
@@ -297,6 +299,7 @@ class Producer(object):
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
self.codec = codec
+ self.codec_compresslevel = codec_compresslevel
if self.async:
# Messages are sent through this queue
@@ -314,7 +317,8 @@ class Producer(object):
self.req_acks, self.ack_timeout,
async_retry_options, self.thread_stop_event),
kwargs={'log_messages_on_error': async_log_messages_on_error,
- 'stop_timeout': async_stop_timeout}
+ 'stop_timeout': async_stop_timeout,
+ 'codec_compresslevel': self.codec_compresslevel}
)
# Thread will die if main thread exits
@@ -382,7 +386,7 @@ class Producer(object):
'Current queue size %d.' % self.queue.qsize())
resp = []
else:
- messages = create_message_set([(m, key) for m in msg], self.codec, key)
+ messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request(