diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_conn.py | 44 |
-rw-r--r-- | test/test_consumer_integration.py | 44 |
2 files changed, 87 insertions, 1 deletion
diff --git a/test/test_conn.py b/test/test_conn.py index 2c8f3b2..c4f219b 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,5 +1,6 @@ import socket import struct +from threading import Thread import mock from . import unittest @@ -162,3 +163,46 @@ class ConnTest(unittest.TestCase): self.conn.send(self.config['request_id'], self.config['payload']) self.assertEqual(self.MockCreateConn.call_count, 1) self.conn._sock.sendall.assert_called_with(self.config['payload']) + + +class TestKafkaConnection(unittest.TestCase): + + @mock.patch('socket.create_connection') + def test_copy(self, socket): + """KafkaConnection copies work as expected""" + + conn = KafkaConnection('kafka', 9092) + self.assertEqual(socket.call_count, 1) + + copy = conn.copy() + self.assertEqual(socket.call_count, 1) + self.assertEqual(copy.host, 'kafka') + self.assertEqual(copy.port, 9092) + self.assertEqual(copy._sock, None) + + copy.reinit() + self.assertEqual(socket.call_count, 2) + self.assertNotEqual(copy._sock, None) + + @mock.patch('socket.create_connection') + def test_copy_thread(self, socket): + """KafkaConnection copies work in other threads""" + + err = [] + copy = KafkaConnection('kafka', 9092).copy() + + def thread_func(err, copy): + try: + self.assertEqual(copy.host, 'kafka') + self.assertEqual(copy.port, 9092) + self.assertNotEqual(copy._sock, None) + except Exception as e: + err.append(e) + else: + err.append(None) + thread = Thread(target=thread_func, args=(err, copy)) + thread.start() + thread.join() + + self.assertEqual(err, [None]) + self.assertEqual(socket.call_count, 2) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 4723220..9c89190 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -5,7 +5,7 @@ from six.moves import xrange from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message from kafka.common import ( - ProduceRequest, ConsumerFetchSizeTooSmall, 
ConsumerTimeout + ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES @@ -85,6 +85,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions('all') + def test_simple_consumer_smallest_offset_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + consumer = self.consumer(auto_offset_reset='smallest') + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + # Since auto_offset_reset is set to smallest we should read all 200 + # messages from beginning. + self.assert_message_count([message for message in consumer], 200) + + @kafka_versions('all') + def test_simple_consumer_largest_offset_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Default largest + consumer = self.consumer() + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + # Since auto_offset_reset is set to largest we should not read any + # messages. + self.assert_message_count([message for message in consumer], 0) + # Send 200 new messages to the queue + self.send_messages(0, range(200, 300)) + self.send_messages(1, range(300, 400)) + # Since the offset is set to largest we should read all the new messages. + self.assert_message_count([message for message in consumer], 200) + + @kafka_versions('all') + def test_simple_consumer_no_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Default largest + consumer = self.consumer(auto_offset_reset=None) + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + with self.assertRaises(OffsetOutOfRangeError): + consumer.get_message() + @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) |