diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2018-11-10 12:48:33 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-11-10 12:48:33 -0800 |
commit | bb5bc1fcfc09c9c9994edbbae0af2ff6802c353d (patch) | |
tree | acce192f70b0eeafd9dd68f80d5b2f6739247b42 /test/testutil.py | |
parent | cd47701ba63fc77309066e27b73f50d0150e3e1b (diff) | |
download | kafka-python-bb5bc1fcfc09c9c9994edbbae0af2ff6802c353d.tar.gz |
Migrate from `Unittest` to `pytest` (#1620)
Diffstat (limited to 'test/testutil.py')
-rw-r--r-- | test/testutil.py | 26 |
1 files changed, 3 insertions, 23 deletions
diff --git a/test/testutil.py b/test/testutil.py index feb6f6d..6f6cafb 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -3,20 +3,19 @@ from __future__ import absolute_import import functools import operator import os -import socket import time import uuid import pytest from . import unittest -from kafka import SimpleClient, create_message +from kafka import SimpleClient from kafka.errors import ( LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError, NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError ) -from kafka.structs import OffsetRequestPayload, ProduceRequestPayload +from kafka.structs import OffsetRequestPayload from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order @@ -67,26 +66,6 @@ def kafka_versions(*versions): return real_kafka_versions -_MESSAGES = {} -def msg(message): - """Format, encode and deduplicate a message - """ - global _MESSAGES #pylint: disable=global-statement - if message not in _MESSAGES: - _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4())) - - return _MESSAGES[message].encode('utf-8') - -def send_messages(client, topic, partition, messages): - """Send messages to a topic's partition - """ - messages = [create_message(msg(str(m))) for m in messages] - produce = ProduceRequestPayload(topic, partition, messages=messages) - resp, = client.send_produce_request([produce]) - assert resp.error == 0 - - return [x.value for x in messages] - def current_offset(client, topic, partition, kafka_broker=None): """Get the current offset of a topic's partition """ @@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None): else: return offsets.offsets[0] + class KafkaIntegrationTestCase(unittest.TestCase): create_client = True topic = None |