diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-07 17:57:24 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-07 17:57:24 -0800 |
commit | 9a8af1499ca425366d934487469d9977fae7fe5f (patch) | |
tree | 01ad6026ca2ecf62e554f4fabacc045b46c01737 /test | |
parent | d4e85ecd1d8acac1a0f74d164b67faefd99987e4 (diff) | |
download | kafka-python-0.9.tar.gz |
Fix KafkaClient->SimpleClient references0.9
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client.py | 64 | ||||
-rw-r--r-- | test/test_consumer_group.py | 4 | ||||
-rw-r--r-- | test/test_failover_integration.py | 6 | ||||
-rw-r--r-- | test/test_producer.py | 10 | ||||
-rw-r--r-- | test/testutil.py | 4 |
5 files changed, 44 insertions, 44 deletions
diff --git a/test/test_client.py b/test/test_client.py index 8c62eb9..5a35c83 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -5,7 +5,7 @@ from mock import ANY, MagicMock, patch import six from . import unittest -from kafka import KafkaClient +from kafka import SimpleClient from kafka.common import ( ProduceRequestPayload, BrokerMetadata, @@ -35,33 +35,33 @@ def mock_conn(conn, success=True): conn.return_value = mocked -class TestKafkaClient(unittest.TestCase): +class TestSimpleClient(unittest.TestCase): def test_init_with_list(self): - with patch.object(KafkaClient, 'load_metadata_for_topics'): - client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) + with patch.object(SimpleClient, 'load_metadata_for_topics'): + client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) self.assertEqual( sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]), sorted(client.hosts)) def test_init_with_csv(self): - with patch.object(KafkaClient, 'load_metadata_for_topics'): - client = KafkaClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092') + with patch.object(SimpleClient, 'load_metadata_for_topics'): + client = SimpleClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092') self.assertEqual( sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]), sorted(client.hosts)) def test_init_with_unicode_csv(self): - with patch.object(KafkaClient, 'load_metadata_for_topics'): - client = KafkaClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092') + with patch.object(SimpleClient, 'load_metadata_for_topics'): + client = SimpleClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092') self.assertEqual( sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]), sorted(client.hosts)) - @patch.object(KafkaClient, '_get_conn') - @patch.object(KafkaClient, 'load_metadata_for_topics') + @patch.object(SimpleClient, '_get_conn') + @patch.object(SimpleClient, 'load_metadata_for_topics') def test_send_broker_unaware_request_fail(self, load_metadata, conn): mocked_conns = { ('kafka01', 9092): MagicMock(), @@ -74,7 +74,7 @@ class TestKafkaClient(unittest.TestCase): return mocked_conns[(host, port)] conn.side_effect = mock_get_conn - client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) + client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092']) req = KafkaProtocol.encode_metadata_request() with self.assertRaises(KafkaUnavailableError): @@ -102,10 +102,10 @@ class TestKafkaClient(unittest.TestCase): return mocked_conns[(host, port)] # patch to avoid making requests before we want it - with patch.object(KafkaClient, 'load_metadata_for_topics'): - with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + with patch.object(SimpleClient, 'load_metadata_for_topics'): + with patch.object(SimpleClient, '_get_conn', side_effect=mock_get_conn): - client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + client = SimpleClient(hosts='kafka01:9092,kafka02:9092') resp = client._send_broker_unaware_request(payloads=['fake request'], encoder_fn=MagicMock(), decoder_fn=lambda x: x) @@ -113,7 +113,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEqual('valid response', resp) mocked_conns[('kafka02', 9092)].recv.assert_called_once_with() - @patch('kafka.client.KafkaClient._get_conn') + @patch('kafka.SimpleClient._get_conn') @patch('kafka.client.KafkaProtocol') def test_load_metadata(self, protocol, conn): @@ -143,7 +143,7 @@ class TestKafkaClient(unittest.TestCase): protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # client loads metadata at init - client = KafkaClient(hosts=['broker_1:4567']) + client = SimpleClient(hosts=['broker_1:4567']) self.assertDictEqual({ TopicPartition('topic_1', 0): brokers[1], TopicPartition('topic_noleader', 0): None, @@ -163,7 +163,7 @@ class TestKafkaClient(unittest.TestCase): # This should not raise client.load_metadata_for_topics('topic_no_leader') - @patch('kafka.client.KafkaClient._get_conn') + @patch('kafka.SimpleClient._get_conn') @patch('kafka.client.KafkaProtocol') def test_has_metadata_for_topic(self, protocol, conn): @@ -184,7 +184,7 @@ class TestKafkaClient(unittest.TestCase): ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) - client = KafkaClient(hosts=['broker_1:4567']) + client = SimpleClient(hosts=['broker_1:4567']) # Topics with no partitions return False self.assertFalse(client.has_metadata_for_topic('topic_still_creating')) @@ -193,7 +193,7 @@ class TestKafkaClient(unittest.TestCase): # Topic with partition metadata, but no leaders return True self.assertTrue(client.has_metadata_for_topic('topic_noleaders')) - @patch('kafka.client.KafkaClient._get_conn') + @patch('kafka.SimpleClient._get_conn') @patch('kafka.client.KafkaProtocol.decode_metadata_response') def test_ensure_topic_exists(self, decode_metadata_response, conn): @@ -214,7 +214,7 @@ class TestKafkaClient(unittest.TestCase): ] decode_metadata_response.return_value = MetadataResponse(brokers, topics) - client = KafkaClient(hosts=['broker_1:4567']) + client = SimpleClient(hosts=['broker_1:4567']) with self.assertRaises(UnknownTopicOrPartitionError): client.ensure_topic_exists('topic_doesnt_exist', timeout=1) @@ -225,7 +225,7 @@ class TestKafkaClient(unittest.TestCase): # This should not raise client.ensure_topic_exists('topic_noleaders', timeout=1) - @patch('kafka.client.KafkaClient._get_conn') + @patch('kafka.SimpleClient._get_conn') @patch('kafka.client.KafkaProtocol') def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn): "Get leader for partitions reload metadata if it is not available" @@ -242,7 +242,7 @@ class TestKafkaClient(unittest.TestCase): ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) - client = KafkaClient(hosts=['broker_1:4567']) + client = SimpleClient(hosts=['broker_1:4567']) # topic metadata is loaded but empty self.assertDictEqual({}, client.topics_to_brokers) @@ -263,7 +263,7 @@ class TestKafkaClient(unittest.TestCase): TopicPartition('topic_one_partition', 0): brokers[0]}, client.topics_to_brokers) - @patch('kafka.client.KafkaClient._get_conn') + @patch('kafka.SimpleClient._get_conn') @patch('kafka.client.KafkaProtocol') def test_get_leader_for_unassigned_partitions(self, protocol, conn): @@ -280,7 +280,7 @@ class TestKafkaClient(unittest.TestCase): ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) - client = KafkaClient(hosts=['broker_1:4567']) + client = SimpleClient(hosts=['broker_1:4567']) self.assertDictEqual({}, client.topics_to_brokers) @@ -290,7 +290,7 @@ class TestKafkaClient(unittest.TestCase): with self.assertRaises(UnknownTopicOrPartitionError): client._get_leader_for_partition('topic_unknown', 0) - @patch('kafka.client.KafkaClient._get_conn') + @patch('kafka.SimpleClient._get_conn') @patch('kafka.client.KafkaProtocol') def test_get_leader_exceptions_when_noleader(self, protocol, conn): @@ -309,7 +309,7 @@ class TestKafkaClient(unittest.TestCase): ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) - client = KafkaClient(hosts=['broker_1:4567']) + client = SimpleClient(hosts=['broker_1:4567']) self.assertDictEqual( { TopicPartition('topic_noleader', 0): None, @@ -337,7 +337,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) - @patch.object(KafkaClient, '_get_conn') + @patch.object(SimpleClient, '_get_conn') @patch('kafka.client.KafkaProtocol') def test_send_produce_request_raises_when_noleader(self, protocol, conn): mock_conn(conn) @@ -355,7 +355,7 @@ class TestKafkaClient(unittest.TestCase): ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) - client = KafkaClient(hosts=['broker_1:4567']) + client = SimpleClient(hosts=['broker_1:4567']) requests = [ProduceRequestPayload( "topic_noleader", 0, @@ -364,7 +364,7 @@ class TestKafkaClient(unittest.TestCase): with self.assertRaises(LeaderNotAvailableError): client.send_produce_request(requests) - @patch('kafka.client.KafkaClient._get_conn') + @patch('kafka.SimpleClient._get_conn') @patch('kafka.client.KafkaProtocol') def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn): @@ -380,7 +380,7 @@ class TestKafkaClient(unittest.TestCase): ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) - client = KafkaClient(hosts=['broker_1:4567']) + client = SimpleClient(hosts=['broker_1:4567']) requests = [ProduceRequestPayload( "topic_doesnt_exist", 0, @@ -403,9 +403,9 @@ class TestKafkaClient(unittest.TestCase): self.assertGreaterEqual(t.interval, 1.0) def test_correlation_rollover(self): - with patch.object(KafkaClient, 'load_metadata_for_topics'): + with patch.object(SimpleClient, 'load_metadata_for_topics'): big_num = 2**31 - 3 - client = KafkaClient(hosts=[], correlation_id=big_num) + client = SimpleClient(hosts=[], correlation_id=big_num) self.assertEqual(big_num + 1, client._next_id()) self.assertEqual(big_num + 2, client._next_id()) self.assertEqual(0, client._next_id()) diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 4fd4cdf..6160372 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -7,7 +7,7 @@ import time import pytest import six -from kafka import KafkaClient, SimpleProducer +from kafka import SimpleClient, SimpleProducer from kafka.common import TopicPartition from kafka.conn import BrokerConnection, ConnectionStates from kafka.consumer.group import KafkaConsumer @@ -47,7 +47,7 @@ def kafka_broker(version, zookeeper, request): @pytest.fixture def simple_client(kafka_broker): connect_str = 'localhost:' + str(kafka_broker.port) - return KafkaClient(connect_str) + return SimpleClient(connect_str) @pytest.fixture diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 5ffaa04..b54ace0 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -2,7 +2,7 @@ import logging import os import time -from kafka import KafkaClient, SimpleConsumer, KeyedProducer +from kafka import SimpleClient, SimpleConsumer, KeyedProducer from kafka.common import ( TopicPartition, FailedPayloadsError, ConnectionError, RequestTimedOutError ) @@ -34,7 +34,7 @@ class TestFailover(KafkaIntegrationTestCase): self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers] - self.client = KafkaClient(hosts, timeout=2) + self.client = SimpleClient(hosts, timeout=2) super(TestFailover, self).setUp() def tearDown(self): @@ -214,7 +214,7 @@ class TestFailover(KafkaIntegrationTestCase): hosts = ','.join(['%s:%d' % (broker.host, broker.port) for broker in self.brokers]) - client = KafkaClient(hosts) + client = SimpleClient(hosts) consumer = SimpleConsumer(client, None, topic, partitions=partitions, auto_commit=False, diff --git a/test/test_producer.py b/test/test_producer.py index 227d4ad..aa4f0be 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -8,7 +8,7 @@ import time from mock import MagicMock, patch from . import unittest -from kafka import KafkaClient, SimpleProducer, KeyedProducer +from kafka import SimpleClient, SimpleProducer, KeyedProducer from kafka.common import ( AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, ProduceResponsePayload, RetryOptions, TopicPartition @@ -89,11 +89,11 @@ class TestKafkaProducer(unittest.TestCase): def test_producer_sync_fail_on_error(self): error = FailedPayloadsError('failure') - with patch.object(KafkaClient, 'load_metadata_for_topics'): - with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]): - with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]): + with patch.object(SimpleClient, 'load_metadata_for_topics'): + with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]): + with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]): - client = KafkaClient(MagicMock()) + client = SimpleClient(MagicMock()) producer = SimpleProducer(client, async=False, sync_fail_on_error=False) # This should not raise diff --git a/test/testutil.py b/test/testutil.py index 98fe805..2f3770e 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -10,7 +10,7 @@ import uuid from six.moves import xrange from . import unittest -from kafka import KafkaClient +from kafka import SimpleClient from kafka.common import OffsetRequestPayload __all__ = [ @@ -62,7 +62,7 @@ class KafkaIntegrationTestCase(unittest.TestCase): self.topic = topic if self.create_client: - self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port)) + self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port)) self.client.ensure_topic_exists(self.topic) |