diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
commit | 90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch) | |
tree | b22cef6b10fd167fb22b8318e1294f6137427f3b /test | |
parent | 452e7c2190b83f320f58e7f650302696dde458ed (diff) | |
download | kafka-python-protocol_versions.tar.gz |
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client.py | 20 | ||||
-rw-r--r-- | test/test_client_async.py | 8 | ||||
-rw-r--r-- | test/test_conn.py | 6 | ||||
-rw-r--r-- | test/test_consumer_group.py | 2 | ||||
-rw-r--r-- | test/test_coordinator.py | 67 | ||||
-rw-r--r-- | test/test_fetcher.py | 24 |
6 files changed, 64 insertions, 63 deletions
diff --git a/test/test_client.py b/test/test_client.py index 42d7dbd..38235fd 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -137,7 +137,7 @@ class TestSimpleClient(unittest.TestCase): (NO_ERROR, 2, 0, [0, 1], [0, 1]) ]) ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) # client loads metadata at init client = SimpleClient(hosts=['broker_1:4567']) @@ -179,7 +179,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 1, -1, [], []), ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -209,7 +209,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 1, -1, [], []), ]), ] - decode_metadata_response.return_value = MetadataResponse(brokers, topics) + decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -237,7 +237,7 @@ class TestSimpleClient(unittest.TestCase): topics = [ (NO_LEADER, 'topic_no_partitions', []) ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -249,7 +249,7 @@ class TestSimpleClient(unittest.TestCase): (NO_ERROR, 0, 0, [0, 1], [0, 1]) ]) ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) # calling _get_leader_for_partition (from any broker aware request) # will try loading metadata again for the same topic @@ -275,7 +275,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 'topic_no_partitions', []), (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -304,7 +304,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 1, -1, [], []), ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) self.assertDictEqual( @@ -330,7 +330,7 @@ class TestSimpleClient(unittest.TestCase): (NO_ERROR, 1, 1, [1, 0], [1, 0]) ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) @@ -350,7 +350,7 @@ class TestSimpleClient(unittest.TestCase): (NO_LEADER, 1, -1, [], []), ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) @@ -375,7 +375,7 @@ class TestSimpleClient(unittest.TestCase): topics = [ (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics) client = SimpleClient(hosts=['broker_1:4567']) diff --git a/test/test_client_async.py b/test/test_client_async.py index eaac8e1..2cf348c 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -37,7 +37,7 @@ def conn(mocker): conn.return_value = conn conn.state = ConnectionStates.CONNECTED conn.send.return_value = Future().success( - MetadataResponse( + MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics conn.blacked_out.return_value = False @@ -51,7 +51,7 @@ def test_bootstrap_success(conn): cli = KafkaClient() conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config) conn.connect.assert_called_with() - conn.send.assert_called_once_with(MetadataRequest([])) + conn.send.assert_called_once_with(MetadataRequest[0]([])) assert cli._bootstrap_fails == 0 assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12), BrokerMetadata(1, 'bar', 34)]) @@ -230,12 +230,12 @@ def test_send(conn): conn.state = ConnectionStates.CONNECTED cli._maybe_connect(0) # ProduceRequest w/ 0 required_acks -> no response - request = ProduceRequest(0, 0, []) + request = ProduceRequest[0](0, 0, []) ret = cli.send(0, request) assert conn.send.called_with(request, expect_response=False) assert isinstance(ret, Future) - request = MetadataRequest([]) + request = MetadataRequest[0]([]) cli.send(0, request) assert conn.send.called_with(request, expect_response=True) diff --git a/test/test_conn.py b/test/test_conn.py index 5432ebd..a55e39b 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -111,7 +111,7 @@ def test_send_max_ifr(conn): def test_send_no_response(socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest([]) + req = MetadataRequest[0]([]) header = RequestHeader(req, client_id=conn.config['client_id']) payload_bytes = len(header.encode()) + len(req.encode()) third = payload_bytes // 3 @@ -128,7 +128,7 @@ def test_send_no_response(socket, conn): def test_send_response(socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest([]) + req = MetadataRequest[0]([]) header = RequestHeader(req, client_id=conn.config['client_id']) payload_bytes = len(header.encode()) + len(req.encode()) third = payload_bytes // 3 @@ -144,7 +144,7 @@ def test_send_response(socket, conn): def test_send_error(socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest([]) + req = MetadataRequest[0]([]) header = RequestHeader(req, client_id=conn.config['client_id']) try: error = ConnectionError diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index c02eddc..fe66d2b 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -146,7 +146,7 @@ def conn(mocker): conn.return_value = conn conn.state = ConnectionStates.CONNECTED conn.send.return_value = Future().success( - MetadataResponse( + MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics return conn diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 1dc7788..629b72f 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -16,9 +16,8 @@ from kafka.conn import ConnectionStates import kafka.errors as Errors from kafka.future import Future from kafka.protocol.commit import ( - OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2, - OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1, - OffsetFetchResponse) + OffsetCommitRequest, OffsetCommitResponse, + OffsetFetchRequest, OffsetFetchResponse) from kafka.protocol.metadata import MetadataResponse from kafka.util import WeakMethod @@ -29,7 +28,7 @@ def conn(mocker): conn.return_value = conn conn.state = ConnectionStates.CONNECTED conn.send.return_value = Future().success( - MetadataResponse( + MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics return conn @@ -98,7 +97,7 @@ def test_pattern_subscription(coordinator, api_version): assert coordinator._subscription.needs_partition_assignment is False cluster = coordinator._client.cluster - cluster.update_metadata(MetadataResponse( + cluster.update_metadata(MetadataResponse[0]( # brokers [(0, 'foo', 12), (1, 'bar', 34)], # topics @@ -428,9 +427,9 @@ def test_send_offset_commit_request_fail(patched_coord, offsets): @pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetCommitRequest_v0), - ((0, 8, 2), OffsetCommitRequest_v1), - ((0, 9), OffsetCommitRequest_v2)]) + ((0, 8, 1), OffsetCommitRequest[0]), + ((0, 8, 2), OffsetCommitRequest[1]), + ((0, 9), OffsetCommitRequest[2])]) def test_send_offset_commit_request_versions(patched_coord, offsets, api_version, req_type): # assuming fixture sets coordinator=0, least_loaded_node=1 @@ -460,36 +459,36 @@ def test_send_offset_commit_request_success(patched_coord, offsets): patched_coord._client.send.return_value = _f future = patched_coord._send_offset_commit_request(offsets) (node, request), _ = patched_coord._client.send.call_args - response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])]) + response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])]) _f.success(response) patched_coord._handle_offset_commit_response.assert_called_with( offsets, future, response) @pytest.mark.parametrize('response,error,dead,reassign', [ - (OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]), Errors.GroupAuthorizationFailedError, False, False), - (OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]), Errors.OffsetMetadataTooLargeError, False, False), - (OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]), Errors.InvalidCommitOffsetSizeError, False, False), - (OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]), Errors.GroupLoadInProgressError, False, False), - (OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]), Errors.GroupCoordinatorNotAvailableError, True, False), - (OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]), Errors.NotCoordinatorForGroupError, True, False), - (OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]), Errors.RequestTimedOutError, True, False), - (OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]), Errors.InvalidTopicError, False, False), - (OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]), Errors.TopicAuthorizationFailedError, False, False), ]) def test_handle_offset_commit_response(patched_coord, offsets, @@ -523,9 +522,9 @@ def test_send_offset_fetch_request_fail(patched_coord, partitions): @pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetFetchRequest_v0), - ((0, 8, 2), OffsetFetchRequest_v1), - ((0, 9), OffsetFetchRequest_v1)]) + ((0, 8, 1), OffsetFetchRequest[0]), + ((0, 8, 2), OffsetFetchRequest[1]), + ((0, 9), OffsetFetchRequest[1])]) def test_send_offset_fetch_request_versions(patched_coord, partitions, api_version, req_type): # assuming fixture sets coordinator=0, least_loaded_node=1 @@ -555,30 +554,30 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): patched_coord._client.send.return_value = _f future = patched_coord._send_offset_fetch_request(partitions) (node, request), _ = patched_coord._client.send.call_args - response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])]) + response = OffsetFetchResponse[0]([('foobar', [(0, 0), (1, 0)])]) _f.success(response) patched_coord._handle_offset_fetch_response.assert_called_with( future, response) @pytest.mark.parametrize('response,error,dead,reassign', [ - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]), # Errors.GroupAuthorizationFailedError, False, False), - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]), # Errors.RequestTimedOutError, True, False), - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]), # Errors.RebalanceInProgressError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), Errors.GroupLoadInProgressError, False, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), Errors.NotCoordinatorForGroupError, True, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), Errors.UnknownMemberIdError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), Errors.IllegalGenerationError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), Errors.TopicAuthorizationFailedError, False, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), None, False, False), ]) def test_handle_offset_fetch_response(patched_coord, offsets, diff --git a/test/test_fetcher.py b/test/test_fetcher.py index cdd324f..644adfa 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -34,17 +34,19 @@ def fetcher(client, subscription_state): def test_init_fetches(fetcher, mocker): fetch_requests = [ - FetchRequest(-1, fetcher.config['fetch_max_wait_ms'], - fetcher.config['fetch_min_bytes'], - [('foobar', [ - (0, 0, fetcher.config['max_partition_fetch_bytes']), - (1, 0, fetcher.config['max_partition_fetch_bytes']), - ])]), - FetchRequest(-1, fetcher.config['fetch_max_wait_ms'], - fetcher.config['fetch_min_bytes'], - [('foobar', [ - (2, 0, fetcher.config['max_partition_fetch_bytes']), - ])]) + FetchRequest[0]( + -1, fetcher.config['fetch_max_wait_ms'], + fetcher.config['fetch_min_bytes'], + [('foobar', [ + (0, 0, fetcher.config['max_partition_fetch_bytes']), + (1, 0, fetcher.config['max_partition_fetch_bytes']), + ])]), + FetchRequest[0]( + -1, fetcher.config['fetch_max_wait_ms'], + fetcher.config['fetch_min_bytes'], + [('foobar', [ + (2, 0, fetcher.config['max_partition_fetch_bytes']), + ])]) ] mocker.patch.object(fetcher, '_create_fetch_requests', |