diff options
author | trbs <trbs@trbs.net> | 2015-09-12 18:22:50 +0200 |
---|---|---|
committer | trbs <trbs@trbs.net> | 2015-09-12 18:22:50 +0200 |
commit | 4d516fbd496096ae30191e13b7c2e378654b3759 (patch) | |
tree | e5e3177168dc1a4439d1d3e4ce4a6d21c47853e8 | |
parent | b525e1a8d63e4fcb0ede43c05739bc84c85cc79c (diff) | |
download | kafka-python-4d516fbd496096ae30191e13b7c2e378654b3759.tar.gz |
allow specifying a compression level for codecs which support this
-rw-r--r-- | kafka/codec.py | 7 | ||||
-rw-r--r-- | kafka/producer/base.py | 12 | ||||
-rw-r--r-- | kafka/protocol.py | 8 |
3 files changed, 17 insertions, 10 deletions
diff --git a/kafka/codec.py b/kafka/codec.py index 19f405b..a9373c7 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -22,12 +22,15 @@ def has_snappy(): return _HAS_SNAPPY -def gzip_encode(payload): +def gzip_encode(payload, compresslevel=None): + if not compresslevel: + compresslevel = 9 + with BytesIO() as buf: # Gzip context manager introduced in python 2.6 # so old-fashioned way until we decide to not support 2.6 - gzipper = gzip.GzipFile(fileobj=buf, mode="w") + gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel) try: gzipper.write(payload) finally: diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 3c826cd..7b7b7c1 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -47,7 +47,8 @@ SYNC_FAIL_ON_ERROR_DEFAULT = True def _send_upstream(queue, client, codec, batch_time, batch_size, req_acks, ack_timeout, retry_options, stop_event, log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, - stop_timeout=ASYNC_STOP_TIMEOUT_SECS): + stop_timeout=ASYNC_STOP_TIMEOUT_SECS, + codec_compresslevel=None): """Private method to manage producing messages asynchronously Listens on the queue for a specified number of messages or until @@ -123,7 +124,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Send collected requests upstream for topic_partition, msg in msgset.items(): - messages = create_message_set(msg, codec, key) + messages = create_message_set(msg, codec, key, codec_compresslevel) req = ProduceRequest(topic_partition.topic, topic_partition.partition, tuple(messages)) @@ -267,6 +268,7 @@ class Producer(object): req_acks=ACK_AFTER_LOCAL_WRITE, ack_timeout=DEFAULT_ACK_TIMEOUT, codec=None, + codec_compresslevel=None, sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, async=False, batch_send=False, # deprecated, use async @@ -297,6 +299,7 @@ class Producer(object): raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) self.codec = codec + self.codec_compresslevel = codec_compresslevel if 
self.async: # Messages are sent through this queue @@ -314,7 +317,8 @@ class Producer(object): self.req_acks, self.ack_timeout, async_retry_options, self.thread_stop_event), kwargs={'log_messages_on_error': async_log_messages_on_error, - 'stop_timeout': async_stop_timeout} + 'stop_timeout': async_stop_timeout, + 'codec_compresslevel': self.codec_compresslevel} ) # Thread will die if main thread exits @@ -382,7 +386,7 @@ class Producer(object): 'Current queue size %d.' % self.queue.qsize()) resp = [] else: - messages = create_message_set([(m, key) for m in msg], self.codec, key) + messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel) req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request( diff --git a/kafka/protocol.py b/kafka/protocol.py index d5adf89..a916974 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -547,7 +547,7 @@ def create_message(payload, key=None): return Message(0, 0, key, payload) -def create_gzip_message(payloads, key=None): +def create_gzip_message(payloads, key=None, compresslevel=None): """ Construct a Gzipped Message containing multiple Messages @@ -562,7 +562,7 @@ def create_gzip_message(payloads, key=None): message_set = KafkaProtocol._encode_message_set( [create_message(payload, pl_key) for payload, pl_key in payloads]) - gzipped = gzip_encode(message_set) + gzipped = gzip_encode(message_set, compresslevel=compresslevel) codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP return Message(0, 0x00 | codec, key, gzipped) @@ -589,7 +589,7 @@ def create_snappy_message(payloads, key=None): return Message(0, 0x00 | codec, key, snapped) -def create_message_set(messages, codec=CODEC_NONE, key=None): +def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None): """Create a message set using the given codec. If codec is CODEC_NONE, return a list of raw Kafka messages. 
Otherwise, @@ -598,7 +598,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None): if codec == CODEC_NONE: return [create_message(m, k) for m, k in messages] elif codec == CODEC_GZIP: - return [create_gzip_message(messages, key)] + return [create_gzip_message(messages, key, compresslevel)] elif codec == CODEC_SNAPPY: return [create_snappy_message(messages, key)] else: |