diff options
Diffstat (limited to 'test/test_producer.py')
-rw-r--r-- | test/test_producer.py | 54 |
1 files changed, 52 insertions, 2 deletions
diff --git a/test/test_producer.py b/test/test_producer.py index 1f6608a..41bd52e 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,11 +1,11 @@ import gc import platform -import sys +import time import threading import pytest -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaConsumer, KafkaProducer, TopicPartition from kafka.producer.buffer import SimpleBufferPool from test.conftest import version from test.testutil import random_string @@ -78,3 +78,53 @@ def test_kafka_producer_gc_cleanup(): del(producer) gc.collect() assert threading.active_count() == threads + + +@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +def test_kafka_producer_proper_record_metadata(kafka_broker, compression): + connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) + producer = KafkaProducer(bootstrap_servers=connect_str, + retries=5, + max_block_ms=10000, + compression_type=compression) + if producer.config['api_version'] >= (0, 10): + magic = 1 + else: + magic = 0 + + topic = random_string(5) + future = producer.send( + topic, + value=b"Simple value", key=b"Simple key", timestamp_ms=9999999, + partition=0) + record = future.get(timeout=5) + assert record is not None + assert record.topic == topic + assert record.partition == 0 + assert record.topic_partition == TopicPartition(topic, 0) + assert record.offset == 0 + if magic >= 1: + assert record.timestamp == 9999999 + else: + assert record.timestamp == -1 # NO_TIMESTAMP + + if magic == 1: + assert record.checksum == 1370034956 + else: + assert record.checksum == 3296137851 + + assert record.serialized_key_size == 10 + assert record.serialized_value_size == 12 + + # generated timestamp case is skipped for broker 0.9 and below + if magic == 0: + return + + send_time = time.time() * 1000 + future = producer.send( + topic, + value=b"Simple value", key=b"Simple key", timestamp_ms=None, + partition=0) + record = future.get(timeout=5) + assert abs(record.timestamp - send_time) <= 1000 # Allow 1s deviation |