summaryrefslogtreecommitdiff
path: root/test/test_client_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-07 18:51:14 -0800
committerDana Powers <dana.powers@rd.io>2016-01-07 18:51:14 -0800
commit828377377da43749af0d27ee256ef31bf714cf17 (patch)
treefbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /test/test_client_integration.py
parent71e7568fcb8132899f366b37c32645fd5a40dc4b (diff)
parent9a8af1499ca425366d934487469d9977fae7fe5f (diff)
downloadkafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz
Merge branch '0.9'
Conflicts: kafka/codec.py kafka/version.py test/test_producer.py test/test_producer_integration.py
Diffstat (limited to 'test/test_client_integration.py')
-rw-r--r--test/test_client_integration.py40
1 files changed, 20 insertions, 20 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 6872dbf..c5d3b58 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -1,8 +1,8 @@
import os
from kafka.common import (
- FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
- KafkaTimeoutError, ProduceRequest
+ FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
+ KafkaTimeoutError, ProduceRequestPayload
)
from kafka.protocol import create_message
@@ -28,11 +28,11 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
cls.zk.close()
def test_consume_none(self):
- fetch = FetchRequest(self.bytes_topic, 0, 0, 1024)
+ fetch = FetchRequestPayload(self.topic, 0, 0, 1024)
fetch_resp, = self.client.send_fetch_request([fetch])
self.assertEqual(fetch_resp.error, 0)
- self.assertEqual(fetch_resp.topic, self.bytes_topic)
+ self.assertEqual(fetch_resp.topic, self.topic)
self.assertEqual(fetch_resp.partition, 0)
messages = list(fetch_resp.messages)
@@ -46,25 +46,25 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
# ensure_topic_exists should fail with KafkaTimeoutError
with self.assertRaises(KafkaTimeoutError):
- self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
+ self.client.ensure_topic_exists('this_topic_doesnt_exist', timeout=0)
def test_send_produce_request_maintains_request_response_order(self):
- self.client.ensure_topic_exists(b'foo')
- self.client.ensure_topic_exists(b'bar')
+ self.client.ensure_topic_exists('foo')
+ self.client.ensure_topic_exists('bar')
requests = [
- ProduceRequest(
- b'foo', 0,
+ ProduceRequestPayload(
+ 'foo', 0,
[create_message(b'a'), create_message(b'b')]),
- ProduceRequest(
- b'bar', 1,
+ ProduceRequestPayload(
+ 'bar', 1,
[create_message(b'a'), create_message(b'b')]),
- ProduceRequest(
- b'foo', 1,
+ ProduceRequestPayload(
+ 'foo', 1,
[create_message(b'a'), create_message(b'b')]),
- ProduceRequest(
- b'bar', 0,
+ ProduceRequestPayload(
+ 'bar', 0,
[create_message(b'a'), create_message(b'b')]),
]
@@ -82,12 +82,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@kafka_versions('>=0.8.1')
def test_commit_fetch_offsets(self):
- req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata")
- (resp,) = self.client.send_offset_commit_request(b"group", [req])
+ req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata')
+ (resp,) = self.client.send_offset_commit_request('group', [req])
self.assertEqual(resp.error, 0)
- req = OffsetFetchRequest(self.bytes_topic, 0)
- (resp,) = self.client.send_offset_fetch_request(b"group", [req])
+ req = OffsetFetchRequestPayload(self.topic, 0)
+ (resp,) = self.client.send_offset_fetch_request('group', [req])
self.assertEqual(resp.error, 0)
self.assertEqual(resp.offset, 42)
- self.assertEqual(resp.metadata, b"") # Metadata isn't stored for now
+ self.assertEqual(resp.metadata, '') # Metadata isn't stored for now