diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-10-22 16:56:28 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-22 16:56:28 +0900 |
commit | a345dcd2ca1b0f8934864c512a4a78c65034dd36 (patch) | |
tree | 0b7ea8c67b015f944b9a401f5e024a2eff7c7db9 /test/test_producer.py | |
parent | 4dbf34abce9b4addbb304520e2f692fbaef60ae5 (diff) | |
download | kafka-python-a345dcd2ca1b0f8934864c512a4a78c65034dd36.tar.gz |
Fix timestamp not passed to RecordMetadata (#1273)
* Fix timestamp not being passed to RecordMetadata properly
* Add more tests for LegacyBatch
* Fix producer test for recordmetadata
Diffstat (limited to 'test/test_producer.py')
-rw-r--r-- | test/test_producer.py | 54 |
1 files changed, 52 insertions, 2 deletions
diff --git a/test/test_producer.py b/test/test_producer.py index 1f6608a..41bd52e 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,11 +1,11 @@ import gc import platform -import sys +import time import threading import pytest -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaConsumer, KafkaProducer, TopicPartition from kafka.producer.buffer import SimpleBufferPool from test.conftest import version from test.testutil import random_string @@ -78,3 +78,53 @@ def test_kafka_producer_gc_cleanup(): del(producer) gc.collect() assert threading.active_count() == threads + + +@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +def test_kafka_producer_proper_record_metadata(kafka_broker, compression): + connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) + producer = KafkaProducer(bootstrap_servers=connect_str, + retries=5, + max_block_ms=10000, + compression_type=compression) + if producer.config['api_version'] >= (0, 10): + magic = 1 + else: + magic = 0 + + topic = random_string(5) + future = producer.send( + topic, + value=b"Simple value", key=b"Simple key", timestamp_ms=9999999, + partition=0) + record = future.get(timeout=5) + assert record is not None + assert record.topic == topic + assert record.partition == 0 + assert record.topic_partition == TopicPartition(topic, 0) + assert record.offset == 0 + if magic >= 1: + assert record.timestamp == 9999999 + else: + assert record.timestamp == -1 # NO_TIMESTAMP + + if magic == 1: + assert record.checksum == 1370034956 + else: + assert record.checksum == 3296137851 + + assert record.serialized_key_size == 10 + assert record.serialized_value_size == 12 + + # generated timestamp case is skipped for broker 0.9 and below + if magic == 0: + return + + send_time = time.time() * 1000 + future = producer.send( + topic, + value=b"Simple value", key=b"Simple key", timestamp_ms=None, + partition=0) + record = future.get(timeout=5) + assert abs(record.timestamp - send_time) <= 1000 # Allow 1s deviation |