diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
commit | 90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch) | |
tree | b22cef6b10fd167fb22b8318e1294f6137427f3b /test/test_coordinator.py | |
parent | 452e7c2190b83f320f58e7f650302696dde458ed (diff) | |
download | kafka-python-protocol_versions.tar.gz |
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r-- | test/test_coordinator.py | 67 |
1 files changed, 33 insertions, 34 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 1dc7788..629b72f 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -16,9 +16,8 @@ from kafka.conn import ConnectionStates import kafka.errors as Errors from kafka.future import Future from kafka.protocol.commit import ( - OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2, - OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1, - OffsetFetchResponse) + OffsetCommitRequest, OffsetCommitResponse, + OffsetFetchRequest, OffsetFetchResponse) from kafka.protocol.metadata import MetadataResponse from kafka.util import WeakMethod @@ -29,7 +28,7 @@ def conn(mocker): conn.return_value = conn conn.state = ConnectionStates.CONNECTED conn.send.return_value = Future().success( - MetadataResponse( + MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics return conn @@ -98,7 +97,7 @@ def test_pattern_subscription(coordinator, api_version): assert coordinator._subscription.needs_partition_assignment is False cluster = coordinator._client.cluster - cluster.update_metadata(MetadataResponse( + cluster.update_metadata(MetadataResponse[0]( # brokers [(0, 'foo', 12), (1, 'bar', 34)], # topics @@ -428,9 +427,9 @@ def test_send_offset_commit_request_fail(patched_coord, offsets): @pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetCommitRequest_v0), - ((0, 8, 2), OffsetCommitRequest_v1), - ((0, 9), OffsetCommitRequest_v2)]) + ((0, 8, 1), OffsetCommitRequest[0]), + ((0, 8, 2), OffsetCommitRequest[1]), + ((0, 9), OffsetCommitRequest[2])]) def test_send_offset_commit_request_versions(patched_coord, offsets, api_version, req_type): # assuming fixture sets coordinator=0, least_loaded_node=1 @@ -460,36 +459,36 @@ def test_send_offset_commit_request_success(patched_coord, offsets): patched_coord._client.send.return_value = _f future = patched_coord._send_offset_commit_request(offsets) (node, request), _ = patched_coord._client.send.call_args - response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])]) + response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])]) _f.success(response) patched_coord._handle_offset_commit_response.assert_called_with( offsets, future, response) @pytest.mark.parametrize('response,error,dead,reassign', [ - (OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]), Errors.GroupAuthorizationFailedError, False, False), - (OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]), Errors.OffsetMetadataTooLargeError, False, False), - (OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]), Errors.InvalidCommitOffsetSizeError, False, False), - (OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]), Errors.GroupLoadInProgressError, False, False), - (OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]), Errors.GroupCoordinatorNotAvailableError, True, False), - (OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]), Errors.NotCoordinatorForGroupError, True, False), - (OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]), Errors.RequestTimedOutError, True, False), - (OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]), Errors.CommitFailedError, False, True), - (OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]), Errors.InvalidTopicError, False, False), - (OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]), + (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]), Errors.TopicAuthorizationFailedError, False, False), ]) def test_handle_offset_commit_response(patched_coord, offsets, @@ -523,9 +522,9 @@ def test_send_offset_fetch_request_fail(patched_coord, partitions): @pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetFetchRequest_v0), - ((0, 8, 2), OffsetFetchRequest_v1), - ((0, 9), OffsetFetchRequest_v1)]) + ((0, 8, 1), OffsetFetchRequest[0]), + ((0, 8, 2), OffsetFetchRequest[1]), + ((0, 9), OffsetFetchRequest[1])]) def test_send_offset_fetch_request_versions(patched_coord, partitions, api_version, req_type): # assuming fixture sets coordinator=0, least_loaded_node=1 @@ -555,30 +554,30 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): patched_coord._client.send.return_value = _f future = patched_coord._send_offset_fetch_request(partitions) (node, request), _ = patched_coord._client.send.call_args - response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])]) + response = OffsetFetchResponse[0]([('foobar', [(0, 0), (1, 0)])]) _f.success(response) patched_coord._handle_offset_fetch_response.assert_called_with( future, response) @pytest.mark.parametrize('response,error,dead,reassign', [ - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]), # Errors.GroupAuthorizationFailedError, False, False), - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]), # Errors.RequestTimedOutError, True, False), - #(OffsetFetchResponse([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]), + #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]), # Errors.RebalanceInProgressError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), Errors.GroupLoadInProgressError, False, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), Errors.NotCoordinatorForGroupError, True, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), Errors.UnknownMemberIdError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), Errors.IllegalGenerationError, False, True), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), Errors.TopicAuthorizationFailedError, False, False), - (OffsetFetchResponse([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), None, False, False), ]) def test_handle_offset_fetch_response(patched_coord, offsets, |