summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlowercase00 <21188280+lowercase00@users.noreply.github.com>2023-03-04 20:23:00 -0300
committerGitHub <noreply@github.com>2023-03-05 06:23:00 +0700
commit654649743c8582eb1017e45d12aa315965a901a9 (patch)
treef68b5b4ddc74a53748367326812cc7fb38305078
parent0ba3971d55824e79af66d9e033811cee49de4ef5 (diff)
downloadrq-654649743c8582eb1017e45d12aa315965a901a9.tar.gz
New dequeue strategy (#1806)
* New dequeue strategy This implements a new parameter `dequeue_strategy` that should replace the `RoundRobinWorker` and `RandomWorker`. Changes include: feature, docs, tests, deprecation warning. * Fix dequeue strategy name * Black & Fix warning * feat: tests, warnings, refactor naming * feat: improve worker check * fix: revert to str subclass * fix: dequeue strategy into bootstrap * org: move DequeueStrategy to worker * refactor: round robin naming * fix: naming * fix: type annotation * fix: typo * refactor: remove kwarg from worker's init * fix: typo * move `dequeue_strategy` from `bootstrap()` into `work()`
-rw-r--r--docs/docs/workers.md15
-rwxr-xr-xrq/cli/cli.py22
-rw-r--r--rq/cli/helpers.py2
-rw-r--r--rq/job.py6
-rw-r--r--rq/queue.py1
-rw-r--r--rq/results.py2
-rw-r--r--rq/utils.py1
-rw-r--r--rq/worker.py51
-rw-r--r--rq/worker_registration.py1
-rw-r--r--tests/test_cli.py15
-rw-r--r--tests/test_worker.py53
11 files changed, 140 insertions, 29 deletions
diff --git a/docs/docs/workers.md b/docs/docs/workers.md
index d19451f..52b4ff3 100644
--- a/docs/docs/workers.md
+++ b/docs/docs/workers.md
@@ -69,6 +69,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
* `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'`
* `--disable-job-desc-logging`: Turn off job description logging.
* `--max-jobs`: Maximum number of jobs to execute.
+* `--dequeue-strategy`: The strategy to dequeue jobs from multiple queues (one of `default`, `random` or `round_robin`, defaults to `default`)
_New in version 1.8.0._
* `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer")
@@ -317,19 +318,21 @@ $ rq worker -w 'path.to.GeventWorker'
The default worker considers the order of queues as their priority order,
and if a task is pending in a higher priority queue
-it will be selected before any other in queues with lower priority.
+it will be selected before any other in queues with lower priority (the `default` behavior).
+To choose the strategy that should be used, `rq` provides the `--dequeue-strategy / -ds` option.
In certain circumstances it can be useful that, when a worker is listening to multiple queues,
say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st
dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th
-from `q1`, the 5th from `q2` and so on. The custom worker class `rq.worker.RoundRobinWorker`
-implements this strategy.
+from `q1`, the 5th from `q2` and so on. To implement this strategy use the `-ds round_robin` argument.
-In some other circumstances, when a worker is listening to multiple queues, it can be useful
-to pull jobs from the different queues randomly. The custom class `rq.worker.RandomWorker`
-implements this strategy. In fact, whenever a job is pulled from any queue, the list of queues is
+In other circumstances, it can be useful to pull jobs from the different queues randomly.
+To implement this strategy use the `-ds random` argument.
+In fact, whenever a job is pulled from any queue with the `random` strategy, the list of queues is
shuffled, so that no queue has more priority than the other ones.
+Deprecation Warning: Those strategies were formerly implemented by using the custom classes `rq.worker.RoundRobinWorker`
+and `rq.worker.RandomWorker`. As the `--dequeue-strategy` argument allows for this option to be used with any worker, those worker classes are deprecated and will be removed from future versions.
## Custom Job and Queue Classes
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 1bf9151..a2851aa 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -5,6 +5,7 @@ RQ command line tool
from functools import update_wrapper
import os
import sys
+import warnings
import click
from redis.exceptions import ConnectionError
@@ -174,7 +175,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
try:
with Connection(cli_config.connection):
-
if queues:
qs = list(map(cli_config.queue_class, queues))
else:
@@ -226,6 +226,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
+@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues')
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(
@@ -253,7 +254,8 @@ def worker(
log_format,
date_format,
serializer,
- **options
+ dequeue_strategy,
+ **options,
):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
@@ -268,6 +270,17 @@ def worker(
with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid()))
+ worker_name = cli_config.worker_class.__qualname__
+ if worker_name in ["RoundRobinWorker", "RandomWorker"]:
+ strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin"
+ msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy."
+ warnings.warn(msg, DeprecationWarning)
+ click.secho(msg, fg='yellow')
+
+ if dequeue_strategy not in ("default", "random", "round_robin"):
+ click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red')
+ sys.exit(1)
+
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
try:
@@ -299,7 +312,7 @@ def worker(
exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler,
log_job_description=not disable_job_desc_logging,
- serializer=serializer,
+ serializer=serializer
)
# Should we configure Sentry?
@@ -321,6 +334,7 @@ def worker(
max_jobs=max_jobs,
max_idle_time=max_idle_time,
with_scheduler=with_scheduler,
+ dequeue_strategy=dequeue_strategy
)
except ConnectionError as e:
print(e)
@@ -402,7 +416,7 @@ def enqueue(
serializer,
function,
arguments,
- **options
+ **options,
):
"""Enqueues a job from the command line"""
args, kwargs = parse_function_args(arguments)
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index fb20109..53bc019 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -100,7 +100,6 @@ def state_symbol(state):
def show_queues(queues, raw, by_queue, queue_class, worker_class):
-
num_jobs = 0
termwidth = get_terminal_size().columns
chartwidth = min(20, termwidth - 20)
@@ -141,7 +140,6 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
workers.add(worker)
if not by_queue:
-
for worker in workers:
queue_names = ', '.join(worker.queue_names())
name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid)
diff --git a/rq/job.py b/rq/job.py
index 0dcbf28..6e0333d 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -1322,9 +1322,8 @@ class Job:
# for backward compatibility
if self.supports_redis_streams:
from .results import Result
- Result.create(
- self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline
- )
+
+ Result.create(self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline)
if result_ttl != 0:
finished_job_registry = self.finished_job_registry
@@ -1344,6 +1343,7 @@ class Job:
)
if self.supports_redis_streams:
from .results import Result
+
Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline)
def get_retry_interval(self) -> int:
diff --git a/rq/queue.py b/rq/queue.py
index 45d6a40..8564786 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -1067,7 +1067,6 @@ class Queue:
dependents_key = job.dependents_key
while True:
-
try:
# if a pipeline is passed, the caller is responsible for calling WATCH
# to ensure all jobs are enqueued
diff --git a/rq/results.py b/rq/results.py
index 8ff770d..317b290 100644
--- a/rq/results.py
+++ b/rq/results.py
@@ -85,7 +85,7 @@ class Result(object):
# response = job.connection.zrange(cls.get_key(job.id), 0, 10, desc=True, withscores=True)
response = job.connection.xrevrange(cls.get_key(job.id), '+', '-')
results = []
- for (result_id, payload) in response:
+ for result_id, payload in response:
results.append(
cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
)
diff --git a/rq/utils.py b/rq/utils.py
index a304fd3..9cd1255 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -96,7 +96,6 @@ def make_colorizer(color: str):
class ColorizingStreamHandler(logging.StreamHandler):
-
levels = {
logging.WARNING: make_colorizer('darkyellow'),
logging.ERROR: make_colorizer('darkred'),
diff --git a/rq/worker.py b/rq/worker.py
index cb63c65..2ccfb78 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -11,12 +11,12 @@ import sys
import time
import traceback
import warnings
-
from datetime import timedelta
from enum import Enum
-from uuid import uuid4
from random import shuffle
-from typing import Any, Callable, List, Optional, TYPE_CHECKING, Tuple, Type, Union
+from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type,
+ Union)
+from uuid import uuid4
if TYPE_CHECKING:
from redis import Redis
@@ -26,7 +26,9 @@ try:
from signal import SIGKILL
except ImportError:
from signal import SIGTERM as SIGKILL
+
from contextlib import suppress
+
import redis.exceptions
from . import worker_registration
@@ -43,17 +45,20 @@ from .defaults import (
DEFAULT_LOGGING_DATE_FORMAT,
)
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
+
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue
from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler
+from .serializers import resolve_serializer
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text
from .version import VERSION
from .serializers import resolve_serializer
+
try:
from setproctitle import setproctitle as setprocname
except ImportError:
@@ -91,6 +96,12 @@ def signal_name(signum):
return 'SIG_UNKNOWN'
+class DequeueStrategy(str, Enum):
+ DEFAULT = "default"
+ ROUND_ROBIN = "round_robin"
+ RANDOM = "random"
+
+
class WorkerStatus(str, Enum):
STARTED = 'started'
SUSPENDED = 'suspended'
@@ -283,6 +294,7 @@ class Worker:
self.scheduler: Optional[RQScheduler] = None
self.pubsub = None
self.pubsub_thread = None
+ self._dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
self.disable_default_exception_handler = disable_default_exception_handler
@@ -671,20 +683,36 @@ class Worker:
self.pubsub.unsubscribe()
self.pubsub.close()
- def reorder_queues(self, reference_queue):
- """Method placeholder to workers that implement some reordering strategy.
- `pass` here means that the queue will remain with the same job order.
+ def reorder_queues(self, reference_queue: 'Queue'):
+ """Reorder the queues according to the strategy.
+ As this can be defined both in the `Worker` initialization or in the `work` method,
+ it doesn't take the strategy directly, but rather uses the private `_dequeue_strategy` attribute.
Args:
- reference_queue (Union[Queue, str]): The queue
+ reference_queue (Union[Queue, str]): The queues to reorder
"""
- pass
+ if self._dequeue_strategy is None:
+ self._dequeue_strategy = DequeueStrategy.DEFAULT
+
+ if self._dequeue_strategy not in ("default", "random", "round_robin"):
+ raise ValueError(
+ f"Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`."
+ )
+ if self._dequeue_strategy == DequeueStrategy.DEFAULT:
+ return
+ if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
+ pos = self._ordered_queues.index(reference_queue)
+ self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
+ return
+ if self._dequeue_strategy == DequeueStrategy.RANDOM:
+ shuffle(self._ordered_queues)
+ return
def bootstrap(
self,
logging_level: str = "INFO",
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
- log_format: str = DEFAULT_LOGGING_FORMAT,
+ log_format: str = DEFAULT_LOGGING_FORMAT
):
"""Bootstraps the worker.
Runs the basic tasks that should run when the worker actually starts working.
@@ -749,6 +777,7 @@ class Worker:
max_jobs: Optional[int] = None,
max_idle_time: Optional[int] = None,
with_scheduler: bool = False,
+ dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
) -> bool:
"""Starts the work loop.
@@ -767,11 +796,13 @@ class Worker:
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
+ dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. Defaults to DequeueStrategy.DEFAULT
Returns:
worked (bool): Will return True if any job was processed, False otherwise.
"""
self.bootstrap(logging_level, date_format, log_format)
+ self._dequeue_strategy = dequeue_strategy
completed_jobs = 0
if with_scheduler:
self._start_scheduler(burst, logging_level, date_format, log_format)
@@ -1588,7 +1619,7 @@ class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
- self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
+ self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
class RandomWorker(Worker):
diff --git a/rq/worker_registration.py b/rq/worker_registration.py
index 0f31f1d..fe4dc04 100644
--- a/rq/worker_registration.py
+++ b/rq/worker_registration.py
@@ -86,7 +86,6 @@ def clean_worker_registry(queue: 'Queue'):
keys = list(get_keys(queue))
with queue.connection.pipeline() as pipeline:
-
for key in keys:
pipeline.exists(key)
results = pipeline.execute()
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 07b9c39..0cdca78 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -326,6 +326,21 @@ class TestRQCli(RQTestCase):
result = runner.invoke(main, args + ['--quiet', '--verbose'])
self.assertNotEqual(result.exit_code, 0)
+ def test_worker_dequeue_strategy(self):
+ """--quiet and --verbose logging options are supported"""
+ runner = CliRunner()
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'random']
+ result = runner.invoke(main, args)
+ self.assert_normal_execution(result)
+
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'round_robin']
+ result = runner.invoke(main, args)
+ self.assert_normal_execution(result)
+
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'wrong']
+ result = runner.invoke(main, args)
+ self.assertEqual(result.exit_code, 1)
+
def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>"""
connection = Redis.from_url(self.redis_url)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 239252c..dfa0f1d 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -1136,6 +1136,59 @@ class TestWorker(RQTestCase):
worker = Worker.find_by_key(w2.key)
self.assertEqual(worker.python_version, python_version)
+ def test_dequeue_random_strategy(self):
+ qs = [Queue('q%d' % i) for i in range(5)]
+
+ for i in range(5):
+ for j in range(3):
+ qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
+
+ w = Worker(qs)
+ w.work(burst=True, dequeue_strategy="random")
+
+ start_times = []
+ for i in range(5):
+ for j in range(3):
+ job = Job.fetch('q%d_%d' % (i, j))
+ start_times.append(('q%d_%d' % (i, j), job.started_at))
+ sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
+ sorted_ids = [tup[0] for tup in sorted_by_time]
+ expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)]
+ expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)]
+
+ self.assertNotEqual(sorted_ids, expected_rr)
+ self.assertNotEqual(sorted_ids, expected_ser)
+ expected_rr.reverse()
+ expected_ser.reverse()
+ self.assertNotEqual(sorted_ids, expected_rr)
+ self.assertNotEqual(sorted_ids, expected_ser)
+ sorted_ids.sort()
+ expected_ser.sort()
+ self.assertEqual(sorted_ids, expected_ser)
+
+ def test_dequeue_round_robin(self):
+ qs = [Queue('q%d' % i) for i in range(5)]
+
+ for i in range(5):
+ for j in range(3):
+ qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
+
+ w = Worker(qs)
+ w.work(burst=True, dequeue_strategy="round_robin")
+
+ start_times = []
+ for i in range(5):
+ for j in range(3):
+ job = Job.fetch('q%d_%d' % (i, j))
+ start_times.append(('q%d_%d' % (i, j), job.started_at))
+ sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
+ sorted_ids = [tup[0] for tup in sorted_by_time]
+ expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0',
+ 'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1',
+ 'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2']
+
+ self.assertEqual(expected, sorted_ids)
+
def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait)