summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_conn.py44
-rw-r--r--test/test_consumer_integration.py44
2 files changed, 87 insertions, 1 deletions
diff --git a/test/test_conn.py b/test/test_conn.py
index 2c8f3b2..c4f219b 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -1,5 +1,6 @@
import socket
import struct
+from threading import Thread
import mock
from . import unittest
@@ -162,3 +163,46 @@ class ConnTest(unittest.TestCase):
self.conn.send(self.config['request_id'], self.config['payload'])
self.assertEqual(self.MockCreateConn.call_count, 1)
self.conn._sock.sendall.assert_called_with(self.config['payload'])
+
+
+class TestKafkaConnection(unittest.TestCase):
+
+ @mock.patch('socket.create_connection')
+ def test_copy(self, socket):
+ """KafkaConnection copies work as expected"""
+
+ conn = KafkaConnection('kafka', 9092)
+ self.assertEqual(socket.call_count, 1)
+
+ copy = conn.copy()
+ self.assertEqual(socket.call_count, 1)
+ self.assertEqual(copy.host, 'kafka')
+ self.assertEqual(copy.port, 9092)
+ self.assertEqual(copy._sock, None)
+
+ copy.reinit()
+ self.assertEqual(socket.call_count, 2)
+ self.assertNotEqual(copy._sock, None)
+
+ @mock.patch('socket.create_connection')
+ def test_copy_thread(self, socket):
+ """KafkaConnection copies work in other threads"""
+
+ err = []
+ copy = KafkaConnection('kafka', 9092).copy()
+
+ def thread_func(err, copy):
+ try:
+ self.assertEqual(copy.host, 'kafka')
+ self.assertEqual(copy.port, 9092)
+ self.assertNotEqual(copy._sock, None)
+ except Exception as e:
+ err.append(e)
+ else:
+ err.append(None)
+ thread = Thread(target=thread_func, args=(err, copy))
+ thread.start()
+ thread.join()
+
+ self.assertEqual(err, [None])
+ self.assertEqual(socket.call_count, 2)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 4723220..9c89190 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -5,7 +5,7 @@ from six.moves import xrange
from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
from kafka.common import (
- ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+ ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -85,6 +85,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions('all')
+ def test_simple_consumer_smallest_offset_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ consumer = self.consumer(auto_offset_reset='smallest')
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ # Since auto_offset_reset is set to smallest we should read all 200
+ # messages from beginning.
+ self.assert_message_count([message for message in consumer], 200)
+
+ @kafka_versions('all')
+ def test_simple_consumer_largest_offset_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Default largest
+ consumer = self.consumer()
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ # Since auto_offset_reset is set to largest we should not read any
+ # messages.
+ self.assert_message_count([message for message in consumer], 0)
+ # Send 200 new messages to the queue
+ self.send_messages(0, range(200, 300))
+ self.send_messages(1, range(300, 400))
+ # Since the offset is set to largest we should read all the new messages.
+ self.assert_message_count([message for message in consumer], 200)
+
+ @kafka_versions('all')
+ def test_simple_consumer_no_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Default largest
+ consumer = self.consumer(auto_offset_reset=None)
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ with self.assertRaises(OffsetOutOfRangeError):
+ consumer.get_message()
+
@kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))