summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2023-04-26 09:16:37 +0700
committerGitHub <noreply@github.com>2023-04-26 09:16:37 +0700
commit36f5c88ca21ff30fb799e7d10b2f7b9bef4e585a (patch)
tree88531c9bd6acaf1cb6e5859b69d224a3f7b0c0a4
parent4073aa364283b52d8a255ae74179a452daa3a903 (diff)
downloadrq-36f5c88ca21ff30fb799e7d10b2f7b9bef4e585a.tar.gz
Added BaseWorker class (#1887)
* Added BaseWorker class * Moved logging related functions to logutils * Remove uneeded colorize function * Updated worker.get_current_job_id() to not fail when job ID is None * job.restore() shouldn't crash if is not present * Fix various as_text() related crashes
-rwxr-xr-xrq/cli/cli.py7
-rw-r--r--rq/job.py24
-rw-r--r--rq/logutils.py101
-rw-r--r--rq/queue.py190
-rw-r--r--rq/utils.py107
-rw-r--r--rq/worker.py148
6 files changed, 301 insertions, 276 deletions
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index fbfe2bf..55421f6 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -31,13 +31,12 @@ from rq.defaults import (
DEFAULT_MAINTENANCE_TASK_INTERVAL,
)
from rq.exceptions import InvalidJobOperationError
+from rq.job import JobStatus
+from rq.logutils import blue
from rq.registry import FailedJobRegistry, clean_registries
-from rq.utils import import_attribute, get_call_string, make_colorizer
from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
+from rq.utils import import_attribute, get_call_string
from rq.worker_registration import clean_worker_registry
-from rq.job import JobStatus
-
-blue = make_colorizer('darkblue')
@click.group()
diff --git a/rq/job.py b/rq/job.py
index 07ec6cb..a0574e6 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -158,7 +158,7 @@ class Job:
serializer=None,
*,
on_success: Optional[Union['Callback', Callable[..., Any]]] = None,
- on_failure: Optional[Union['Callback', Callable[..., Any]]] = None
+ on_failure: Optional[Union['Callback', Callable[..., Any]]] = None,
) -> 'Job':
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
@@ -239,16 +239,18 @@ class Job:
if on_success:
if not isinstance(on_success, Callback):
- warnings.warn('Passing a `Callable` `on_success` is deprecated, pass `Callback` instead',
- DeprecationWarning)
+ warnings.warn(
+ 'Passing a `Callable` `on_success` is deprecated, pass `Callback` instead', DeprecationWarning
+ )
on_success = Callback(on_success) # backward compatibility
job._success_callback_name = on_success.name
job._success_callback_timeout = on_success.timeout
if on_failure:
if not isinstance(on_failure, Callback):
- warnings.warn('Passing a `Callable` `on_failure` is deprecated, pass `Callback` instead',
- DeprecationWarning)
+ warnings.warn(
+ 'Passing a `Callable` `on_failure` is deprecated, pass `Callback` instead', DeprecationWarning
+ )
on_failure = Callback(on_failure) # backward compatibility
job._failure_callback_name = on_failure.name
job._failure_callback_timeout = on_failure.timeout
@@ -302,7 +304,8 @@ class Job:
status (JobStatus): The Job Status
"""
if refresh:
- self._status = as_text(self.connection.hget(self.key, 'status'))
+ status = self.connection.hget(self.key, 'status')
+ self._status = as_text(status) if status else None
return self._status
def set_status(self, status: JobStatus, pipeline: Optional['Pipeline'] = None) -> None:
@@ -872,9 +875,9 @@ class Job:
self.data = raw_data
self.created_at = str_to_date(obj.get('created_at'))
- self.origin = as_text(obj.get('origin'))
+ self.origin = as_text(obj.get('origin')) if obj.get('origin') else None
self.worker_name = obj.get('worker_name').decode() if obj.get('worker_name') else None
- self.description = as_text(obj.get('description'))
+ self.description = as_text(obj.get('description')) if obj.get('description') else None
self.enqueued_at = str_to_date(obj.get('enqueued_at'))
self.started_at = str_to_date(obj.get('started_at'))
self.ended_at = str_to_date(obj.get('ended_at'))
@@ -1360,8 +1363,7 @@ class Job:
self.success_callback(self, self.connection, result)
def execute_failure_callback(self, death_penalty_class: Type[BaseDeathPenalty], *exc_info):
- """Executes failure_callback with possible timeout
- """
+ """Executes failure_callback with possible timeout"""
if not self.failure_callback:
return
@@ -1369,7 +1371,7 @@ class Job:
try:
with death_penalty_class(self.failure_callback_timeout, JobTimeoutException, job_id=self.id):
self.failure_callback(self, self.connection, *exc_info)
- except Exception: # noqa
+ except Exception: # noqa
logger.exception(f'Job {self.id}: error while executing failure callback')
raise
diff --git a/rq/logutils.py b/rq/logutils.py
index 33e0949..b36ece8 100644
--- a/rq/logutils.py
+++ b/rq/logutils.py
@@ -2,10 +2,109 @@ import logging
import sys
from typing import Union
-from rq.utils import ColorizingStreamHandler
from rq.defaults import DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT
+class _Colorizer:
+ def __init__(self):
+ esc = "\x1b["
+
+ self.codes = {}
+ self.codes[""] = ""
+ self.codes["reset"] = esc + "39;49;00m"
+
+ self.codes["bold"] = esc + "01m"
+ self.codes["faint"] = esc + "02m"
+ self.codes["standout"] = esc + "03m"
+ self.codes["underline"] = esc + "04m"
+ self.codes["blink"] = esc + "05m"
+ self.codes["overline"] = esc + "06m"
+
+ dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"]
+ light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"]
+
+ x = 30
+ for d, l in zip(dark_colors, light_colors):
+ self.codes[d] = esc + "%im" % x
+ self.codes[l] = esc + "%i;01m" % x
+ x += 1
+
+ del d, l, x
+
+ self.codes["darkteal"] = self.codes["turquoise"]
+ self.codes["darkyellow"] = self.codes["brown"]
+ self.codes["fuscia"] = self.codes["fuchsia"]
+ self.codes["white"] = self.codes["bold"]
+
+ if hasattr(sys.stdout, "isatty"):
+ self.notty = not sys.stdout.isatty()
+ else:
+ self.notty = True
+
+ def colorize(self, color_key, text):
+ if self.notty:
+ return text
+ else:
+ return self.codes[color_key] + text + self.codes["reset"]
+
+
+colorizer = _Colorizer()
+
+
+def make_colorizer(color: str):
+ """Creates a function that colorizes text with the given color.
+
+ For example::
+
+ ..codeblock::python
+
+ >>> green = make_colorizer('darkgreen')
+ >>> red = make_colorizer('red')
+ >>>
+ >>> # You can then use:
+ >>> print("It's either " + green('OK') + ' or ' + red('Oops'))
+ """
+
+ def inner(text):
+ return colorizer.colorize(color, text)
+
+ return inner
+
+
+green = make_colorizer('darkgreen')
+yellow = make_colorizer('darkyellow')
+blue = make_colorizer('darkblue')
+red = make_colorizer('darkred')
+
+
+class ColorizingStreamHandler(logging.StreamHandler):
+ levels = {
+ logging.WARNING: yellow,
+ logging.ERROR: red,
+ logging.CRITICAL: red,
+ }
+
+ def __init__(self, exclude=None, *args, **kwargs):
+ self.exclude = exclude
+ super().__init__(*args, **kwargs)
+
+ @property
+ def is_tty(self):
+ isatty = getattr(self.stream, 'isatty', None)
+ return isatty and isatty()
+
+ def format(self, record):
+ message = logging.StreamHandler.format(self, record)
+ if self.is_tty:
+ # Don't colorize any traceback
+ parts = message.split('\n', 1)
+ parts[0] = " ".join([parts[0].split(" ", 1)[0], parts[0].split(" ", 1)[1]])
+
+ message = '\n'.join(parts)
+
+ return message
+
+
def setup_loghandlers(
level: Union[int, str, None] = None,
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
diff --git a/rq/queue.py b/rq/queue.py
index 7a2737f..ebfdd47 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -22,13 +22,11 @@ from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
from .job import Job, JobStatus
+from .logutils import blue, green, yellow
from .types import FunctionReferenceType, JobDependencyType
from .serializers import resolve_serializer
-from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow, compact
+from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow, compact
-green = make_colorizer('darkgreen')
-yellow = make_colorizer('darkyellow')
-blue = make_colorizer('darkblue')
logger = logging.getLogger("rq.queue")
@@ -71,8 +69,11 @@ class Queue:
@classmethod
def all(
- cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None,
- serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None
+ cls,
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ serializer=None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> List['Queue']:
"""Returns an iterable of all Queues.
@@ -89,8 +90,11 @@ class Queue:
def to_queue(queue_key):
return cls.from_queue_key(
- as_text(queue_key), connection=connection, job_class=job_class,
- serializer=serializer, death_penalty_class=death_penalty_class
+ as_text(queue_key),
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
)
all_registerd_queues = connection.smembers(cls.redis_queues_keys)
@@ -99,12 +103,12 @@ class Queue:
@classmethod
def from_queue_key(
- cls,
- queue_key: str,
- connection: Optional['Redis'] = None,
- job_class: Optional[Type['Job']] = None,
- serializer: Any = None,
- death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
+ cls,
+ queue_key: str,
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
@@ -126,20 +130,25 @@ class Queue:
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
- name = queue_key[len(prefix):]
- return cls(name, connection=connection, job_class=job_class, serializer=serializer,
- death_penalty_class=death_penalty_class)
+ name = queue_key[len(prefix) :]
+ return cls(
+ name,
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
+ )
def __init__(
- self,
- name: str = 'default',
- default_timeout: Optional[int] = None,
- connection: Optional['Redis'] = None,
- is_async: bool = True,
- job_class: Union[str, Type['Job'], None] = None,
- serializer: Any = None,
- death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
- **kwargs,
+ self,
+ name: str = 'default',
+ default_timeout: Optional[int] = None,
+ connection: Optional['Redis'] = None,
+ is_async: bool = True,
+ job_class: Union[str, Type['Job'], None] = None,
+ serializer: Any = None,
+ death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
+ **kwargs,
):
"""Initializes a Queue object.
@@ -207,6 +216,7 @@ class Queue:
@property
def scheduler_pid(self) -> int:
from rq.scheduler import RQScheduler
+
pid = self.connection.get(RQScheduler.get_locking_key(self.name))
return int(pid.decode()) if pid is not None else None
@@ -444,10 +454,10 @@ class Queue:
self.connection.rename(self.key, COMPACT_QUEUE)
while True:
- job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
+ job_id = self.connection.lpop(COMPACT_QUEUE)
if job_id is None:
break
- if self.job_class.exists(job_id, self.connection):
+ if self.job_class.exists(as_text(job_id), self.connection):
self.connection.rpush(self.key, job_id)
def push_job_id(self, job_id: str, pipeline: Optional['Pipeline'] = None, at_front: bool = False):
@@ -469,23 +479,23 @@ class Queue:
self.log.debug('Pushed job %s into %s', blue(job_id), green(self.name))
def create_job(
- self,
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- depends_on: Optional['JobDependencyType'] = None,
- job_id: Optional[str] = None,
- meta: Optional[Dict] = None,
- status: JobStatus = JobStatus.QUEUED,
- retry: Optional['Retry'] = None,
- *,
- on_success: Optional[Callable] = None,
- on_failure: Optional[Callable] = None,
+ self,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ depends_on: Optional['JobDependencyType'] = None,
+ job_id: Optional[str] = None,
+ meta: Optional[Dict] = None,
+ status: JobStatus = JobStatus.QUEUED,
+ retry: Optional['Retry'] = None,
+ *,
+ on_success: Optional[Callable] = None,
+ on_failure: Optional[Callable] = None,
) -> Job:
"""Creates a job based on parameters given
@@ -611,23 +621,23 @@ class Queue:
return job
def enqueue_call(
- self,
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- depends_on: Optional['JobDependencyType'] = None,
- job_id: Optional[str] = None,
- at_front: bool = False,
- meta: Optional[Dict] = None,
- retry: Optional['Retry'] = None,
- on_success: Optional[Callable[..., Any]] = None,
- on_failure: Optional[Callable[..., Any]] = None,
- pipeline: Optional['Pipeline'] = None,
+ self,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ depends_on: Optional['JobDependencyType'] = None,
+ job_id: Optional[str] = None,
+ at_front: bool = False,
+ meta: Optional[Dict] = None,
+ retry: Optional['Retry'] = None,
+ on_success: Optional[Callable[..., Any]] = None,
+ on_failure: Optional[Callable[..., Any]] = None,
+ pipeline: Optional['Pipeline'] = None,
) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
@@ -678,20 +688,20 @@ class Queue:
@staticmethod
def prepare_data(
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- job_id: Optional[str] = None,
- at_front: bool = False,
- meta: Optional[Dict] = None,
- retry: Optional['Retry'] = None,
- on_success: Optional[Callable] = None,
- on_failure: Optional[Callable] = None,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ job_id: Optional[str] = None,
+ at_front: bool = False,
+ meta: Optional[Dict] = None,
+ retry: Optional['Retry'] = None,
+ on_success: Optional[Callable] = None,
+ on_failure: Optional[Callable] = None,
) -> EnqueueData:
"""Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
And can keep this logic within EnqueueData
@@ -1003,7 +1013,6 @@ class Queue:
return self._enqueue_job(job, pipeline=pipeline, at_front=at_front)
return job
-
def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
"""Enqueues a job for delayed execution without checking dependencies.
@@ -1073,7 +1082,7 @@ class Queue:
return job
def enqueue_dependents(
- self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
+ self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
):
"""Enqueues all jobs in the given job's dependents set and clears it.
@@ -1110,7 +1119,7 @@ class Queue:
dependent_job_ids, connection=self.connection, serializer=self.serializer
)
if dependent_job
- and dependent_job.dependencies_are_met(
+ and dependent_job.dependencies_are_met(
parent_job=job,
pipeline=pipe,
exclude_job_id=exclude_job_id,
@@ -1210,13 +1219,13 @@ class Queue:
@classmethod
def dequeue_any(
- cls,
- queues: List['Queue'],
- timeout: Optional[int],
- connection: Optional['Redis'] = None,
- job_class: Optional['Job'] = None,
- serializer: Any = None,
- death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
+ cls,
+ queues: List['Queue'],
+ timeout: Optional[int],
+ connection: Optional['Redis'] = None,
+ job_class: Optional['Job'] = None,
+ serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.
@@ -1250,8 +1259,13 @@ class Queue:
if result is None:
return None
queue_key, job_id = map(as_text, result)
- queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class,
- serializer=serializer, death_penalty_class=death_penalty_class)
+ queue = cls.from_queue_key(
+ queue_key,
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
+ )
try:
job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError:
diff --git a/rq/utils.py b/rq/utils.py
index b10e262..ca51779 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -26,105 +26,6 @@ from .exceptions import TimeoutFormatError
logger = logging.getLogger(__name__)
-class _Colorizer:
- def __init__(self):
- esc = "\x1b["
-
- self.codes = {}
- self.codes[""] = ""
- self.codes["reset"] = esc + "39;49;00m"
-
- self.codes["bold"] = esc + "01m"
- self.codes["faint"] = esc + "02m"
- self.codes["standout"] = esc + "03m"
- self.codes["underline"] = esc + "04m"
- self.codes["blink"] = esc + "05m"
- self.codes["overline"] = esc + "06m"
-
- dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"]
- light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"]
-
- x = 30
- for d, l in zip(dark_colors, light_colors):
- self.codes[d] = esc + "%im" % x
- self.codes[l] = esc + "%i;01m" % x
- x += 1
-
- del d, l, x
-
- self.codes["darkteal"] = self.codes["turquoise"]
- self.codes["darkyellow"] = self.codes["brown"]
- self.codes["fuscia"] = self.codes["fuchsia"]
- self.codes["white"] = self.codes["bold"]
-
- if hasattr(sys.stdout, "isatty"):
- self.notty = not sys.stdout.isatty()
- else:
- self.notty = True
-
- def reset_color(self):
- return self.codes["reset"]
-
- def colorize(self, color_key, text):
- if self.notty:
- return text
- else:
- return self.codes[color_key] + text + self.codes["reset"]
-
-
-colorizer = _Colorizer()
-
-
-def make_colorizer(color: str):
- """Creates a function that colorizes text with the given color.
-
- For example::
-
- ..codeblock::python
-
- >>> green = make_colorizer('darkgreen')
- >>> red = make_colorizer('red')
- >>>
- >>> # You can then use:
- >>> print("It's either " + green('OK') + ' or ' + red('Oops'))
- """
-
- def inner(text):
- return colorizer.colorize(color, text)
-
- return inner
-
-
-class ColorizingStreamHandler(logging.StreamHandler):
- levels = {
- logging.WARNING: make_colorizer('darkyellow'),
- logging.ERROR: make_colorizer('darkred'),
- logging.CRITICAL: make_colorizer('darkred'),
- }
-
- def __init__(self, exclude=None, *args, **kwargs):
- self.exclude = exclude
- super().__init__(*args, **kwargs)
-
- @property
- def is_tty(self):
- isatty = getattr(self.stream, 'isatty', None)
- return isatty and isatty()
-
- def format(self, record):
- message = logging.StreamHandler.format(self, record)
- if self.is_tty:
- colorize = self.levels.get(record.levelno, lambda x: x)
-
- # Don't colorize any traceback
- parts = message.split('\n', 1)
- parts[0] = " ".join([parts[0].split(" ", 1)[0], colorize(parts[0].split(" ", 1)[1])])
-
- message = '\n'.join(parts)
-
- return message
-
-
def compact(lst: List[Any]) -> List[Any]:
"""Excludes `None` values from a list-like object.
@@ -137,7 +38,7 @@ def compact(lst: List[Any]) -> List[Any]:
return [item for item in lst if item is not None]
-def as_text(v: Union[bytes, str]) -> Optional[str]:
+def as_text(v: Union[bytes, str]) -> str:
"""Converts a bytes value to a string using `utf-8`.
Args:
@@ -149,9 +50,7 @@ def as_text(v: Union[bytes, str]) -> Optional[str]:
Returns:
value (Optional[str]): Either the decoded string or None
"""
- if v is None:
- return None
- elif isinstance(v, bytes):
+ if isinstance(v, bytes):
return v.decode('utf-8')
elif isinstance(v, str):
return v
@@ -181,7 +80,7 @@ def import_attribute(name: str) -> Callable[..., Any]:
E.g.: package_a.package_b.module_a.ClassA.my_static_method
Thus we remove the bits from the end of the name until we can import it
-
+
Args:
name (str): The name (reference) to the path.
diff --git a/rq/worker.py b/rq/worker.py
index 34c6c95..2b41aad 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -13,8 +13,7 @@ import warnings
from datetime import timedelta
from enum import Enum
from random import shuffle
-from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type,
- Union)
+from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, Union
from uuid import uuid4
if TYPE_CHECKING:
@@ -50,14 +49,23 @@ from .defaults import (
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus
-from .logutils import setup_loghandlers
+from .logutils import blue, green, setup_loghandlers, yellow
from .queue import Queue
from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler
from .serializers import resolve_serializer
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
-from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text
+from .utils import (
+ backend_class,
+ ensure_list,
+ get_version,
+ utcformat,
+ utcnow,
+ utcparse,
+ compact,
+ as_text,
+)
from .version import VERSION
from .serializers import resolve_serializer
@@ -70,10 +78,6 @@ except ImportError:
pass
-green = make_colorizer('darkgreen')
-yellow = make_colorizer('darkyellow')
-blue = make_colorizer('darkblue')
-
logger = logging.getLogger("rq.worker")
@@ -112,12 +116,13 @@ class WorkerStatus(str, Enum):
IDLE = 'idle'
-class Worker:
+class BaseWorker:
redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = worker_registration.REDIS_WORKER_KEYS
death_penalty_class = UnixSignalDeathPenalty
queue_class = Queue
job_class = Job
+
# `log_result_lifespan` controls whether "Result is kept for XXX seconds"
# messages are logged after every job, by default they are.
log_result_lifespan = True
@@ -182,6 +187,51 @@ class Worker:
"""
return len(worker_registration.get_keys(queue=queue, connection=connection))
+ def get_redis_server_version(self):
+ """Return Redis server version of connection"""
+ if not self.redis_server_version:
+ self.redis_server_version = get_version(self.connection)
+ return self.redis_server_version
+
+ def validate_queues(self):
+ """Sanity check for the given queues."""
+ for queue in self.queues:
+ if not isinstance(queue, self.queue_class):
+ raise TypeError('{0} is not of type {1} or string types'.format(queue, self.queue_class))
+
+ def queue_names(self) -> List[str]:
+ """Returns the queue names of this worker's queues.
+
+ Returns:
+ List[str]: The queue names.
+ """
+ return [queue.name for queue in self.queues]
+
+ def queue_keys(self) -> List[str]:
+ """Returns the Redis keys representing this worker's queues.
+
+ Returns:
+ List[str]: The list of strings with queues keys
+ """
+ return [queue.key for queue in self.queues]
+
+ @property
+ def key(self):
+ """Returns the worker's Redis hash key."""
+ return self.redis_worker_namespace_prefix + self.name
+
+ @property
+ def pubsub_channel_name(self):
+ """Returns the worker's Redis hash key."""
+ return PUBSUB_CHANNEL_TEMPLATE % self.name
+
+ @property
+ def supports_redis_streams(self) -> bool:
+ """Only supported by Redis server >= 5.0 is required."""
+ return self.get_redis_server_version() >= (5, 0, 0)
+
+
+class Worker(BaseWorker):
@classmethod
def find_by_key(
cls,
@@ -249,7 +299,7 @@ class Worker:
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True,
serializer=None,
- work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None
+ work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None,
): # noqa
self.default_result_ttl = default_result_ttl
self.worker_ttl = default_worker_ttl
@@ -267,8 +317,13 @@ class Worker:
self.serializer = resolve_serializer(serializer)
queues = [
- self.queue_class(name=q, connection=connection, job_class=self.job_class,
- serializer=self.serializer, death_penalty_class=self.death_penalty_class,)
+ self.queue_class(
+ name=q,
+ connection=connection,
+ job_class=self.job_class,
+ serializer=self.serializer,
+ death_penalty_class=self.death_penalty_class,
+ )
if isinstance(q, str)
else q
for q in ensure_list(queues)
@@ -344,49 +399,6 @@ class Worker:
connection.connection_pool.connection_kwargs.update(timeout_config)
return connection
- def get_redis_server_version(self):
- """Return Redis server version of connection"""
- if not self.redis_server_version:
- self.redis_server_version = get_version(self.connection)
- return self.redis_server_version
-
- def validate_queues(self):
- """Sanity check for the given queues."""
- for queue in self.queues:
- if not isinstance(queue, self.queue_class):
- raise TypeError('{0} is not of type {1} or string types'.format(queue, self.queue_class))
-
- def queue_names(self) -> List[str]:
- """Returns the queue names of this worker's queues.
-
- Returns:
- List[str]: The queue names.
- """
- return [queue.name for queue in self.queues]
-
- def queue_keys(self) -> List[str]:
- """Returns the Redis keys representing this worker's queues.
-
- Returns:
- List[str]: The list of strings with queues keys
- """
- return [queue.key for queue in self.queues]
-
- @property
- def key(self):
- """Returns the worker's Redis hash key."""
- return self.redis_worker_namespace_prefix + self.name
-
- @property
- def pubsub_channel_name(self):
- """Returns the worker's Redis hash key."""
- return PUBSUB_CHANNEL_TEMPLATE % self.name
-
- @property
- def supports_redis_streams(self) -> bool:
- """Only supported by Redis server >= 5.0 is required."""
- return self.get_redis_server_version() >= (5, 0, 0)
-
@property
def horse_pid(self):
"""The horse's process ID. Only available in the worker. Will return
@@ -538,7 +550,10 @@ class Worker:
job_id (Optional[str): The job id
"""
connection = pipeline if pipeline is not None else self.connection
- return as_text(connection.hget(self.key, 'current_job'))
+ result = connection.hget(self.key, 'current_job')
+ if result is None:
+ return None
+ return as_text(result)
def get_current_job(self) -> Optional['Job']:
"""Returns the currently executing job instance.
@@ -706,7 +721,7 @@ class Worker:
return
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
pos = self._ordered_queues.index(reference_queue)
- self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
+ self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
return
if self._dequeue_strategy == DequeueStrategy.RANDOM:
shuffle(self._ordered_queues)
@@ -716,7 +731,7 @@ class Worker:
self,
logging_level: str = "INFO",
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
- log_format: str = DEFAULT_LOGGING_FORMAT
+ log_format: str = DEFAULT_LOGGING_FORMAT,
):
"""Bootstraps the worker.
Runs the basic tasks that should run when the worker actually starts working.
@@ -781,7 +796,7 @@ class Worker:
max_jobs: Optional[int] = None,
max_idle_time: Optional[int] = None,
with_scheduler: bool = False,
- dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
+ dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
) -> bool:
"""Starts the work loop.
@@ -881,7 +896,9 @@ class Worker:
pass
self.scheduler._process.join()
- def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']:
+ def dequeue_job_and_maintain_ttl(
+ self, timeout: Optional[int], max_idle_time: Optional[int] = None
+ ) -> Tuple['Job', 'Queue']:
"""Dequeues a job while maintaining the TTL.
Returns:
@@ -1167,10 +1184,7 @@ class Worker:
self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string)
self.handle_work_horse_killed(job, retpid, ret_val, rusage)
- self.handle_job_failure(
- job, queue=queue,
- exc_string=exc_string
- )
+ self.handle_job_failure(job, queue=queue, exc_string=exc_string)
def execute_job(self, job: 'Job', queue: 'Queue'):
"""Spawns a work horse to perform the actual work and passes it a job.
@@ -1458,9 +1472,7 @@ class Worker:
extra.update({'queue': job.origin, 'job_id': job.id})
# func_name
- self.log.error(
- '[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra
- )
+ self.log.error('[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra)
for handler in self._exc_handlers:
self.log.debug('Invoking exception handler %s', handler)
@@ -1598,7 +1610,7 @@ class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
- self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
+ self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
class RandomWorker(Worker):