1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
import json
import os
import signal
from typing import TYPE_CHECKING, Any, Dict
if TYPE_CHECKING:
from redis import Redis
from .worker import Worker
from rq.exceptions import InvalidJobOperation
from rq.job import Job
# Template for a worker's PubSub channel name; `send_command` formats it with
# the worker name before publishing.
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'
def send_command(connection: 'Redis', worker_name: str, command: str, **kwargs):
    """
    Publishes a command message on a worker's PubSub channel.

    The message is a JSON object holding the command name under the
    `command` key plus any extra keyword arguments. Available commands are:
        - `shutdown`: Shuts down a worker
        - `kill-horse`: Command for the worker to kill the current working horse
        - `stop-job`: A command for the worker to stop the currently running job

    Workers listen to the PubSub, and `handle` the specific command.

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): Name of the worker the command is addressed to
        command (str): The command name
        **kwargs: Extra fields to include in the message (e.g. `job_id`)
    """
    channel = PUBSUB_CHANNEL_TEMPLATE % worker_name
    # `command` is a named parameter, so it can never also show up in kwargs.
    message = {'command': command, **kwargs}
    connection.publish(channel, json.dumps(message))
def parse_payload(payload: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Returns a dict of command data parsed from a PubSub message.

    Args:
        payload (dict): The message dict; its `data` entry holds the
            JSON-encoded command, either as bytes or as str.

    Returns:
        dict: The result of JSON-decoding the `data` entry.

    Raises:
        AttributeError: If `data` is missing (None has no `decode`).
        json.JSONDecodeError: If `data` is not valid JSON.
    """
    data = payload.get('data')
    # A Redis client configured with `decode_responses=True` delivers `data`
    # already decoded to str; calling `.decode()` on it would fail, so only
    # decode when the value is not a str yet.
    if not isinstance(data, str):
        data = data.decode()
    return json.loads(data)
def send_shutdown_command(connection: 'Redis', worker_name: str):
    """
    Sends the `shutdown` command to a worker.

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): Name of the worker to shut down
    """
    send_command(connection=connection, worker_name=worker_name, command='shutdown')
def send_kill_horse_command(connection: 'Redis', worker_name: str):
    """
    Sends the `kill-horse` command, telling a worker to kill its horse.

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): Name of the worker whose horse should be killed
    """
    send_command(connection=connection, worker_name=worker_name, command='kill-horse')
def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None):
    """
    Sends the `stop-job` command to the worker recorded on a job.

    Args:
        connection (Redis): A Redis Connection
        job_id (str): The Job ID
        serializer (): The serializer, passed on to `Job.fetch`

    Raises:
        InvalidJobOperation: If the fetched job has no worker name set.
    """
    job = Job.fetch(job_id, connection=connection, serializer=serializer)
    if job.worker_name:
        # The job id travels with the command so the worker can check that
        # it is the job it is working on.
        send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
        return
    raise InvalidJobOperation('Job is not currently executing')
def handle_command(worker: 'Worker', payload: Dict[Any, Any]):
    """Routes a parsed command payload to the matching handler.

    Payloads whose command is not `stop-job`, `shutdown` or `kill-horse`
    are ignored.

    Args:
        worker (Worker): The worker to use
        payload (Dict[Any, Any]): The Payload; must contain a `command` key
    """
    command = payload['command']
    if command == 'shutdown':
        handle_shutdown_command(worker)
    elif command == 'kill-horse':
        handle_kill_worker_command(worker, payload)
    elif command == 'stop-job':
        handle_stop_job_command(worker, payload)
def handle_shutdown_command(worker: 'Worker'):
    """Performs the shutdown command by sending SIGINT to the current process.

    Args:
        worker (Worker): The worker to use; only its logger is used here.
    """
    worker.log.info('Received shutdown command, sending SIGINT signal.')
    # The signal is addressed to this very process.
    os.kill(os.getpid(), signal.SIGINT)
def handle_kill_worker_command(worker: 'Worker', payload: Dict[Any, Any]):
    """
    Kills the worker's work horse, if it currently has one.

    Args:
        worker (Worker): The worker whose horse should be killed
        payload (Dict[Any, Any]): The payload (not used by this handler).
    """
    worker.log.info('Received kill horse command.')
    if not worker.horse_pid:
        # No horse pid means there is nothing to kill.
        worker.log.info('Worker is not working, kill horse command ignored')
        return
    worker.log.info('Kiling horse...')
    worker.kill_horse()
def handle_stop_job_command(worker: 'Worker', payload: Dict[Any, Any]):
    """Stops the worker's current job if it is the one named in the payload.

    Args:
        worker (Worker): The worker to use
        payload (Dict[Any, Any]): The payload; its `job_id` entry names the
            job to stop.
    """
    job_id = payload.get('job_id')
    worker.log.debug('Received command to stop job %s', job_id)
    if not job_id or worker.get_current_job_id() != job_id:
        worker.log.info('Not working on job %s, command ignored.', job_id)
        return
    # Record the job id on the worker first so that the job failure handler
    # knows the stop was intentional.
    worker._stopped_job_id = job_id
    worker.kill_horse()
|