summaryrefslogtreecommitdiff
path: root/rq/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'rq/scheduler.py')
-rw-r--r--rq/scheduler.py40
1 files changed, 26 insertions, 14 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py
index 84802b6..67b6431 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -6,6 +6,7 @@ import traceback
from datetime import datetime
from enum import Enum
from multiprocessing import Process
+from typing import List
from redis import SSLConnection, UnixDomainSocketConnection
@@ -35,14 +36,14 @@ class RQScheduler:
Status = SchedulerStatus
def __init__(
- self,
- queues,
- connection,
- interval=1,
- logging_level=logging.INFO,
- date_format=DEFAULT_LOGGING_DATE_FORMAT,
- log_format=DEFAULT_LOGGING_FORMAT,
- serializer=None,
+ self,
+ queues,
+ connection,
+ interval=1,
+ logging_level=logging.INFO,
+ date_format=DEFAULT_LOGGING_DATE_FORMAT,
+ log_format=DEFAULT_LOGGING_FORMAT,
+ serializer=None,
):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set()
@@ -110,7 +111,7 @@ class RQScheduler:
"""Returns names of queue it successfully acquires lock on"""
successful_locks = set()
pid = os.getpid()
- self.log.debug("Trying to acquire locks for %s", ", ".join(self._queue_names))
+ self.log.debug('Trying to acquire locks for %s', ', '.join(self._queue_names))
for name in self._queue_names:
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60):
successful_locks.add(name)
@@ -166,7 +167,7 @@ class RQScheduler:
jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer)
for job in jobs:
if job is not None:
- queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
+ queue._enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
registry.remove(job, pipeline=pipeline)
pipeline.execute()
self._status = self.Status.STARTED
@@ -184,7 +185,7 @@ class RQScheduler:
def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks"""
- self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks))
+ self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks))
if len(self._queue_names) > 1:
with self.connection.pipeline() as pipeline:
for name in self._acquired_locks:
@@ -196,7 +197,7 @@ class RQScheduler:
self.connection.expire(key, self.interval + 60)
def stop(self):
- self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names))
+ self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._queue_names))
self.release_locks()
self._status = self.Status.STOPPED
@@ -232,10 +233,21 @@ class RQScheduler:
def run(scheduler):
- scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid())
+ scheduler.log.info('Scheduler for %s started with PID %s', ', '.join(scheduler._queue_names), os.getpid())
try:
scheduler.work()
except: # noqa
scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc())
raise
- scheduler.log.info("Scheduler with PID %s has stopped", os.getpid())
+ scheduler.log.info('Scheduler with PID %d has stopped', os.getpid())
+
+
+def parse_names(queues_or_names) -> List[str]:
+ """Given a list of strings or queues, returns queue names"""
+ names = []
+ for queue_or_name in queues_or_names:
+ if isinstance(queue_or_name, Queue):
+ names.append(queue_or_name.name)
+ else:
+ names.append(str(queue_or_name))
+ return names