summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
commit90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch)
treeb22cef6b10fd167fb22b8318e1294f6137427f3b /test
parent452e7c2190b83f320f58e7f650302696dde458ed (diff)
downloadkafka-python-protocol_versions.tar.gz
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'test')
-rw-r--r--test/test_client.py20
-rw-r--r--test/test_client_async.py8
-rw-r--r--test/test_conn.py6
-rw-r--r--test/test_consumer_group.py2
-rw-r--r--test/test_coordinator.py67
-rw-r--r--test/test_fetcher.py24
6 files changed, 64 insertions, 63 deletions
diff --git a/test/test_client.py b/test/test_client.py
index 42d7dbd..38235fd 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -137,7 +137,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 2, 0, [0, 1], [0, 1])
])
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
# client loads metadata at init
client = SimpleClient(hosts=['broker_1:4567'])
@@ -179,7 +179,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -209,7 +209,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -237,7 +237,7 @@ class TestSimpleClient(unittest.TestCase):
topics = [
(NO_LEADER, 'topic_no_partitions', [])
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -249,7 +249,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 0, 0, [0, 1], [0, 1])
])
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
# calling _get_leader_for_partition (from any broker aware request)
# will try loading metadata again for the same topic
@@ -275,7 +275,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 'topic_no_partitions', []),
(UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -304,7 +304,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
self.assertDictEqual(
@@ -330,7 +330,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 1, 1, [1, 0], [1, 0])
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
@@ -350,7 +350,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -375,7 +375,7 @@ class TestSimpleClient(unittest.TestCase):
topics = [
(UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
]
- protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
diff --git a/test/test_client_async.py b/test/test_client_async.py
index eaac8e1..2cf348c 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -37,7 +37,7 @@ def conn(mocker):
conn.return_value = conn
conn.state = ConnectionStates.CONNECTED
conn.send.return_value = Future().success(
- MetadataResponse(
+ MetadataResponse[0](
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
conn.blacked_out.return_value = False
@@ -51,7 +51,7 @@ def test_bootstrap_success(conn):
cli = KafkaClient()
conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config)
conn.connect.assert_called_with()
- conn.send.assert_called_once_with(MetadataRequest([]))
+ conn.send.assert_called_once_with(MetadataRequest[0]([]))
assert cli._bootstrap_fails == 0
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12),
BrokerMetadata(1, 'bar', 34)])
@@ -230,12 +230,12 @@ def test_send(conn):
conn.state = ConnectionStates.CONNECTED
cli._maybe_connect(0)
# ProduceRequest w/ 0 required_acks -> no response
- request = ProduceRequest(0, 0, [])
+ request = ProduceRequest[0](0, 0, [])
ret = cli.send(0, request)
assert conn.send.called_with(request, expect_response=False)
assert isinstance(ret, Future)
- request = MetadataRequest([])
+ request = MetadataRequest[0]([])
cli.send(0, request)
assert conn.send.called_with(request, expect_response=True)
diff --git a/test/test_conn.py b/test/test_conn.py
index 5432ebd..a55e39b 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -111,7 +111,7 @@ def test_send_max_ifr(conn):
def test_send_no_response(socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
- req = MetadataRequest([])
+ req = MetadataRequest[0]([])
header = RequestHeader(req, client_id=conn.config['client_id'])
payload_bytes = len(header.encode()) + len(req.encode())
third = payload_bytes // 3
@@ -128,7 +128,7 @@ def test_send_no_response(socket, conn):
def test_send_response(socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
- req = MetadataRequest([])
+ req = MetadataRequest[0]([])
header = RequestHeader(req, client_id=conn.config['client_id'])
payload_bytes = len(header.encode()) + len(req.encode())
third = payload_bytes // 3
@@ -144,7 +144,7 @@ def test_send_response(socket, conn):
def test_send_error(socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
- req = MetadataRequest([])
+ req = MetadataRequest[0]([])
header = RequestHeader(req, client_id=conn.config['client_id'])
try:
error = ConnectionError
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index c02eddc..fe66d2b 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -146,7 +146,7 @@ def conn(mocker):
conn.return_value = conn
conn.state = ConnectionStates.CONNECTED
conn.send.return_value = Future().success(
- MetadataResponse(
+ MetadataResponse[0](
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
return conn
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 1dc7788..629b72f 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -16,9 +16,8 @@ from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.commit import (
- OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2,
- OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
- OffsetFetchResponse)
+ OffsetCommitRequest, OffsetCommitResponse,
+ OffsetFetchRequest, OffsetFetchResponse)
from kafka.protocol.metadata import MetadataResponse
from kafka.util import WeakMethod
@@ -29,7 +28,7 @@ def conn(mocker):
conn.return_value = conn
conn.state = ConnectionStates.CONNECTED
conn.send.return_value = Future().success(
- MetadataResponse(
+ MetadataResponse[0](
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
return conn
@@ -98,7 +97,7 @@ def test_pattern_subscription(coordinator, api_version):
assert coordinator._subscription.needs_partition_assignment is False
cluster = coordinator._client.cluster
- cluster.update_metadata(MetadataResponse(
+ cluster.update_metadata(MetadataResponse[0](
# brokers
[(0, 'foo', 12), (1, 'bar', 34)],
# topics
@@ -428,9 +427,9 @@ def test_send_offset_commit_request_fail(patched_coord, offsets):
@pytest.mark.parametrize('api_version,req_type', [
- ((0, 8, 1), OffsetCommitRequest_v0),
- ((0, 8, 2), OffsetCommitRequest_v1),
- ((0, 9), OffsetCommitRequest_v2)])
+ ((0, 8, 1), OffsetCommitRequest[0]),
+ ((0, 8, 2), OffsetCommitRequest[1]),
+ ((0, 9), OffsetCommitRequest[2])])
def test_send_offset_commit_request_versions(patched_coord, offsets,
api_version, req_type):
# assuming fixture sets coordinator=0, least_loaded_node=1
@@ -460,36 +459,36 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
patched_coord._client.send.return_value = _f
future = patched_coord._send_offset_commit_request(offsets)
(node, request), _ = patched_coord._client.send.call_args
- response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])])
+ response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])])
_f.success(response)
patched_coord._handle_offset_commit_response.assert_called_with(
offsets, future, response)
@pytest.mark.parametrize('response,error,dead,reassign', [
- (OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]),
Errors.GroupAuthorizationFailedError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]),
Errors.OffsetMetadataTooLargeError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]),
Errors.InvalidCommitOffsetSizeError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]),
Errors.GroupLoadInProgressError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]),
Errors.GroupCoordinatorNotAvailableError, True, False),
- (OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]),
Errors.NotCoordinatorForGroupError, True, False),
- (OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]),
Errors.RequestTimedOutError, True, False),
- (OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]),
Errors.CommitFailedError, False, True),
- (OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]),
Errors.CommitFailedError, False, True),
- (OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]),
Errors.CommitFailedError, False, True),
- (OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]),
Errors.InvalidTopicError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
Errors.TopicAuthorizationFailedError, False, False),
])
def test_handle_offset_commit_response(patched_coord, offsets,
@@ -523,9 +522,9 @@ def test_send_offset_fetch_request_fail(patched_coord, partitions):
@pytest.mark.parametrize('api_version,req_type', [
- ((0, 8, 1), OffsetFetchRequest_v0),
- ((0, 8, 2), OffsetFetchRequest_v1),
- ((0, 9), OffsetFetchRequest_v1)])
+ ((0, 8, 1), OffsetFetchRequest[0]),
+ ((0, 8, 2), OffsetFetchRequest[1]),
+ ((0, 9), OffsetFetchRequest[1])])
def test_send_offset_fetch_request_versions(patched_coord, partitions,
api_version, req_type):
# assuming fixture sets coordinator=0, least_loaded_node=1
@@ -555,30 +554,30 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
patched_coord._client.send.return_value = _f
future = patched_coord._send_offset_fetch_request(partitions)
(node, request), _ = patched_coord._client.send.call_args
- response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])])
+ response = OffsetFetchResponse[0]([('foobar', [(0, 0), (1, 0)])])
_f.success(response)
patched_coord._handle_offset_fetch_response.assert_called_with(
future, response)
@pytest.mark.parametrize('response,error,dead,reassign', [
- #(OffsetFetchResponse([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]),
+ #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]),
# Errors.GroupAuthorizationFailedError, False, False),
- #(OffsetFetchResponse([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
+ #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
# Errors.RequestTimedOutError, True, False),
- #(OffsetFetchResponse([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
+ #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
# Errors.RebalanceInProgressError, False, True),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
Errors.GroupLoadInProgressError, False, False),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
Errors.NotCoordinatorForGroupError, True, False),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
Errors.UnknownMemberIdError, False, True),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
Errors.IllegalGenerationError, False, True),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
Errors.TopicAuthorizationFailedError, False, False),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
None, False, False),
])
def test_handle_offset_fetch_response(patched_coord, offsets,
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index cdd324f..644adfa 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -34,17 +34,19 @@ def fetcher(client, subscription_state):
def test_init_fetches(fetcher, mocker):
fetch_requests = [
- FetchRequest(-1, fetcher.config['fetch_max_wait_ms'],
- fetcher.config['fetch_min_bytes'],
- [('foobar', [
- (0, 0, fetcher.config['max_partition_fetch_bytes']),
- (1, 0, fetcher.config['max_partition_fetch_bytes']),
- ])]),
- FetchRequest(-1, fetcher.config['fetch_max_wait_ms'],
- fetcher.config['fetch_min_bytes'],
- [('foobar', [
- (2, 0, fetcher.config['max_partition_fetch_bytes']),
- ])])
+ FetchRequest[0](
+ -1, fetcher.config['fetch_max_wait_ms'],
+ fetcher.config['fetch_min_bytes'],
+ [('foobar', [
+ (0, 0, fetcher.config['max_partition_fetch_bytes']),
+ (1, 0, fetcher.config['max_partition_fetch_bytes']),
+ ])]),
+ FetchRequest[0](
+ -1, fetcher.config['fetch_max_wait_ms'],
+ fetcher.config['fetch_min_bytes'],
+ [('foobar', [
+ (2, 0, fetcher.config['max_partition_fetch_bytes']),
+ ])])
]
mocker.patch.object(fetcher, '_create_fetch_requests',