summaryrefslogtreecommitdiff
path: root/rq/cli/cli.py
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2023-03-23 11:23:50 +0700
committerSelwin Ong <selwin.ong@gmail.com>2023-03-23 11:23:50 +0700
commita4a74ee377b97b8b7ce57b020c45b21ede56cff2 (patch)
treeef4c0e7c1c62f485f00f7b85277beecaf515c120 /rq/cli/cli.py
parent63ed6216043ba2b5c3d18703d9945b0ecb25259e (diff)
parent04722339d7598ff0c52f11c3680ed2dd922e6768 (diff)
downloadrq-watcher.tar.gz
Merge branch 'master' of github.com:rq/rq into watcherwatcher
Diffstat (limited to 'rq/cli/cli.py')
-rwxr-xr-xrq/cli/cli.py40
1 files changed, 33 insertions, 7 deletions
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index f781010..27058e8 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -5,6 +5,7 @@ RQ command line tool
from functools import update_wrapper
import os
import sys
+import warnings
import click
from redis.exceptions import ConnectionError
@@ -32,7 +33,7 @@ from rq.defaults import (
DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT,
- DEFAULT_SERIALIZER_CLASS,
+ DEFAULT_SERIALIZER_CLASS, DEFAULT_MAINTENANCE_TASK_INTERVAL,
)
from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries
@@ -104,7 +105,8 @@ def empty(cli_config, all, queues, serializer, **options):
if all:
queues = cli_config.queue_class.all(
- connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
+ connection=cli_config.connection, job_class=cli_config.job_class,
+ death_penalty_class=cli_config.death_penalty_class, serializer=serializer
)
else:
queues = [
@@ -174,7 +176,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
try:
with Connection(cli_config.connection):
-
if queues:
qs = list(map(cli_config.queue_class, queues))
else:
@@ -200,7 +201,13 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
-@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used')
+@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Worker timeout to be used')
+@click.option(
+ '--maintenance-interval',
+ type=int,
+ default=DEFAULT_MAINTENANCE_TASK_INTERVAL,
+ help='Maintenance task interval (in seconds) to be used'
+)
@click.option(
'--job-monitoring-interval',
type=int,
@@ -217,8 +224,10 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
+@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
+@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues')
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(
@@ -228,6 +237,7 @@ def worker(
name,
results_ttl,
worker_ttl,
+ maintenance_interval,
job_monitoring_interval,
disable_job_desc_logging,
verbose,
@@ -239,12 +249,14 @@ def worker(
pid,
disable_default_exception_handler,
max_jobs,
+ max_idle_time,
with_scheduler,
queues,
log_format,
date_format,
serializer,
- **options
+ dequeue_strategy,
+ **options,
):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
@@ -259,6 +271,17 @@ def worker(
with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid()))
+ worker_name = cli_config.worker_class.__qualname__
+ if worker_name in ["RoundRobinWorker", "RandomWorker"]:
+ strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin"
+ msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy."
+ warnings.warn(msg, DeprecationWarning)
+ click.secho(msg, fg='yellow')
+
+ if dequeue_strategy not in ("default", "random", "round_robin"):
+ click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red')
+ sys.exit(1)
+
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
try:
@@ -283,13 +306,14 @@ def worker(
connection=cli_config.connection,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
+ maintenance_interval=maintenance_interval,
job_monitoring_interval=job_monitoring_interval,
job_class=cli_config.job_class,
queue_class=cli_config.queue_class,
exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler,
log_job_description=not disable_job_desc_logging,
- serializer=serializer,
+ serializer=serializer
)
# Should we configure Sentry?
@@ -309,7 +333,9 @@ def worker(
date_format=date_format,
log_format=log_format,
max_jobs=max_jobs,
+ max_idle_time=max_idle_time,
with_scheduler=with_scheduler,
+ dequeue_strategy=dequeue_strategy
)
except ConnectionError as e:
print(e)
@@ -391,7 +417,7 @@ def enqueue(
serializer,
function,
arguments,
- **options
+ **options,
):
"""Enqueues a job from the command line"""
args, kwargs = parse_function_args(arguments)