diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2023-03-23 11:23:50 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2023-03-23 11:23:50 +0700 |
commit | a4a74ee377b97b8b7ce57b020c45b21ede56cff2 (patch) | |
tree | ef4c0e7c1c62f485f00f7b85277beecaf515c120 /rq/cli/cli.py | |
parent | 63ed6216043ba2b5c3d18703d9945b0ecb25259e (diff) | |
parent | 04722339d7598ff0c52f11c3680ed2dd922e6768 (diff) | |
download | rq-watcher.tar.gz |
Merge branch 'master' of github.com:rq/rq into watcherwatcher
Diffstat (limited to 'rq/cli/cli.py')
-rwxr-xr-x | rq/cli/cli.py | 40 |
1 files changed, 33 insertions, 7 deletions
diff --git a/rq/cli/cli.py b/rq/cli/cli.py index f781010..27058e8 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -5,6 +5,7 @@ RQ command line tool from functools import update_wrapper import os import sys +import warnings import click from redis.exceptions import ConnectionError @@ -32,7 +33,7 @@ from rq.defaults import ( DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT, - DEFAULT_SERIALIZER_CLASS, + DEFAULT_SERIALIZER_CLASS, DEFAULT_MAINTENANCE_TASK_INTERVAL, ) from rq.exceptions import InvalidJobOperationError from rq.registry import FailedJobRegistry, clean_registries @@ -104,7 +105,8 @@ def empty(cli_config, all, queues, serializer, **options): if all: queues = cli_config.queue_class.all( - connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer + connection=cli_config.connection, job_class=cli_config.job_class, + death_penalty_class=cli_config.death_penalty_class, serializer=serializer ) else: queues = [ @@ -174,7 +176,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, try: with Connection(cli_config.connection): - if queues: qs = list(map(cli_config.queue_class, queues)) else: @@ -200,7 +201,13 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs') @click.option('--name', '-n', help='Specify a different name') @click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used') -@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used') +@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Worker timeout to be used') +@click.option( + '--maintenance-interval', + type=int, + default=DEFAULT_MAINTENANCE_TASK_INTERVAL, + help='Maintenance task interval (in seconds) to be used' +) @click.option( '--job-monitoring-interval', type=int, @@ -217,8 +224,10 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') +@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') +@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues') @click.argument('queues', nargs=-1) @pass_cli_config def worker( @@ -228,6 +237,7 @@ def worker( name, results_ttl, worker_ttl, + maintenance_interval, job_monitoring_interval, disable_job_desc_logging, verbose, @@ -239,12 +249,14 @@ def worker( pid, disable_default_exception_handler, max_jobs, + max_idle_time, with_scheduler, queues, log_format, date_format, serializer, - **options + dequeue_strategy, + **options, ): """Starts an RQ worker.""" settings = read_config_file(cli_config.config) if cli_config.config else {} @@ -259,6 +271,17 @@ def worker( with open(os.path.expanduser(pid), "w") as fp: fp.write(str(os.getpid())) + worker_name = cli_config.worker_class.__qualname__ + if worker_name in ["RoundRobinWorker", "RandomWorker"]: + strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin" + msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy." + warnings.warn(msg, DeprecationWarning) + click.secho(msg, fg='yellow') + + if dequeue_strategy not in ("default", "random", "round_robin"): + click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red') + sys.exit(1) + setup_loghandlers_from_args(verbose, quiet, date_format, log_format) try: @@ -283,13 +306,14 @@ def worker( connection=cli_config.connection, default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, + maintenance_interval=maintenance_interval, job_monitoring_interval=job_monitoring_interval, job_class=cli_config.job_class, queue_class=cli_config.queue_class, exception_handlers=exception_handlers or None, disable_default_exception_handler=disable_default_exception_handler, log_job_description=not disable_job_desc_logging, - serializer=serializer, + serializer=serializer ) # Should we configure Sentry? @@ -309,7 +333,9 @@ def worker( date_format=date_format, log_format=log_format, max_jobs=max_jobs, + max_idle_time=max_idle_time, with_scheduler=with_scheduler, + dequeue_strategy=dequeue_strategy ) except ConnectionError as e: print(e) @@ -391,7 +417,7 @@ def enqueue( serializer, function, arguments, - **options + **options, ): """Enqueues a job from the command line""" args, kwargs = parse_function_args(arguments) |