diff options
Diffstat (limited to 'tests/test_worker.py')
-rw-r--r-- | tests/test_worker.py | 95 |
1 files changed, 91 insertions, 4 deletions
diff --git a/tests/test_worker.py b/tests/test_worker.py index cfce473..dfa0f1d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -19,6 +19,7 @@ import pytest from unittest import mock from unittest.mock import Mock +from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL from tests import RQTestCase, slow from tests.fixtures import ( access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing, @@ -607,6 +608,31 @@ class TestWorker(RQTestCase): # Should not have created evidence of execution self.assertEqual(os.path.exists(SENTINEL_FILE), False) + @slow + def test_max_idle_time(self): + q = Queue() + w = Worker([q]) + q.enqueue(say_hello, args=('Frank',)) + self.assertIsNotNone(w.dequeue_job_and_maintain_ttl(1)) + + # idle for 1 second + self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=1)) + + # idle for 3 seconds + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=3)) + self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer + + # idle for 2 seconds because idle_time is less than timeout + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2)) + self.assertLess((utcnow()-now).total_seconds(), 4) # 4 for some buffer + + # idle for 3 seconds because idle_time is less than two rounds of timeout + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(2, max_idle_time=3)) + self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer + @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -639,7 +665,6 @@ class TestWorker(RQTestCase): q = Queue() w = Worker([q]) - # Put it on the queue with a timeout value self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) def test_worker_ttl_param_resolves_timeout(self): @@ -936,7 +961,15 @@ class TestWorker(RQTestCase): worker.last_cleaned_at = utcnow() self.assertFalse(worker.should_run_maintenance_tasks) - worker.last_cleaned_at = utcnow() - timedelta(seconds=3700) + worker.last_cleaned_at = utcnow() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 100) + self.assertTrue(worker.should_run_maintenance_tasks) + + # custom maintenance_interval + worker = Worker(queue, maintenance_interval=10) + self.assertTrue(worker.should_run_maintenance_tasks) + worker.last_cleaned_at = utcnow() + self.assertFalse(worker.should_run_maintenance_tasks) + worker.last_cleaned_at = utcnow() - timedelta(seconds=11) self.assertTrue(worker.should_run_maintenance_tasks) def test_worker_calls_clean_registries(self): @@ -1103,6 +1136,59 @@ class TestWorker(RQTestCase): worker = Worker.find_by_key(w2.key) self.assertEqual(worker.python_version, python_version) + def test_dequeue_random_strategy(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) + + w = Worker(qs) + w.work(burst=True, dequeue_strategy="random") + + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)] + expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)] + + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + expected_rr.reverse() + expected_ser.reverse() + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + sorted_ids.sort() + expected_ser.sort() + self.assertEqual(sorted_ids, expected_ser) + + def test_dequeue_round_robin(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) + + w = Worker(qs) + w.work(burst=True, dequeue_strategy="round_robin") + + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0', + 'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1', + 'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2'] + + self.assertEqual(expected, sorted_ids) + def wait_and_kill_work_horse(pid, time_to_wait=0.0): time.sleep(time_to_wait) @@ -1224,13 +1310,14 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): w.prepare_job_execution(job) w.fork_work_horse(job, queue) job.timeout = 5 - time.sleep(1) with open(sentinel_file) as f: subprocess_pid = int(f.read().strip()) self.assertTrue(psutil.pid_exists(subprocess_pid)) - w.monitor_work_horse(job, queue) + with mock.patch.object(w, 'handle_work_horse_killed', wraps=w.handle_work_horse_killed) as mocked: + w.monitor_work_horse(job, queue) + self.assertEqual(mocked.call_count, 1) fudge_factor = 1 total_time = w.job_monitoring_interval + 65 + fudge_factor |