diff options
Diffstat (limited to 'rq/scheduler.py')
-rw-r--r-- | rq/scheduler.py | 40 |
1 files changed, 26 insertions, 14 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py index 84802b6..67b6431 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -6,6 +6,7 @@ import traceback from datetime import datetime from enum import Enum from multiprocessing import Process +from typing import List from redis import SSLConnection, UnixDomainSocketConnection @@ -35,14 +36,14 @@ class RQScheduler: Status = SchedulerStatus def __init__( - self, - queues, - connection, - interval=1, - logging_level=logging.INFO, - date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, - serializer=None, + self, + queues, + connection, + interval=1, + logging_level=logging.INFO, + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + serializer=None, ): self._queue_names = set(parse_names(queues)) self._acquired_locks = set() @@ -110,7 +111,7 @@ class RQScheduler: """Returns names of queue it successfully acquires lock on""" successful_locks = set() pid = os.getpid() - self.log.debug("Trying to acquire locks for %s", ", ".join(self._queue_names)) + self.log.debug('Trying to acquire locks for %s', ', '.join(self._queue_names)) for name in self._queue_names: if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60): successful_locks.add(name) @@ -166,7 +167,7 @@ class RQScheduler: jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer) for job in jobs: if job is not None: - queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) + queue._enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) registry.remove(job, pipeline=pipeline) pipeline.execute() self._status = self.Status.STARTED @@ -184,7 +185,7 @@ class RQScheduler: def heartbeat(self): """Updates the TTL on scheduler keys and the locks""" - self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) + self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks)) if len(self._queue_names) > 1: with self.connection.pipeline() as pipeline: for name in self._acquired_locks: @@ -196,7 +197,7 @@ class RQScheduler: self.connection.expire(key, self.interval + 60) def stop(self): - self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names)) + self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._queue_names)) self.release_locks() self._status = self.Status.STOPPED @@ -232,10 +233,21 @@ class RQScheduler: def run(scheduler): - scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid()) + scheduler.log.info('Scheduler for %s started with PID %s', ', '.join(scheduler._queue_names), os.getpid()) try: scheduler.work() except: # noqa scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc()) raise - scheduler.log.info("Scheduler with PID %s has stopped", os.getpid()) + scheduler.log.info('Scheduler with PID %d has stopped', os.getpid()) + + +def parse_names(queues_or_names) -> List[str]: + """Given a list of strings or queues, returns queue names""" + names = [] + for queue_or_name in queues_or_names: + if isinstance(queue_or_name, Queue): + names.append(queue_or_name.name) + else: + names.append(str(queue_or_name)) + return names |