-rw-r--r--   kafka/codec.py                      55
-rw-r--r--   kafka/consumer/multiprocess.py      22
-rw-r--r--   kafka/producer/base.py               4
-rw-r--r--   kafka/protocol.py                    6
-rw-r--r--   test/test_producer_integration.py   16
-rw-r--r--   test/test_protocol.py               66
6 files changed, 120 insertions, 49 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 2279200..19f405b 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,8 +1,7 @@
-from io import BytesIO
 import gzip
+from io import BytesIO
 import struct
 
-import six
 from six.moves import xrange
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
@@ -10,9 +9,9 @@ _XERIAL_V1_FORMAT = 'bccccccBii'
 
 try:
     import snappy
-    _has_snappy = True
+    _HAS_SNAPPY = True
 except ImportError:
-    _has_snappy = False
+    _HAS_SNAPPY = False
 
 
 def has_gzip():
@@ -20,26 +19,36 @@ def has_gzip():
 
 
 def has_snappy():
-    return _has_snappy
+    return _HAS_SNAPPY
 
 
 def gzip_encode(payload):
-    buffer = BytesIO()
-    handle = gzip.GzipFile(fileobj=buffer, mode="w")
-    handle.write(payload)
-    handle.close()
-    buffer.seek(0)
-    result = buffer.read()
-    buffer.close()
+    with BytesIO() as buf:
+
+        # Gzip context manager introduced in python 2.6
+        # so old-fashioned way until we decide to not support 2.6
+        gzipper = gzip.GzipFile(fileobj=buf, mode="w")
+        try:
+            gzipper.write(payload)
+        finally:
+            gzipper.close()
+
+        result = buf.getvalue()
+
     return result
 
 
 def gzip_decode(payload):
-    buffer = BytesIO(payload)
-    handle = gzip.GzipFile(fileobj=buffer, mode='r')
-    result = handle.read()
-    handle.close()
-    buffer.close()
+    with BytesIO(payload) as buf:
+
+        # Gzip context manager introduced in python 2.6
+        # so old-fashioned way until we decide to not support 2.6
+        gzipper = gzip.GzipFile(fileobj=buf, mode='r')
+        try:
+            result = gzipper.read()
+        finally:
+            gzipper.close()
+
     return result
 
 
@@ -47,8 +56,8 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
     """Encodes the given data with snappy if xerial_compatible is set then the
     stream is encoded in a fashion compatible with the xerial snappy library
 
-    The block size (xerial_blocksize) controls how frequent the blocking occurs
-    32k is the default in the xerial library.
+    The block size (xerial_blocksize) controls how frequent the blocking
+    occurs 32k is the default in the xerial library.
 
     The format winds up being
         +-------------+------------+--------------+------------+--------------+
@@ -63,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
         length will always be <= blocksize.
     """
 
-    if not _has_snappy:
+    if not has_snappy():
         raise NotImplementedError("Snappy codec is not available")
 
     if xerial_compatible:
@@ -74,7 +83,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
 
         out = BytesIO()
         header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
-                          in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+                           in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
 
         out.write(header)
         for chunk in _chunker():
@@ -113,13 +122,13 @@ def _detect_xerial_stream(payload):
     """
 
     if len(payload) > 16:
-        header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+        header = struct.unpack('!' +
+                               _XERIAL_V1_FORMAT, bytes(payload)[:16])
        return header == _XERIAL_V1_HEADER
     return False
 
 
 def snappy_decode(payload):
-    if not _has_snappy:
+    if not has_snappy():
         raise NotImplementedError("Snappy codec is not available")
 
     if _detect_xerial_stream(payload):
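A note on the Xerial framing preserved above: the snappy_encode docstring describes a 16-byte magic header followed by length-prefixed snappy blocks. A minimal standalone sketch of that framing, assuming the python-snappy package (xerial_frame is an illustrative name, not part of the patch):

    import struct

    import snappy  # python-snappy, assumed installed

    _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
    _XERIAL_V1_FORMAT = 'bccccccBii'

    def xerial_frame(payload, blocksize=32 * 1024):
        # 16-byte magic header, then (4-byte big-endian block length,
        # snappy block) pairs; each block compresses at most `blocksize`
        # bytes of the raw payload, matching _chunker's slicing.
        header = b''.join(struct.pack('!' + fmt, dat)
                          for fmt, dat in zip(_XERIAL_V1_FORMAT,
                                              _XERIAL_V1_HEADER))
        chunks = [header]
        for i in range(0, len(payload), blocksize):
            block = snappy.compress(payload[i:i + blocksize])
            chunks.append(struct.pack('!i', len(block)))
            chunks.append(block)
        return b''.join(chunks)

This is also why _detect_xerial_stream only needs the first 16 bytes: the header struct ('bccccccBii') packs to exactly 16 bytes.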
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 4dc04dc..bec3100 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -123,26 +123,28 @@ class MultiProcessConsumer(Consumer):
         self.pause = Event()        # Requests the consumers to pause fetch
         self.size = Value('i', 0)   # Indicator of number of messages to fetch
 
-        partitions = self.offsets.keys()
+        # dict.keys() returns a view in py3 + it's not a thread-safe operation
+        # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
+        # It's safer to copy dict as it only runs during the init.
+        partitions = list(self.offsets.copy().keys())
 
-        # If unspecified, start one consumer per partition
+        # By default, start one consumer process for all partitions
         # The logic below ensures that
         # * we do not cross the num_procs limit
         # * we have an even distribution of partitions among processes
-        if not partitions_per_proc:
-            partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
-            if partitions_per_proc < num_procs * 0.5:
-                partitions_per_proc += 1
+
+        if partitions_per_proc:
+            num_procs = len(partitions) / partitions_per_proc
+            if num_procs * partitions_per_proc < len(partitions):
+                num_procs += 1
 
         # The final set of chunks
-        chunker = lambda *x: [] + list(x)
-        chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
+        chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
 
         self.procs = []
         for chunk in chunks:
-            chunk = filter(lambda x: x is not None, chunk)
             args = (client.copy(),
-                    group, topic, list(chunk),
+                    group, topic, chunk,
                     self.queue, self.start, self.exit,
                     self.pause, self.size)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e32b168..6a5a94e 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -58,7 +58,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             # Adjust the timeout to match the remaining period
             count -= 1
             timeout = send_at - time.time()
-            msgset[topic_partition].append(msg)
+            msgset[topic_partition].append((msg, key))
 
         # Send collected requests upstream
         reqs = []
@@ -192,7 +192,7 @@ class Producer(object):
                 self.queue.put((TopicAndPartition(topic, partition), m, key))
             resp = []
         else:
-            messages = create_message_set(msg, self.codec, key)
+            messages = create_message_set([(m, key) for m in msg], self.codec, key)
             req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 2a39de6..b34a95d 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -559,7 +559,7 @@ def create_gzip_message(payloads, key=None):
 
     """
     message_set = KafkaProtocol._encode_message_set(
-        [create_message(payload, key) for payload in payloads])
+        [create_message(payload, pl_key) for payload, pl_key in payloads])
 
     gzipped = gzip_encode(message_set)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
@@ -580,7 +580,7 @@ def create_snappy_message(payloads, key=None):
 
     """
     message_set = KafkaProtocol._encode_message_set(
-        [create_message(payload, key) for payload in payloads])
+        [create_message(payload, pl_key) for payload, pl_key in payloads])
 
     snapped = snappy_encode(message_set)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
@@ -595,7 +595,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
         return a list containing a single codec-encoded message.
     """
     if codec == CODEC_NONE:
-        return [create_message(m, key) for m in messages]
+        return [create_message(m, k) for m, k in messages]
     elif codec == CODEC_GZIP:
         return [create_gzip_message(messages, key)]
     elif codec == CODEC_SNAPPY:
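The protocol.py hunks above are the heart of this change: every payload given to create_message_set and the gzip/snappy helpers is now a (value, key) tuple, so per-message keys survive compression, and Producer._send_messages wraps its batch as [(m, key) for m in msg] accordingly. A usage sketch, assuming the kafka.protocol module layout at this commit:

    from kafka.protocol import CODEC_GZIP, CODEC_NONE, create_message_set

    keyed_payloads = [(b"v1", b"k1"), (b"v2", b"k2")]

    # CODEC_NONE: one Message per tuple, each carrying its own key.
    plain = create_message_set(keyed_payloads, CODEC_NONE)

    # CODEC_GZIP: the tuples (keys included) are encoded into the inner
    # message set; the `key` argument only sets the key of the single
    # wrapper message.
    wrapped = create_message_set(keyed_payloads, CODEC_GZIP, key=b"wrapper-key")

Before this patch, compressed sets applied one key to every inner message, which is what the new keyed tests below guard against.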
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 38df69f..1804af0 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -71,9 +71,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         start_offset = self.current_offset(self.topic, 0)
 
         message1 = create_gzip_message([
-            ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
+            (("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)])
         message2 = create_gzip_message([
-            ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
+            (("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)])
 
         self.assert_produce_request(
             [ message1, message2 ],
@@ -87,8 +87,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         start_offset = self.current_offset(self.topic, 0)
 
         self.assert_produce_request([
-            create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
-            create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
+            create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]),
+            create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]),
             ],
             start_offset,
             200,
@@ -102,13 +102,13 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         messages = [
             create_message(b"Just a plain message"),
             create_gzip_message([
-                ("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
+                (("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]),
         ]
 
         # All snappy integration tests fail with nosnappyjava
         if False and has_snappy():
             msg_count += 100
-            messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))
+            messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)]))
 
         self.assert_produce_request(messages, start_offset, msg_count)
 
@@ -118,7 +118,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 
         self.assert_produce_request([
             create_gzip_message([
-                ("Gzipped batch 1, message %d" % i).encode('utf-8')
+                (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
                 for i in range(50000)])
             ],
             start_offset,
@@ -127,7 +127,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 
         self.assert_produce_request([
             create_gzip_message([
-                ("Gzipped batch 1, message %d" % i).encode('utf-8')
+                (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
                 for i in range(50000)])
             ],
             start_offset+50000,
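The keyed tests below assert hard-coded CRC values (1474775406 and -16383415). A v0 Kafka message checksums everything after the CRC field, that is, the magic byte, attributes, and the length-prefixed key and value, so such expectations can be rederived with a sketch like the following (message_crc is an illustrative helper; zlib.crc32 returns an unsigned value on Python 3, hence the signed wrap to match the tests' ">i" packing):

    import struct
    import zlib

    def message_crc(key, value, magic=0, attributes=0):
        body = struct.pack('>bb', magic, attributes)
        body += struct.pack('>i', len(key)) + key
        body += struct.pack('>i', len(value)) + value
        crc = zlib.crc32(body) & 0xffffffff
        # The tests pack the CRC as a signed 32-bit int (">i"), so wrap.
        return crc - (1 << 32) if crc >= (1 << 31) else crc

    message_crc(b"k1", b"v1")  # should reproduce 1474775406
    message_crc(b"k2", b"v2")  # should reproduce -16383415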
diff --git a/test/test_protocol.py b/test/test_protocol.py
index d20f591..0938228 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -32,7 +32,7 @@ class TestProtocol(unittest.TestCase):
         self.assertEqual(msg.value, payload)
 
     def test_create_gzip(self):
-        payloads = [b"v1", b"v2"]
+        payloads = [(b"v1", None), (b"v2", None)]
         msg = create_gzip_message(payloads)
         self.assertEqual(msg.magic, 0)
         self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
@@ -59,9 +59,39 @@ class TestProtocol(unittest.TestCase):
 
         self.assertEqual(decoded, expect)
 
+    def test_create_gzip_keyed(self):
+        payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
+        msg = create_gzip_message(payloads)
+        self.assertEqual(msg.magic, 0)
+        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
+        self.assertEqual(msg.key, None)
+        # Need to decode to check since gzipped payload is non-deterministic
+        decoded = gzip_decode(msg.value)
+        expect = b"".join([
+            struct.pack(">q", 0),           # MsgSet Offset
+            struct.pack(">i", 18),          # Msg Size
+            struct.pack(">i", 1474775406),  # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 2),           # Length of key
+            b"k1",                          # Key
+            struct.pack(">i", 2),           # Length of value
+            b"v1",                          # Value
+
+            struct.pack(">q", 0),           # MsgSet Offset
+            struct.pack(">i", 18),          # Msg Size
+            struct.pack(">i", -16383415),   # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 2),           # Length of key
+            b"k2",                          # Key
+            struct.pack(">i", 2),           # Length of value
+            b"v2",                          # Value
+        ])
+
+        self.assertEqual(decoded, expect)
+
     @unittest.skipUnless(has_snappy(), "Snappy not available")
     def test_create_snappy(self):
-        payloads = [b"v1", b"v2"]
+        payloads = [(b"v1", None), (b"v2", None)]
         msg = create_snappy_message(payloads)
         self.assertEqual(msg.magic, 0)
         self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
@@ -87,6 +117,36 @@
 
         self.assertEqual(decoded, expect)
 
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_create_snappy_keyed(self):
+        payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
+        msg = create_snappy_message(payloads)
+        self.assertEqual(msg.magic, 0)
+        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
+        self.assertEqual(msg.key, None)
+        decoded = snappy_decode(msg.value)
+        expect = b"".join([
+            struct.pack(">q", 0),           # MsgSet Offset
+            struct.pack(">i", 18),          # Msg Size
+            struct.pack(">i", 1474775406),  # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 2),           # Length of key
+            b"k1",                          # Key
+            struct.pack(">i", 2),           # Length of value
+            b"v1",                          # Value
+
+            struct.pack(">q", 0),           # MsgSet Offset
+            struct.pack(">i", 18),          # Msg Size
+            struct.pack(">i", -16383415),   # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 2),           # Length of key
+            b"k2",                          # Key
+            struct.pack(">i", 2),           # Length of value
+            b"v2",                          # Value
+        ])
+
+        self.assertEqual(decoded, expect)
+
     def test_encode_message_header(self):
         expect = b"".join([
             struct.pack(">h", 10),             # API Key
@@ -701,7 +761,7 @@ class TestProtocol(unittest.TestCase):
             yield
 
     def test_create_message_set(self):
-        messages = [1, 2, 3]
+        messages = [(1, "k1"), (2, "k2"), (3, "k3")]
 
         # Default codec is CODEC_NONE. Expect list of regular messages.
         expect = [sentinel.message] * len(messages)
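Back in kafka/consumer/multiprocess.py, the map/lambda chunker was replaced with a stride slice that deals partition i to process i % num_procs, with no None padding left to filter out. A standalone illustration (note that range() needs an int, so a Python 3 port of the partitions_per_proc branch above would want // rather than /):

    partitions = list(range(10))  # e.g. ten partition ids
    num_procs = 3

    # partition i goes to process i % num_procs
    chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
    assert chunks == [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]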