diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2018-10-29 00:09:45 -0700 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-10-29 02:34:36 -0700 |
commit | 9eb63e180e090f121ffa4bb3369bea5ac51ac548 (patch) | |
tree | 1049f4e5a96def4e3505a5aafb4fa96e8ebdedcb /test/testutil.py | |
parent | 3e332e83258f7cfffd4df13ba9f17647dc302c43 (diff) | |
download | kafka-python-migrate-from-unittest-to-pytest.tar.gz |
test-this-toomigrate-from-unittest-to-pytest
Diffstat (limited to 'test/testutil.py')
-rw-r--r-- | test/testutil.py | 26 |
1 files changed, 3 insertions, 23 deletions
diff --git a/test/testutil.py b/test/testutil.py index feb6f6d..6f6cafb 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -3,20 +3,19 @@ from __future__ import absolute_import import functools import operator import os -import socket import time import uuid import pytest from . import unittest -from kafka import SimpleClient, create_message +from kafka import SimpleClient from kafka.errors import ( LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError, NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError ) -from kafka.structs import OffsetRequestPayload, ProduceRequestPayload +from kafka.structs import OffsetRequestPayload from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order @@ -67,26 +66,6 @@ def kafka_versions(*versions): return real_kafka_versions -_MESSAGES = {} -def msg(message): - """Format, encode and deduplicate a message - """ - global _MESSAGES #pylint: disable=global-statement - if message not in _MESSAGES: - _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4())) - - return _MESSAGES[message].encode('utf-8') - -def send_messages(client, topic, partition, messages): - """Send messages to a topic's partition - """ - messages = [create_message(msg(str(m))) for m in messages] - produce = ProduceRequestPayload(topic, partition, messages=messages) - resp, = client.send_produce_request([produce]) - assert resp.error == 0 - - return [x.value for x in messages] - def current_offset(client, topic, partition, kafka_broker=None): """Get the current offset of a topic's partition """ @@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None): else: return offsets.offsets[0] + class KafkaIntegrationTestCase(unittest.TestCase): create_client = True topic = None |