Diffstat (limited to 'rq/worker.py')
-rw-r--r-- | rq/worker.py | 332 |
1 file changed, 199 insertions, 133 deletions
diff --git a/rq/worker.py b/rq/worker.py
index a324ad6..80c0384 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -1,5 +1,7 @@
+import contextlib
 import errno
 import logging
+import math
 import os
 import random
 import signal
@@ -8,14 +10,18 @@ import sys
 import time
 import traceback
 import warnings
-
 from datetime import timedelta
 from enum import Enum
-from uuid import uuid4
 from random import shuffle
-from typing import Any, Callable, List, Optional, TYPE_CHECKING, Tuple, Type, Union
+from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type,
+                    Union)
+from uuid import uuid4
 
 if TYPE_CHECKING:
+    try:
+        from resource import struct_rusage
+    except ImportError:
+        pass
     from redis import Redis
     from redis.client import Pipeline
 
@@ -23,12 +29,13 @@ try:
     from signal import SIGKILL
 except ImportError:
     from signal import SIGTERM as SIGKILL
+
 from contextlib import suppress
+
 import redis.exceptions
 
 from . import worker_registration
 from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
-from .utils import as_text
 from .connections import get_current_connection, push_connection, pop_connection
 from .defaults import (
@@ -41,18 +48,20 @@ from .defaults import (
     DEFAULT_LOGGING_DATE_FORMAT,
 )
 from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
+
 from .job import Job, JobStatus
 from .logutils import setup_loghandlers
 from .queue import Queue
 from .registry import StartedJobRegistry, clean_registries
 from .scheduler import RQScheduler
+from .serializers import resolve_serializer
 from .suspension import is_suspended
 from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
-from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact
+from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text
 from .version import VERSION
-from .worker_registration import clean_worker_registry, get_keys
 from .serializers import resolve_serializer
+
 try:
     from setproctitle import setproctitle as setprocname
 except ImportError:
@@ -90,6 +99,12 @@ def signal_name(signum):
         return 'SIG_UNKNOWN'
 
 
+class DequeueStrategy(str, Enum):
+    DEFAULT = "default"
+    ROUND_ROBIN = "round_robin"
+    RANDOM = "random"
+
+
 class WorkerStatus(str, Enum):
     STARTED = 'started'
     SUSPENDED = 'suspended'
@@ -108,9 +123,9 @@ class Worker:
     log_result_lifespan = True
     # `log_job_description` is used to toggle logging an entire jobs description.
    log_job_description = True
-    # factor to increase connection_wait_time incase of continous connection failures.
+    # factor to increase connection_wait_time in case of continuous connection failures.
     exponential_backoff_factor = 2.0
-    # Max Wait time (in seconds) after which exponential_backoff_factor wont be applicable.
+    # Max Wait time (in seconds) after which exponential_backoff_factor won't be applicable.
     max_connection_wait_time = 60.0
 
     @classmethod
@@ -132,7 +147,7 @@ class Worker:
         elif connection is None:
             connection = get_current_connection()
 
-        worker_keys = get_keys(queue=queue, connection=connection)
+        worker_keys = worker_registration.get_keys(queue=queue, connection=connection)
         workers = [
             cls.find_by_key(
                 key, connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer
@@ -152,7 +167,7 @@ class Worker:
         Returns:
             list_keys (List[str]): A list of worker keys
         """
-        return [as_text(key) for key in get_keys(queue=queue, connection=connection)]
+        return [as_text(key) for key in worker_registration.get_keys(queue=queue, connection=connection)]
 
     @classmethod
     def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> int:
@@ -165,7 +180,7 @@ class Worker:
         Returns:
             length (int): The queue length.
         """
-        return len(get_keys(queue=queue, connection=connection))
+        return len(worker_registration.get_keys(queue=queue, connection=connection))
 
     @classmethod
     def find_by_key(
@@ -226,6 +241,7 @@ class Worker:
         exc_handler=None,
         exception_handlers=None,
         default_worker_ttl=DEFAULT_WORKER_TTL,
+        maintenance_interval: int = DEFAULT_MAINTENANCE_TASK_INTERVAL,
         job_class: Type['Job'] = None,
         queue_class=None,
         log_job_description: bool = True,
@@ -233,11 +249,12 @@ class Worker:
         disable_default_exception_handler: bool = False,
         prepare_for_work: bool = True,
         serializer=None,
+        work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None
     ):  # noqa
-
         self.default_result_ttl = default_result_ttl
         self.worker_ttl = default_worker_ttl
         self.job_monitoring_interval = job_monitoring_interval
+        self.maintenance_interval = maintenance_interval
 
         connection = self._set_connection(connection)
         self.connection = connection
@@ -250,7 +267,8 @@ class Worker:
         self.serializer = resolve_serializer(serializer)
 
         queues = [
-            self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer)
+            self.queue_class(name=q, connection=connection, job_class=self.job_class,
+                             serializer=self.serializer, death_penalty_class=self.death_penalty_class,)
             if isinstance(q, str)
             else q
             for q in ensure_list(queues)
@@ -261,6 +279,7 @@ class Worker:
         self.validate_queues()
         self._ordered_queues = self.queues[:]
         self._exc_handlers: List[Callable] = []
+        self._work_horse_killed_handler = work_horse_killed_handler
 
         self._state: str = 'starting'
         self._is_horse: bool = False
@@ -279,6 +298,7 @@ class Worker:
         self.scheduler: Optional[RQScheduler] = None
         self.pubsub = None
         self.pubsub_thread = None
+        self._dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
 
         self.disable_default_exception_handler = disable_default_exception_handler
 
@@ -532,8 +552,7 @@ class Worker:
         return self.job_class.fetch(job_id, self.connection, self.serializer)
 
     def _install_signal_handlers(self):
-        """Installs signal handlers for handling SIGINT and SIGTERM gracefully.
-        """
+        """Installs signal handlers for handling SIGINT and SIGTERM gracefully."""
         signal.signal(signal.SIGINT, self.request_stop)
         signal.signal(signal.SIGTERM, self.request_stop)
 
@@ -553,18 +572,14 @@ class Worker:
             else:
                 raise
 
-    def wait_for_horse(self) -> Tuple[Optional[int], Optional[int]]:
+    def wait_for_horse(self) -> Tuple[Optional[int], Optional[int], Optional['struct_rusage']]:
         """Waits for the horse process to complete.
         Uses `0` as argument as to include "any child in the process group of the current process".
""" - pid = None - stat = None - try: - pid, stat = os.waitpid(self.horse_pid, 0) - except ChildProcessError: - # ChildProcessError: [Errno 10] No child processes - pass - return pid, stat + pid = stat = rusage = None + with contextlib.suppress(ChildProcessError): # ChildProcessError: [Errno 10] No child processes + pid, stat, rusage = os.wait4(self.horse_pid, 0) + return pid, stat, rusage def request_force_stop(self, signum, frame): """Terminates the application (cold shutdown). @@ -621,13 +636,11 @@ class Worker: self.log.info('Warm shut down requested') def check_for_suspension(self, burst: bool): - """Check to see if workers have been suspended by `rq suspend` - """ + """Check to see if workers have been suspended by `rq suspend`""" before_state = None notified = False while not self._stop_requested and is_suspended(self.connection, self): - if burst: self.log.info('Suspended in burst mode, exiting') self.log.info('Note: There could still be unfinished jobs on the queue') @@ -674,14 +687,90 @@ class Worker: self.pubsub.unsubscribe() self.pubsub.close() - def reorder_queues(self, reference_queue): - """Method placeholder to workers that implement some reordering strategy. - `pass` here means that the queue will remain with the same job order. + def reorder_queues(self, reference_queue: 'Queue'): + """Reorder the queues according to the strategy. + As this can be defined both in the `Worker` initialization or in the `work` method, + it doesn't take the strategy directly, but rather uses the private `_dequeue_strategy` attribute. Args: - reference_queue (Union[Queue, str]): The queue - """ - pass + reference_queue (Union[Queue, str]): The queues to reorder + """ + if self._dequeue_strategy is None: + self._dequeue_strategy = DequeueStrategy.DEFAULT + + if self._dequeue_strategy not in ("default", "random", "round_robin"): + raise ValueError( + f"Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`." + ) + if self._dequeue_strategy == DequeueStrategy.DEFAULT: + return + if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN: + pos = self._ordered_queues.index(reference_queue) + self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] + return + if self._dequeue_strategy == DequeueStrategy.RANDOM: + shuffle(self._ordered_queues) + return + + def bootstrap( + self, + logging_level: str = "INFO", + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT + ): + """Bootstraps the worker. + Runs the basic tasks that should run when the worker actually starts working. + Used so that new workers can focus on the work loop implementation rather + than the full bootstraping process. + + Args: + logging_level (str, optional): Logging level to use. Defaults to "INFO". + date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. + log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. + """ + setup_loghandlers(logging_level, date_format, log_format) + self.register_birth() + self.log.info('Worker %s: started, version %s', self.key, VERSION) + self.subscribe() + self.set_state(WorkerStatus.STARTED) + qnames = self.queue_names() + self.log.info('*** Listening on %s...', green(', '.join(qnames))) + + def _start_scheduler( + self, + burst: bool = False, + logging_level: str = "INFO", + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT, + ): + """Starts the scheduler process. 
+        This is specifically designed to be run by the worker when running the `work()` method.
+        Instanciates the RQScheduler and tries to acquire a lock.
+        If the lock is acquired, start scheduler.
+        If worker is on burst mode just enqueues scheduled jobs and quits,
+        otherwise, starts the scheduler in a separate process.
+
+        Args:
+            burst (bool, optional): Whether to work on burst mode. Defaults to False.
+            logging_level (str, optional): Logging level to use. Defaults to "INFO".
+            date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
+            log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
+        """
+        self.scheduler = RQScheduler(
+            self.queues,
+            connection=self.connection,
+            logging_level=logging_level,
+            date_format=date_format,
+            log_format=log_format,
+            serializer=self.serializer,
+        )
+        self.scheduler.acquire_locks()
+        if self.scheduler.acquired_locks:
+            if burst:
+                self.scheduler.enqueue_scheduled_jobs()
+                self.scheduler.release_locks()
+            else:
+                self.scheduler.start()
 
     def work(
         self,
@@ -690,13 +779,16 @@ class Worker:
         date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
         log_format: str = DEFAULT_LOGGING_FORMAT,
         max_jobs: Optional[int] = None,
+        max_idle_time: Optional[int] = None,
         with_scheduler: bool = False,
+        dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
     ) -> bool:
         """Starts the work loop.
 
         Pops and performs all jobs on the current list of queues.  When all
         queues are empty, block and wait for new jobs to arrive on any of the
         queues, unless `burst` mode is enabled.
+        If `max_idle_time` is provided, worker will die when it's idle for more than the provided value.
 
         The return value indicates whether any jobs were processed.
 
@@ -706,39 +798,18 @@ class Worker:
             date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
             log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
             max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
+            max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None.
             with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
+            dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. Defaults to DequeueStrategy.DEFAULT
 
         Returns:
             worked (bool): Will return True if any job was processed, False otherwise.
         """
-        setup_loghandlers(logging_level, date_format, log_format)
+        self.bootstrap(logging_level, date_format, log_format)
+        self._dequeue_strategy = dequeue_strategy
         completed_jobs = 0
-        self.register_birth()
-        self.log.info("Worker %s: started, version %s", self.key, VERSION)
-        self.subscribe()
-        self.set_state(WorkerStatus.STARTED)
-        qnames = self.queue_names()
-        self.log.info('*** Listening on %s...', green(', '.join(qnames)))
-
         if with_scheduler:
-            self.scheduler = RQScheduler(
-                self.queues,
-                connection=self.connection,
-                logging_level=logging_level,
-                date_format=date_format,
-                log_format=log_format,
-                serializer=self.serializer,
-            )
-            self.scheduler.acquire_locks()
-            # If lock is acquired, start scheduler
-            if self.scheduler.acquired_locks:
-                # If worker is run on burst mode, enqueue_scheduled_jobs()
-                # before working. Otherwise, start scheduler in a separate process
-                if burst:
-                    self.scheduler.enqueue_scheduled_jobs()
-                    self.scheduler.release_locks()
-                else:
-                    self.scheduler.start()
+            self._start_scheduler(burst, logging_level, date_format, log_format)
 
         self._install_signal_handlers()
         try:
@@ -754,25 +825,26 @@ class Worker:
                         break
 
                     timeout = None if burst else self.dequeue_timeout
-                    result = self.dequeue_job_and_maintain_ttl(timeout)
+                    result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time)
                     if result is None:
                         if burst:
-                            self.log.info("Worker %s: done, quitting", self.key)
+                            self.log.info('Worker %s: done, quitting', self.key)
+                        elif max_idle_time is not None:
+                            self.log.info('Worker %s: idle for %d seconds, quitting', self.key, max_idle_time)
                         break
 
                     job, queue = result
-                    self.reorder_queues(reference_queue=queue)
                     self.execute_job(job, queue)
                     self.heartbeat()
 
                    completed_jobs += 1
                    if max_jobs is not None:
                        if completed_jobs >= max_jobs:
-                            self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs)
+                            self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs)
                            break

                except redis.exceptions.TimeoutError:
-                    self.log.error(f"Worker {self.key}: Redis connection timeout, quitting...")
+                    self.log.error('Worker %s: Redis connection timeout, quitting...', self.key)
                    break

                except StopRequested:
@@ -786,15 +858,16 @@ class Worker:
                    self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True)
                    break
        finally:
-            if not self.is_horse:
-
-                if self.scheduler:
-                    self.stop_scheduler()
-
-                self.register_death()
-                self.unsubscribe()
+            self.teardown()
        return bool(completed_jobs)

+    def teardown(self):
+        if not self.is_horse:
+            if self.scheduler:
+                self.stop_scheduler()
+            self.register_death()
+            self.unsubscribe()
+
    def stop_scheduler(self):
        """Ensure scheduler process is stopped
        Will send the kill signal to scheduler process,
@@ -808,7 +881,7 @@ class Worker:
                pass
            self.scheduler._process.join()

-    def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']:
+    def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']:
        """Dequeues a job while maintaining the TTL.
 
         Returns:
@@ -821,25 +894,31 @@ class Worker:
         self.procline('Listening on ' + qnames)
         self.log.debug('*** Listening on %s...', green(qnames))
         connection_wait_time = 1.0
+        idle_since = utcnow()
+        idle_time_left = max_idle_time
         while True:
-
             try:
                 self.heartbeat()
 
                 if self.should_run_maintenance_tasks:
                     self.run_maintenance_tasks()
 
-                self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}")
+                if timeout is not None and idle_time_left is not None:
+                    timeout = min(timeout, idle_time_left)
+
+                self.log.debug('Dequeueing jobs on queues %s and timeout %d', green(qnames), timeout)
                 result = self.queue_class.dequeue_any(
                     self._ordered_queues,
                     timeout,
                     connection=self.connection,
                     job_class=self.job_class,
                     serializer=self.serializer,
+                    death_penalty_class=self.death_penalty_class,
                 )
                 if result is not None:
                     job, queue = result
-                    self.log.debug(f"Dequeued job {blue(job.id)} from {green(queue.name)}")
+                    self.reorder_queues(reference_queue=queue)
+                    self.log.debug('Dequeued job %s from %s', blue(job.id), green(queue.name))
                     job.redis_server_version = self.get_redis_server_version()
                     if self.log_job_description:
                         self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id)
@@ -848,7 +927,11 @@ class Worker:
                     break
 
             except DequeueTimeout:
-                pass
+                if max_idle_time is not None:
+                    idle_for = (utcnow() - idle_since).total_seconds()
+                    idle_time_left = math.ceil(max_idle_time - idle_for)
+                    if idle_time_left <= 0:
+                        break
             except redis.exceptions.ConnectionError as conn_err:
                 self.log.error(
                     'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time
@@ -1028,12 +1111,12 @@ class Worker:
             job (Job): _description_
             queue (Queue): _description_
         """
-        ret_val = None
+        retpid = ret_val = rusage = None
         job.started_at = utcnow()
         while True:
             try:
-                with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
-                    retpid, ret_val = self.wait_for_horse()
+                with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
+                    retpid, ret_val, rusage = self.wait_for_horse()
                 break
             except HorseMonitorTimeoutException:
                 # Horse has not exited yet and is still running.
@@ -1073,20 +1156,20 @@ class Worker:
         elif self._stopped_job_id == job.id:
             # Work-horse killed deliberately
             self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
-            self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
+            self.handle_job_failure(job, queue=queue, exc_string='Job stopped by user, work-horse terminated.')
 
         elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
             if not job.ended_at:
                 job.ended_at = utcnow()
 
             # Unhandled failure: move the job to the failed queue
-            self.log.warning(
-                ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format(
-                    ret_val
-                )
-            )
+            signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ''
+            exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
+            self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string)
+            self.handle_work_horse_killed(job, retpid, ret_val, rusage)
             self.handle_job_failure(
-                job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val
+                job, queue=queue,
+                exc_string=exc_string
             )
 
     def execute_job(self, job: 'Job', queue: 'Queue'):
@@ -1156,7 +1239,7 @@ class Worker:
         """Performs misc bookkeeping like updating states prior to
         job execution.
""" - self.log.debug(f"Preparing for execution of Job ID {job.id}") + self.log.debug('Preparing for execution of Job ID %s', job.id) with self.connection.pipeline() as pipeline: self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_working_time(0, pipeline=pipeline) @@ -1167,7 +1250,7 @@ class Worker: job.prepare_for_execution(self.name, pipeline=pipeline) pipeline.execute() - self.log.debug(f"Job preparation finished.") + self.log.debug('Job preparation finished.') msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) @@ -1267,7 +1350,7 @@ class Worker: result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: - self.log.debug(f"Saving job {job.id}'s successful execution result") + self.log.debug('Saving job %s\'s successful execution result', job.id) job._handle_success(result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) @@ -1280,30 +1363,6 @@ class Worker: except redis.exceptions.WatchError: continue - def execute_success_callback(self, job: 'Job', result: Any): - """Executes success_callback for a job. - with timeout . - - Args: - job (Job): The Job - result (Any): The job's result. - """ - self.log.debug(f"Running success callbacks for {job.id}") - job.heartbeat(utcnow(), CALLBACK_TIMEOUT) - with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): - job.success_callback(job, self.connection, result) - - def execute_failure_callback(self, job: 'Job'): - """Executes failure_callback with timeout - - Args: - job (Job): The Job - """ - self.log.debug(f"Running failure callbacks for {job.id}") - job.heartbeat(utcnow(), CALLBACK_TIMEOUT) - with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): - job.failure_callback(job, self.connection, *sys.exc_info()) - def perform_job(self, job: 'Job', queue: 'Queue') -> bool: """Performs the actual work of a job. Will/should only be called inside the work horse's process. 
@@ -1317,7 +1376,7 @@
         """
         push_connection(self.connection)
         started_job_registry = queue.started_job_registry
-        self.log.debug("Started Job Registry set.")
+        self.log.debug('Started Job Registry set.')
 
         try:
             self.prepare_job_execution(job)
@@ -1325,9 +1384,9 @@ class Worker:
             job.started_at = utcnow()
             timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
             with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
-                self.log.debug("Performing Job...")
+                self.log.debug('Performing Job...')
                 rv = job.perform()
-                self.log.debug(f"Finished performing Job ID {job.id}")
+                self.log.debug('Finished performing Job ID %s', job.id)
 
             job.ended_at = utcnow()
 
@@ -1335,23 +1394,22 @@ class Worker:
             # to use the same exc handling when pickling fails
             job._result = rv
 
-            if job.success_callback:
-                self.execute_success_callback(job, rv)
+            job.heartbeat(utcnow(), job.success_callback_timeout)
+            job.execute_success_callback(self.death_penalty_class, rv)
 
             self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry)
         except:  # NOQA
-            self.log.debug(f"Job {job.id} raised an exception.")
+            self.log.debug('Job %s raised an exception.', job.id)
             job.ended_at = utcnow()
             exc_info = sys.exc_info()
             exc_string = ''.join(traceback.format_exception(*exc_info))
 
-            if job.failure_callback:
-                try:
-                    self.execute_failure_callback(job)
-                except:  # noqa
-                    self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True)
-                    exc_info = sys.exc_info()
-                    exc_string = ''.join(traceback.format_exception(*exc_info))
+            try:
+                job.heartbeat(utcnow(), job.failure_callback_timeout)
+                job.execute_failure_callback(self.death_penalty_class, *exc_info)
+            except:  # noqa
+                exc_info = sys.exc_info()
+                exc_string = ''.join(traceback.format_exception(*exc_info))
 
             self.handle_job_failure(
                 job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry
@@ -1364,8 +1422,7 @@ class Worker:
         self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id)
 
         if rv is not None:
-            log_result = "{0!r}".format(as_text(str(rv)))
-            self.log.debug('Result: %s', yellow(log_result))
+            self.log.debug('Result: %r', yellow(as_text(str(rv))))
 
         if self.log_result_lifespan:
             result_ttl = job.get_result_ttl(self.default_result_ttl)
@@ -1384,7 +1441,7 @@ class Worker:
         the other properties are accessed, which will stop exceptions from being
         properly logged, so we guard against it here.
""" - self.log.debug(f"Handling exception for {job.id}.") + self.log.debug('Handling exception for %s.', job.id) exc_string = ''.join(traceback.format_exception(*exc_info)) try: extra = { @@ -1401,7 +1458,9 @@ class Worker: extra.update({'queue': job.origin, 'job_id': job.id}) # func_name - self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra) + self.log.error( + '[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra + ) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) @@ -1423,6 +1482,12 @@ class Worker: """Pops the latest exception handler off of the exc handler stack.""" return self._exc_handlers.pop() + def handle_work_horse_killed(self, job, retpid, ret_val, rusage): + if self._work_horse_killed_handler is None: + return + + self._work_horse_killed_handler(job, retpid, ret_val, rusage) + def __eq__(self, other): """Equality does not take the database/connection into account""" if not isinstance(other, self.__class__): @@ -1441,7 +1506,7 @@ class Worker: if queue.acquire_cleaning_lock(): self.log.info('Cleaning registries for queue: %s', queue.name) clean_registries(queue) - clean_worker_registry(queue) + worker_registration.clean_worker_registry(queue) self.last_cleaned_at = utcnow() @property @@ -1449,7 +1514,7 @@ class Worker: """Maintenance tasks should run on first startup or every 10 minutes.""" if self.last_cleaned_at is None: return True - if (utcnow() - self.last_cleaned_at) > timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL): + if (utcnow() - self.last_cleaned_at) > timedelta(seconds=self.maintenance_interval): return True return False @@ -1490,6 +1555,7 @@ class HerokuWorker(Worker): * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn causes the horse to crash `imminent_shutdown_delay` seconds later """ + imminent_shutdown_delay = 6 frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace'] @@ -1532,7 +1598,7 @@ class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] class RandomWorker(Worker): |