Diffstat (limited to 'tests/test_worker.py')
-rw-r--r--  tests/test_worker.py  95
1 file changed, 91 insertions(+), 4 deletions(-)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index cfce473..dfa0f1d 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -19,6 +19,7 @@ import pytest
from unittest import mock
from unittest.mock import Mock
+from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
from tests import RQTestCase, slow
from tests.fixtures import (
access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing,
@@ -607,6 +608,31 @@ class TestWorker(RQTestCase):
# Should not have created evidence of execution
self.assertEqual(os.path.exists(SENTINEL_FILE), False)
+    @slow
+    def test_max_idle_time(self):
+        q = Queue()
+        w = Worker([q])
+        q.enqueue(say_hello, args=('Frank',))
+        self.assertIsNotNone(w.dequeue_job_and_maintain_ttl(1))
+
+        # idle for 1 second
+        self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=1))
+
+        # idle for 3 seconds
+        now = utcnow()
+        self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=3))
+        self.assertLess((utcnow() - now).total_seconds(), 5)  # 5 for some buffer
+
+        # idle for 2 seconds because max_idle_time is shorter than the blocking timeout
+        now = utcnow()
+        self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2))
+        self.assertLess((utcnow() - now).total_seconds(), 4)  # 4 for some buffer
+
+        # idle for 3 seconds because max_idle_time is shorter than two rounds of the blocking timeout
+        now = utcnow()
+        self.assertIsNone(w.dequeue_job_and_maintain_ttl(2, max_idle_time=3))
+        self.assertLess((utcnow() - now).total_seconds(), 5)  # 5 for some buffer
+
@slow # noqa
def test_timeouts(self):
"""Worker kills jobs after timeout."""
@@ -639,7 +665,6 @@ class TestWorker(RQTestCase):
q = Queue()
w = Worker([q])
-        # Put it on the queue with a timeout value
self.assertIsNone(w.dequeue_job_and_maintain_ttl(None))
def test_worker_ttl_param_resolves_timeout(self):
@@ -936,7 +961,15 @@ class TestWorker(RQTestCase):
worker.last_cleaned_at = utcnow()
self.assertFalse(worker.should_run_maintenance_tasks)
-        worker.last_cleaned_at = utcnow() - timedelta(seconds=3700)
+        worker.last_cleaned_at = utcnow() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 100)
+        self.assertTrue(worker.should_run_maintenance_tasks)
+
+        # custom maintenance_interval
+        worker = Worker(queue, maintenance_interval=10)
+        self.assertTrue(worker.should_run_maintenance_tasks)
+        worker.last_cleaned_at = utcnow()
+        self.assertFalse(worker.should_run_maintenance_tasks)
+        worker.last_cleaned_at = utcnow() - timedelta(seconds=11)
self.assertTrue(worker.should_run_maintenance_tasks)
def test_worker_calls_clean_registries(self):
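A rough sketch of the property these assertions describe; the helper below is hypothetical and only mirrors worker.should_run_maintenance_tasks, it is not the rq implementation:

from datetime import timedelta
from rq.utils import utcnow

def should_run(last_cleaned_at, interval_seconds):
    # Run maintenance if the worker has never cleaned, or if the last cleanup
    # is older than the configured interval (maintenance_interval or the default).
    if last_cleaned_at is None:
        return True
    return (utcnow() - last_cleaned_at) > timedelta(seconds=interval_seconds)

assert should_run(None, 10)
assert not should_run(utcnow(), 10)
assert should_run(utcnow() - timedelta(seconds=11), 10)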
@@ -1103,6 +1136,59 @@ class TestWorker(RQTestCase):
worker = Worker.find_by_key(w2.key)
self.assertEqual(worker.python_version, python_version)
+    def test_dequeue_random_strategy(self):
+        qs = [Queue('q%d' % i) for i in range(5)]
+
+        for i in range(5):
+            for j in range(3):
+                qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
+
+        w = Worker(qs)
+        w.work(burst=True, dequeue_strategy="random")
+
+        start_times = []
+        for i in range(5):
+            for j in range(3):
+                job = Job.fetch('q%d_%d' % (i, j))
+                start_times.append(('q%d_%d' % (i, j), job.started_at))
+        sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
+        sorted_ids = [tup[0] for tup in sorted_by_time]
+        expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)]
+        expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)]
+
+        self.assertNotEqual(sorted_ids, expected_rr)
+        self.assertNotEqual(sorted_ids, expected_ser)
+        expected_rr.reverse()
+        expected_ser.reverse()
+        self.assertNotEqual(sorted_ids, expected_rr)
+        self.assertNotEqual(sorted_ids, expected_ser)
+        sorted_ids.sort()
+        expected_ser.sort()
+        self.assertEqual(sorted_ids, expected_ser)
+
+    def test_dequeue_round_robin(self):
+        qs = [Queue('q%d' % i) for i in range(5)]
+
+        for i in range(5):
+            for j in range(3):
+                qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
+
+        w = Worker(qs)
+        w.work(burst=True, dequeue_strategy="round_robin")
+
+        start_times = []
+        for i in range(5):
+            for j in range(3):
+                job = Job.fetch('q%d_%d' % (i, j))
+                start_times.append(('q%d_%d' % (i, j), job.started_at))
+        sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
+        sorted_ids = [tup[0] for tup in sorted_by_time]
+        expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0',
+                    'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1',
+                    'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2']
+
+        self.assertEqual(expected, sorted_ids)
+
def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait)
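For context, a minimal usage sketch (not part of the diff) of the dequeue_strategy argument the two tests above exercise; queue names are illustrative and a configured Redis connection is assumed:

from rq import Queue, Worker

queues = [Queue('q%d' % i) for i in range(5)]
w = Worker(queues)

# "round_robin" takes one job from each queue in turn (q0, q1, ..., q4, q0, ...);
# "random" picks the next queue at random, so no fixed ordering can be asserted.
w.work(burst=True, dequeue_strategy="round_robin")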
@@ -1224,13 +1310,14 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w.prepare_job_execution(job)
w.fork_work_horse(job, queue)
job.timeout = 5
-
time.sleep(1)
with open(sentinel_file) as f:
subprocess_pid = int(f.read().strip())
self.assertTrue(psutil.pid_exists(subprocess_pid))
-        w.monitor_work_horse(job, queue)
+        with mock.patch.object(w, 'handle_work_horse_killed', wraps=w.handle_work_horse_killed) as mocked:
+            w.monitor_work_horse(job, queue)
+        self.assertEqual(mocked.call_count, 1)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor
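As a side note, the wraps= pattern used in the last hunk keeps the real callback running while recording its calls; a small standalone sketch with a hypothetical class (not from rq):

from unittest import mock

class Horse:
    def handle_killed(self):
        return "cleaned up"

h = Horse()
# wraps= delegates to the original method, so behaviour is unchanged
# while call_count and call arguments are still captured by the mock.
with mock.patch.object(h, 'handle_killed', wraps=h.handle_killed) as mocked:
    h.handle_killed()
assert mocked.call_count == 1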