summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBruno ReniƩ <brutasse@gmail.com>2014-08-29 15:23:48 +0200
committerBruno ReniƩ <brutasse@gmail.com>2014-08-29 15:23:48 +0200
commit2a220e11003eb26971519a73e020d4759e2b3a31 (patch)
tree235104f553fd91d97b84acc07dec681a06b8b642
parent9c83b2bd764f494c3d3a074054914a2b5fd99328 (diff)
downloadkafka-python-2a220e11003eb26971519a73e020d4759e2b3a31.tar.gz
Fix more tests, only multiprocessing consumer ones remaining
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/conn.py4
-rw-r--r--test/test_client_integration.py10
-rw-r--r--test/test_consumer_integration.py5
-rw-r--r--test/test_failover_integration.py2
-rw-r--r--test/test_producer_integration.py30
-rw-r--r--test/test_protocol.py1
-rw-r--r--test/testutil.py4
8 files changed, 39 insertions, 23 deletions
diff --git a/kafka/client.py b/kafka/client.py
index e098470..410573a 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -145,7 +145,7 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
- conn = self._get_conn(broker.host, broker.port)
+ conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
requestId = self._next_id()
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
@@ -233,8 +233,8 @@ class KafkaClient(object):
A reinit() has to be done on the copy before it can be used again
"""
c = copy.deepcopy(self)
- for k, v in c.conns.items():
- c.conns[k] = v.copy()
+ for key in c.conns:
+ c.conns[key] = self.conns[key].copy()
return c
def reinit(self):
diff --git a/kafka/conn.py b/kafka/conn.py
index 41cd424..aef0299 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -155,6 +155,10 @@ class KafkaConnection(local):
return a new KafkaConnection object
"""
c = copy.deepcopy(self)
+ # Python 3 doesn't copy custom attributes of the threadlocal subclass
+ c.host = copy.copy(self.host)
+ c.port = copy.copy(self.port)
+ c.timeout = copy.copy(self.timeout)
c._sock = None
return c
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 3eb917f..029301e 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -56,7 +56,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
# ensure_topic_exists should fail with KafkaTimeoutError
with self.assertRaises(KafkaTimeoutError):
- self.client.ensure_topic_exists("this_topic_doesnt_exist", timeout=0)
+ self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
####################
# Offset Tests #
@@ -64,12 +64,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@kafka_versions("0.8.1", "0.8.1.1")
def test_commit_fetch_offsets(self):
- req = OffsetCommitRequest(self.topic, 0, 42, "metadata")
- (resp,) = self.client.send_offset_commit_request("group", [req])
+ req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
+ (resp,) = self.client.send_offset_commit_request(b"group", [req])
self.assertEquals(resp.error, 0)
req = OffsetFetchRequest(self.topic, 0)
- (resp,) = self.client.send_offset_fetch_request("group", [req])
+ (resp,) = self.client.send_offset_fetch_request(b"group", [req])
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 42)
- self.assertEquals(resp.metadata, "") # Metadata isn't stored for now
+ self.assertEquals(resp.metadata, b"") # Metadata isn't stored for now
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 2e8e859..1b2c73e 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -4,6 +4,9 @@ from datetime import datetime
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
+
+from six.moves import xrange
+
from .fixtures import ZookeeperFixture, KafkaFixture
from .testutil import *
@@ -267,7 +270,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
kwargs.setdefault('auto_commit', True)
consumer_class = kwargs.pop('consumer', SimpleConsumer)
- group = kwargs.pop('group', self.id())
+ group = kwargs.pop('group', self.id().encode('utf-8'))
topic = kwargs.pop('topic', self.topic)
if consumer_class == SimpleConsumer:
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index d3121d6..ee84aad 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -60,7 +60,7 @@ class TestFailover(KafkaIntegrationTestCase):
while not recovered and (time.time() - started) < timeout:
try:
logging.debug("attempting to send 'success' message after leader killed")
- producer.send_messages(topic, partition, 'success')
+ producer.send_messages(topic, partition, b'success')
logging.debug("success!")
recovered = True
except (FailedPayloadsError, ConnectionError):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index c39d6b5..674cc58 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -32,13 +32,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(100) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
start_offset,
100,
)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(100) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
start_offset+100,
100,
)
@@ -48,7 +50,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(10000) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(10000)],
start_offset,
10000,
)
@@ -57,8 +60,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
- message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
- message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+ message1 = create_gzip_message([
+ ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
+ message2 = create_gzip_message([
+ ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
self.assert_produce_request(
[ message1, message2 ],
@@ -85,8 +90,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
msg_count = 1+100
messages = [
- create_message("Just a plain message"),
- create_gzip_message(["Gzipped %d" % i for i in range(100)]),
+ create_message(b"Just a plain message"),
+ create_gzip_message([
+ ("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
]
# All snappy integration tests fail with nosnappyjava
@@ -101,14 +107,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request([
- create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ create_gzip_message([
+ ("Gzipped batch 1, message %d" % i).encode('utf-8')
+ for i in range(50000)])
],
start_offset,
50000,
)
self.assert_produce_request([
- create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ create_gzip_message([
+ ("Gzipped batch 1, message %d" % i).encode('utf-8')
+ for i in range(50000)])
],
start_offset+50000,
50000,
@@ -144,7 +154,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
- new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4()))
+ new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
producer = SimpleProducer(self.client)
# At first it doesn't exist
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 2107f82..a4f6f64 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -108,7 +108,6 @@ class TestProtocol(unittest.TestCase):
def test_encode_message(self):
message = create_message(b"test", b"key")
encoded = KafkaProtocol._encode_message(message)
- print("CRC", -1427009701)
expect = b"".join([
struct.pack(">i", -1427009701), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
diff --git a/test/testutil.py b/test/testutil.py
index 114dff9..9262fca 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -59,8 +59,8 @@ class KafkaIntegrationTestCase(unittest.TestCase):
return
if not self.topic:
- self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10).decode('utf-8'))
- self.topic = self.topic.encode('utf-8')
+ topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10).decode('utf-8'))
+ self.topic = topic.encode('utf-8')
if self.create_client:
self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))