diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-10 22:58:02 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-10 23:39:13 -0700 |
commit | 55e377bd475617cf489607c7c53890f89f590094 (patch) | |
tree | 582b3d92cac88d7c74e7f07268077fc492e157bb /kafka/protocol.py | |
parent | 04dbd0e7912c43e0d5cf32b29f0250dc67937df7 (diff) | |
download | kafka-python-55e377bd475617cf489607c7c53890f89f590094.tar.gz |
Use b''.join([]) instead of += to speedup code
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r-- | kafka/protocol.py | 124 |
1 files changed, 71 insertions, 53 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py index 9e01f5a..266e963 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -71,11 +71,13 @@ class KafkaProtocol(object): Offset => int64 MessageSize => int32 """ - message_set = b"" + message_set = [] for message in messages: encoded_message = KafkaProtocol._encode_message(message) - message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message) - return message_set + message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0, + len(encoded_message), + encoded_message)) + return b''.join(message_set) @classmethod def _encode_message(cls, message): @@ -95,9 +97,11 @@ class KafkaProtocol(object): Value => bytes """ if message.magic == 0: - msg = struct.pack('>BB', message.magic, message.attributes) - msg += write_int_string(message.key) - msg += write_int_string(message.value) + msg = b''.join([ + struct.pack('>BB', message.magic, message.attributes), + write_int_string(message.key), + write_int_string(message.value) + ]) crc = crc32(msg) msg = struct.pack('>I%ds' % len(msg), crc, msg) else: @@ -197,21 +201,24 @@ class KafkaProtocol(object): payloads = [] if payloads is None else payloads grouped_payloads = group_by_topic_and_partition(payloads) - message = cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.PRODUCE_KEY) + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.PRODUCE_KEY)) - message += struct.pack('>hii', acks, timeout, len(grouped_payloads)) + message.append(struct.pack('>hii', acks, timeout, + len(grouped_payloads))) for topic, topic_payloads in grouped_payloads.items(): - message += struct.pack('>h%dsi' % len(topic), - len(topic), topic, len(topic_payloads)) + message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic, + len(topic_payloads))) for partition, payload in topic_payloads.items(): msg_set = KafkaProtocol._encode_message_set(payload.messages) - message += struct.pack('>ii%ds' % len(msg_set), partition, - len(msg_set), msg_set) + message.append(struct.pack('>ii%ds' % len(msg_set), partition, + len(msg_set), msg_set)) - return struct.pack('>i%ds' % len(message), len(message), message) + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) @classmethod def decode_produce_response(cls, data): @@ -254,21 +261,23 @@ class KafkaProtocol(object): payloads = [] if payloads is None else payloads grouped_payloads = group_by_topic_and_partition(payloads) - message = cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.FETCH_KEY) + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.FETCH_KEY)) # -1 is the replica id - message += struct.pack('>iiii', -1, max_wait_time, min_bytes, - len(grouped_payloads)) + message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes, + len(grouped_payloads))) for topic, topic_payloads in grouped_payloads.items(): - message += write_short_string(topic) - message += struct.pack('>i', len(topic_payloads)) + message.append(write_short_string(topic)) + message.append(struct.pack('>i', len(topic_payloads))) for partition, payload in topic_payloads.items(): - message += struct.pack('>iqi', partition, payload.offset, - payload.max_bytes) + message.append(struct.pack('>iqi', partition, payload.offset, + payload.max_bytes)) - return struct.pack('>i%ds' % len(message), len(message), message) + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) @classmethod def decode_fetch_response(cls, data): @@ -301,21 +310,23 @@ class KafkaProtocol(object): payloads = [] if payloads is None else payloads grouped_payloads = group_by_topic_and_partition(payloads) - message = cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_KEY) + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.OFFSET_KEY)) # -1 is the replica id - message += struct.pack('>ii', -1, len(grouped_payloads)) + message.append(struct.pack('>ii', -1, len(grouped_payloads))) for topic, topic_payloads in grouped_payloads.items(): - message += write_short_string(topic) - message += struct.pack('>i', len(topic_payloads)) + message.append(write_short_string(topic)) + message.append(struct.pack('>i', len(topic_payloads))) for partition, payload in topic_payloads.items(): - message += struct.pack('>iqi', partition, payload.time, - payload.max_offsets) + message.append(struct.pack('>iqi', partition, payload.time, + payload.max_offsets)) - return struct.pack('>i%ds' % len(message), len(message), message) + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) @classmethod def decode_offset_response(cls, data): @@ -360,15 +371,17 @@ class KafkaProtocol(object): else: topics = payloads - message = cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.METADATA_KEY) + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.METADATA_KEY)) - message += struct.pack('>i', len(topics)) + message.append(struct.pack('>i', len(topics))) for topic in topics: - message += struct.pack('>h%ds' % len(topic), len(topic), topic) + message.append(struct.pack('>h%ds' % len(topic), len(topic), topic)) - return write_int_string(message) + msg = b''.join(message) + return write_int_string(msg) @classmethod def decode_metadata_response(cls, data): @@ -435,20 +448,22 @@ class KafkaProtocol(object): """ grouped_payloads = group_by_topic_and_partition(payloads) - message = cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_COMMIT_KEY) - message += write_short_string(group) - message += struct.pack('>i', len(grouped_payloads)) + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.OFFSET_COMMIT_KEY)) + message.append(write_short_string(group)) + message.append(struct.pack('>i', len(grouped_payloads))) for topic, topic_payloads in grouped_payloads.items(): - message += write_short_string(topic) - message += struct.pack('>i', len(topic_payloads)) + message.append(write_short_string(topic)) + message.append(struct.pack('>i', len(topic_payloads))) for partition, payload in topic_payloads.items(): - message += struct.pack('>iq', partition, payload.offset) - message += write_short_string(payload.metadata) + message.append(struct.pack('>iq', partition, payload.offset)) + message.append(write_short_string(payload.metadata)) - return struct.pack('>i%ds' % len(message), len(message), message) + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) @classmethod def decode_offset_commit_response(cls, data): @@ -484,20 +499,23 @@ class KafkaProtocol(object): payloads: list of OffsetFetchRequest """ grouped_payloads = group_by_topic_and_partition(payloads) - message = cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_FETCH_KEY) - message += write_short_string(group) - message += struct.pack('>i', len(grouped_payloads)) + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.OFFSET_FETCH_KEY)) + + message.append(write_short_string(group)) + message.append(struct.pack('>i', len(grouped_payloads))) for topic, topic_payloads in grouped_payloads.items(): - message += write_short_string(topic) - message += struct.pack('>i', len(topic_payloads)) + message.append(write_short_string(topic)) + message.append(struct.pack('>i', len(topic_payloads))) for partition, payload in topic_payloads.items(): - message += struct.pack('>i', partition) + message.append(struct.pack('>i', partition)) - return struct.pack('>i%ds' % len(message), len(message), message) + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) @classmethod def decode_offset_fetch_response(cls, data): |