summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2023-04-08 18:55:28 +0700
committerGitHub <noreply@github.com>2023-04-08 18:55:28 +0700
commitb639ee640602586e850abec8450370b1e9684427 (patch)
tree5634ccf519e0a0fa5938434e24991e263429bf42
parented7a1714603e7d203591d2eb3ecd1a1eb5288bab (diff)
downloadrq-b639ee640602586e850abec8450370b1e9684427.tar.gz
Cleaned up cli.py (#1876)
-rwxr-xr-xrq/cli/cli.py75
-rw-r--r--rq/cli/helpers.py103
2 files changed, 98 insertions, 80 deletions
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 27058e8..fbfe2bf 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -2,7 +2,6 @@
RQ command line tool
"""
-from functools import update_wrapper
import os
import sys
import warnings
@@ -18,22 +17,18 @@ from rq.cli.helpers import (
show_both,
show_queues,
show_workers,
- CliConfig,
parse_function_args,
parse_schedule,
+ pass_cli_config,
)
from rq.contrib.legacy import cleanup_ghosts
from rq.defaults import (
- DEFAULT_CONNECTION_CLASS,
- DEFAULT_JOB_CLASS,
- DEFAULT_QUEUE_CLASS,
- DEFAULT_WORKER_CLASS,
DEFAULT_RESULT_TTL,
DEFAULT_WORKER_TTL,
DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT,
- DEFAULT_SERIALIZER_CLASS, DEFAULT_MAINTENANCE_TASK_INTERVAL,
+ DEFAULT_MAINTENANCE_TASK_INTERVAL,
)
from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries
@@ -45,50 +40,6 @@ from rq.job import JobStatus
blue = make_colorizer('darkblue')
-# Disable the warning that Click displays (as of Click version 5.0) when users
-# use unicode_literals in Python 2.
-# See http://click.pocoo.org/dev/python3/#unicode-literals for more details.
-click.disable_unicode_literals_warning = True
-
-
-shared_options = [
- click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'),
- click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'),
- click.option(
- '--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use'
- ),
- click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'),
- click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'),
- click.option(
- '--connection-class',
- envvar='RQ_CONNECTION_CLASS',
- default=DEFAULT_CONNECTION_CLASS,
- help='Redis client class to use',
- ),
- click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True),
- click.option(
- '--serializer',
- '-S',
- default=DEFAULT_SERIALIZER_CLASS,
- help='Path to serializer, defaults to rq.serializers.DefaultSerializer',
- ),
-]
-
-
-def pass_cli_config(func):
- # add all the shared options to the command
- for option in shared_options:
- func = option(func)
-
- # pass the cli config object into the command
- def wrapper(*args, **kwargs):
- ctx = click.get_current_context()
- cli_config = CliConfig(**kwargs)
- return ctx.invoke(func, cli_config, *args[1:], **kwargs)
-
- return update_wrapper(wrapper, func)
-
-
@click.group()
@click.version_option(version)
def main():
@@ -105,8 +56,10 @@ def empty(cli_config, all, queues, serializer, **options):
if all:
queues = cli_config.queue_class.all(
- connection=cli_config.connection, job_class=cli_config.job_class,
- death_penalty_class=cli_config.death_penalty_class, serializer=serializer
+ connection=cli_config.connection,
+ job_class=cli_config.job_class,
+ death_penalty_class=cli_config.death_penalty_class,
+ serializer=serializer,
)
else:
queues = [
@@ -206,7 +159,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
'--maintenance-interval',
type=int,
default=DEFAULT_MAINTENANCE_TASK_INTERVAL,
- help='Maintenance task interval (in seconds) to be used'
+ help='Maintenance task interval (in seconds) to be used',
)
@click.option(
'--job-monitoring-interval',
@@ -227,7 +180,9 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
-@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues')
+@click.option(
+ '--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues'
+)
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(
@@ -274,12 +229,14 @@ def worker(
worker_name = cli_config.worker_class.__qualname__
if worker_name in ["RoundRobinWorker", "RandomWorker"]:
strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin"
- msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy."
+ msg = f"WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead."
warnings.warn(msg, DeprecationWarning)
click.secho(msg, fg='yellow')
if dequeue_strategy not in ("default", "random", "round_robin"):
- click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red')
+ click.secho(
+ "ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red'
+ )
sys.exit(1)
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
@@ -313,7 +270,7 @@ def worker(
exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler,
log_job_description=not disable_job_desc_logging,
- serializer=serializer
+ serializer=serializer,
)
# Should we configure Sentry?
@@ -335,7 +292,7 @@ def worker(
max_jobs=max_jobs,
max_idle_time=max_idle_time,
with_scheduler=with_scheduler,
- dequeue_strategy=dequeue_strategy
+ dequeue_strategy=dequeue_strategy,
)
except ConnectionError as e:
print(e)
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index 0721198..bea2c37 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -2,7 +2,8 @@ import sys
import importlib
import time
import os
-from functools import partial
+
+from functools import partial, update_wrapper
from enum import Enum
from datetime import datetime, timezone, timedelta
@@ -13,8 +14,15 @@ from shutil import get_terminal_size
import click
from redis import Redis
from redis.sentinel import Sentinel
-from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, \
- DEFAULT_DEATH_PENALTY_CLASS
+
+from rq.defaults import (
+ DEFAULT_CONNECTION_CLASS,
+ DEFAULT_DEATH_PENALTY_CLASS,
+ DEFAULT_JOB_CLASS,
+ DEFAULT_QUEUE_CLASS,
+ DEFAULT_WORKER_CLASS,
+ DEFAULT_SERIALIZER_CLASS,
+)
from rq.logutils import setup_loghandlers
from rq.utils import import_attribute, parse_timeout
from rq.worker import WorkerStatus
@@ -42,7 +50,7 @@ def get_redis_from_config(settings, connection_class=Redis):
elif settings.get('SENTINEL') is not None:
instances = settings['SENTINEL'].get('INSTANCES', [('localhost', 26379)])
master_name = settings['SENTINEL'].get('MASTER_NAME', 'mymaster')
-
+
connection_kwargs = {
'db': settings['SENTINEL'].get('DB', 0),
'username': settings['SENTINEL'].get('USERNAME', None),
@@ -52,10 +60,8 @@ def get_redis_from_config(settings, connection_class=Redis):
}
connection_kwargs.update(settings['SENTINEL'].get('CONNECTION_KWARGS', {}))
sentinel_kwargs = settings['SENTINEL'].get('SENTINEL_KWARGS', {})
-
- sn = Sentinel(
- instances, sentinel_kwargs=sentinel_kwargs, **connection_kwargs
- )
+
+ sn = Sentinel(instances, sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
return sn.master_for(master_name)
ssl = settings.get('REDIS_SSL', False)
@@ -124,13 +130,22 @@ def show_queues(queues, raw, by_queue, queue_class, worker_class):
count = counts[q]
if not raw:
chart = green('|' + '█' * int(ratio * count))
- line = '%-12s %s %d, %d executing, %d finished, %d failed' \
- % (q.name, chart, count, q.started_job_registry.count, \
- q.finished_job_registry.count, q.failed_job_registry.count)
+ line = '%-12s %s %d, %d executing, %d finished, %d failed' % (
+ q.name,
+ chart,
+ count,
+ q.started_job_registry.count,
+ q.finished_job_registry.count,
+ q.failed_job_registry.count,
+ )
else:
- line = 'queue %s %d, %d executing, %d finished, %d failed' \
- % (q.name, count, q.started_job_registry.count, \
- q.finished_job_registry.count, q.failed_job_registry.count)
+ line = 'queue %s %d, %d executing, %d finished, %d failed' % (
+ q.name,
+ count,
+ q.started_job_registry.count,
+ q.finished_job_registry.count,
+ q.failed_job_registry.count,
+ )
click.echo(line)
num_jobs += count
@@ -155,14 +170,22 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
queue_names = ', '.join(worker.queue_names())
name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid)
if not raw:
- line = '%s: %s %s. jobs: %d finished, %d failed' \
- % (name, state_symbol(worker.get_state()), queue_names, \
- worker.successful_job_count, worker.failed_job_count)
+ line = '%s: %s %s. jobs: %d finished, %d failed' % (
+ name,
+ state_symbol(worker.get_state()),
+ queue_names,
+ worker.successful_job_count,
+ worker.failed_job_count,
+ )
click.echo(line)
else:
- line = 'worker %s %s %s. jobs: %d finished, %d failed' \
- % (name, worker.get_state(), queue_names,\
- worker.successful_job_count, worker.failed_job_count)
+ line = 'worker %s %s %s. jobs: %d finished, %d failed' % (
+ name,
+ worker.get_state(),
+ queue_names,
+ worker.successful_job_count,
+ worker.failed_job_count,
+ )
click.echo(line)
else:
@@ -318,7 +341,7 @@ class CliConfig:
connection_class=DEFAULT_CONNECTION_CLASS,
path=None,
*args,
- **kwargs
+ **kwargs,
):
self._connection = None
self.url = url
@@ -363,3 +386,41 @@ class CliConfig:
else:
self._connection = get_redis_from_config(os.environ, self.connection_class)
return self._connection
+
+
+shared_options = [
+ click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'),
+ click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'),
+ click.option(
+ '--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use'
+ ),
+ click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'),
+ click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'),
+ click.option(
+ '--connection-class',
+ envvar='RQ_CONNECTION_CLASS',
+ default=DEFAULT_CONNECTION_CLASS,
+ help='Redis client class to use',
+ ),
+ click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True),
+ click.option(
+ '--serializer',
+ '-S',
+ default=DEFAULT_SERIALIZER_CLASS,
+ help='Path to serializer, defaults to rq.serializers.DefaultSerializer',
+ ),
+]
+
+
+def pass_cli_config(func):
+ # add all the shared options to the command
+ for option in shared_options:
+ func = option(func)
+
+ # pass the cli config object into the command
+ def wrapper(*args, **kwargs):
+ ctx = click.get_current_context()
+ cli_config = CliConfig(**kwargs)
+ return ctx.invoke(func, cli_config, *args[1:], **kwargs)
+
+ return update_wrapper(wrapper, func)