summaryrefslogtreecommitdiff
path: root/test/test_producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--test/test_producer.py54
1 files changed, 52 insertions, 2 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index 1f6608a..41bd52e 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,11 +1,11 @@
import gc
import platform
-import sys
+import time
import threading
import pytest
-from kafka import KafkaConsumer, KafkaProducer
+from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from kafka.producer.buffer import SimpleBufferPool
from test.conftest import version
from test.testutil import random_string
@@ -78,3 +78,53 @@ def test_kafka_producer_gc_cleanup():
del(producer)
gc.collect()
assert threading.active_count() == threads
+
+
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+ connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
+ producer = KafkaProducer(bootstrap_servers=connect_str,
+ retries=5,
+ max_block_ms=10000,
+ compression_type=compression)
+ if producer.config['api_version'] >= (0, 10):
+ magic = 1
+ else:
+ magic = 0
+
+ topic = random_string(5)
+ future = producer.send(
+ topic,
+ value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
+ partition=0)
+ record = future.get(timeout=5)
+ assert record is not None
+ assert record.topic == topic
+ assert record.partition == 0
+ assert record.topic_partition == TopicPartition(topic, 0)
+ assert record.offset == 0
+ if magic >= 1:
+ assert record.timestamp == 9999999
+ else:
+ assert record.timestamp == -1 # NO_TIMESTAMP
+
+ if magic == 1:
+ assert record.checksum == 1370034956
+ else:
+ assert record.checksum == 3296137851
+
+ assert record.serialized_key_size == 10
+ assert record.serialized_value_size == 12
+
+ # generated timestamp case is skipped for broker 0.9 and below
+ if magic == 0:
+ return
+
+ send_time = time.time() * 1000
+ future = producer.send(
+ topic,
+ value=b"Simple value", key=b"Simple key", timestamp_ms=None,
+ partition=0)
+ record = future.get(timeout=5)
+ assert abs(record.timestamp - send_time) <= 1000 # Allow 1s deviation