diff options
Diffstat (limited to 'rq/cli/cli.py')
| -rwxr-xr-x | rq/cli/cli.py | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 1bf9151..a2851aa 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -5,6 +5,7 @@ RQ command line tool from functools import update_wrapper import os import sys +import warnings import click from redis.exceptions import ConnectionError @@ -174,7 +175,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, try: with Connection(cli_config.connection): - if queues: qs = list(map(cli_config.queue_class, queues)) else: @@ -226,6 +226,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') +@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues') @click.argument('queues', nargs=-1) @pass_cli_config def worker( @@ -253,7 +254,8 @@ def worker( log_format, date_format, serializer, - **options + dequeue_strategy, + **options, ): """Starts an RQ worker.""" settings = read_config_file(cli_config.config) if cli_config.config else {} @@ -268,6 +270,17 @@ def worker( with open(os.path.expanduser(pid), "w") as fp: fp.write(str(os.getpid())) + worker_name = cli_config.worker_class.__qualname__ + if worker_name in ["RoundRobinWorker", "RandomWorker"]: + strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin" + msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy." + warnings.warn(msg, DeprecationWarning) + click.secho(msg, fg='yellow') + + if dequeue_strategy not in ("default", "random", "round_robin"): + click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red') + sys.exit(1) + setup_loghandlers_from_args(verbose, quiet, date_format, log_format) try: @@ -299,7 +312,7 @@ def worker( exception_handlers=exception_handlers or None, disable_default_exception_handler=disable_default_exception_handler, log_job_description=not disable_job_desc_logging, - serializer=serializer, + serializer=serializer ) # Should we configure Sentry? @@ -321,6 +334,7 @@ def worker( max_jobs=max_jobs, max_idle_time=max_idle_time, with_scheduler=with_scheduler, + dequeue_strategy=dequeue_strategy ) except ConnectionError as e: print(e) @@ -402,7 +416,7 @@ def enqueue( serializer, function, arguments, - **options + **options, ): """Enqueues a job from the command line""" args, kwargs = parse_function_args(arguments) |
