summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-10-20 06:53:36 +0700
committerSelwin Ong <selwin.ong@gmail.com>2020-10-20 06:53:36 +0700
commitcf36ed9b4c548390e835aa0a006e5f593fa32e26 (patch)
treefd59f7a2965802536b47145de585bc78f0e23676
parentbefbbfe7ee0bb73eb043987a876651a15967c9f8 (diff)
parent9adcd7e50c511ceba622ba20c826a0b701a917ea (diff)
downloadrq-pubsub.tar.gz
Merge branch 'master' into pubsubpubsub
-rw-r--r--.github/workflows/workflow.yml4
-rw-r--r--dev-requirements.txt3
-rw-r--r--rq/worker.py3
-rw-r--r--tests/fixtures.py12
-rw-r--r--tests/test_worker.py17
5 files changed, 30 insertions, 9 deletions
diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml
index 8ae68cf..f86e716 100644
--- a/.github/workflows/workflow.yml
+++ b/.github/workflows/workflow.yml
@@ -33,7 +33,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install redis==${{ matrix.redis-py-version }}
- pip install pytest pytest-cov sentry-sdk codecov mock
+ pip install pytest pytest-cov sentry-sdk codecov mock psutil
pip install -e .
- name: Test with pytest
@@ -43,4 +43,4 @@ jobs:
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
- file: ./coverage.xml \ No newline at end of file
+ file: ./coverage.xml
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 615fac9..ccb6eff 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,2 +1,3 @@
mock
-pytest \ No newline at end of file
+pytest
+psutil
diff --git a/rq/worker.py b/rq/worker.py
index a34cd12..13ed298 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -402,7 +402,7 @@ class Worker(object):
Kill the horse but catch "No such process" error has the horse could already be dead.
"""
try:
- os.kill(self.horse_pid, sig)
+ os.killpg(os.getpgid(self.horse_pid), sig)
self.log.info('Killed horse pid %s', self.horse_pid)
except OSError as e:
if e.errno == errno.ESRCH:
@@ -728,6 +728,7 @@ class Worker(object):
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0:
+ os.setsid()
self.main_work_horse(job, queue)
os._exit(0) # just in case
else:
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 057b932..5d8e8bc 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -10,6 +10,7 @@ import os
import time
import signal
import sys
+import subprocess
from rq import Connection, get_current_job, get_current_connection, Queue
from rq.decorators import job
@@ -66,6 +67,17 @@ def create_file_after_timeout(path, timeout):
time.sleep(timeout)
create_file(path)
+def create_file_after_timeout_and_setsid(path, timeout):
+ os.setsid()
+ create_file_after_timeout(path, timeout)
+
+def launch_process_within_worker_and_store_pid(path, timeout):
+
+ p = subprocess.Popen(['sleep', str(timeout)])
+ with open(path, 'w') as f:
+ f.write('{}'.format(p.pid))
+
+ p.wait()
def access_self():
assert get_current_connection() is not None
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 4ea8a30..1ab5865 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import json
import os
+import psutil
import shutil
import signal
import subprocess
@@ -23,9 +24,10 @@ from mock import Mock
from tests import RQTestCase, slow
from tests.fixtures import (
- access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing,
+ access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing,
kill_worker, long_running_job, modify_self, modify_self_and_error,
- run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock
+ run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock,
+ launch_process_within_worker_and_store_pid
)
from rq import Queue, SimpleWorker, Worker, get_current_connection
@@ -37,7 +39,6 @@ from rq.utils import utcnow
from rq.version import VERSION
from rq.worker import HerokuWorker, WorkerStatus
-
class CustomJob(Job):
pass
@@ -1166,12 +1167,16 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
if os.path.exists(sentinel_file):
os.remove(sentinel_file)
- fooq.enqueue(create_file_after_timeout, sentinel_file, 100)
+ fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100)
job, queue = w.dequeue_job_and_maintain_ttl(5)
w.fork_work_horse(job, queue)
job.timeout = 5
w.job_monitoring_interval = 1
now = utcnow()
+ time.sleep(1)
+ with open(sentinel_file) as f:
+ subprocess_pid = int(f.read().strip())
+ self.assertTrue(psutil.pid_exists(subprocess_pid))
w.monitor_work_horse(job, queue)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor
@@ -1180,6 +1185,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
failed_job_registry = FailedJobRegistry(queue=fooq)
self.assertTrue(job in failed_job_registry)
self.assertEqual(fooq.count, 0)
+ self.assertFalse(psutil.pid_exists(subprocess_pid))
def schedule_access_self():
@@ -1283,9 +1289,10 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w = HerokuWorker('foo')
path = os.path.join(self.sandbox, 'shouldnt_exist')
- p = Process(target=create_file_after_timeout, args=(path, 2))
+ p = Process(target=create_file_after_timeout_and_setsid, args=(path, 2))
p.start()
self.assertEqual(p.exitcode, None)
+ time.sleep(0.1)
w._horse_pid = p.pid
w.handle_warm_shutdown_request()