diff options
author | Dana Powers <dana.powers@rd.io> | 2015-03-08 16:03:06 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-03-08 16:03:06 -0700 |
commit | 92aa7e94288cbfc4aed0dfbd52021d21694bced4 (patch) | |
tree | c33e7341b9624eb22f89051887e83449e8146a98 /test/test_conn.py | |
parent | 6ef982c5e8bde6ea50f721ddb4bb11b7fd51559b (diff) | |
parent | 5137163fa44b4a6a8a315c30f959e816f657e921 (diff) | |
download | kafka-python-92aa7e94288cbfc4aed0dfbd52021d21694bced4.tar.gz |
Merge branch 'vshlapakov-feature-async-threading'
PR 330: Threading for async batching
Conflicts:
kafka/producer/base.py
Diffstat (limited to 'test/test_conn.py')
-rw-r--r-- | test/test_conn.py | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/test/test_conn.py b/test/test_conn.py index 2c8f3b2..c4f219b 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,5 +1,6 @@ import socket import struct +from threading import Thread import mock from . import unittest @@ -162,3 +163,46 @@ class ConnTest(unittest.TestCase): self.conn.send(self.config['request_id'], self.config['payload']) self.assertEqual(self.MockCreateConn.call_count, 1) self.conn._sock.sendall.assert_called_with(self.config['payload']) + + +class TestKafkaConnection(unittest.TestCase): + + @mock.patch('socket.create_connection') + def test_copy(self, socket): + """KafkaConnection copies work as expected""" + + conn = KafkaConnection('kafka', 9092) + self.assertEqual(socket.call_count, 1) + + copy = conn.copy() + self.assertEqual(socket.call_count, 1) + self.assertEqual(copy.host, 'kafka') + self.assertEqual(copy.port, 9092) + self.assertEqual(copy._sock, None) + + copy.reinit() + self.assertEqual(socket.call_count, 2) + self.assertNotEqual(copy._sock, None) + + @mock.patch('socket.create_connection') + def test_copy_thread(self, socket): + """KafkaConnection copies work in other threads""" + + err = [] + copy = KafkaConnection('kafka', 9092).copy() + + def thread_func(err, copy): + try: + self.assertEqual(copy.host, 'kafka') + self.assertEqual(copy.port, 9092) + self.assertNotEqual(copy._sock, None) + except Exception as e: + err.append(e) + else: + err.append(None) + thread = Thread(target=thread_func, args=(err, copy)) + thread.start() + thread.join() + + self.assertEqual(err, [None]) + self.assertEqual(socket.call_count, 2) |