summaryrefslogtreecommitdiff
path: root/rq/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'rq/queue.py')
-rw-r--r--rq/queue.py180
1 files changed, 98 insertions, 82 deletions
diff --git a/rq/queue.py b/rq/queue.py
index 77a6f3e..43b31eb 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -71,8 +71,11 @@ class Queue:
@classmethod
def all(
- cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None,
- serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None
+ cls,
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ serializer=None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> List['Queue']:
"""Returns an iterable of all Queues.
@@ -89,8 +92,11 @@ class Queue:
def to_queue(queue_key):
return cls.from_queue_key(
- as_text(queue_key), connection=connection, job_class=job_class,
- serializer=serializer, death_penalty_class=death_penalty_class
+ as_text(queue_key),
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
)
all_registerd_queues = connection.smembers(cls.redis_queues_keys)
@@ -99,12 +105,12 @@ class Queue:
@classmethod
def from_queue_key(
- cls,
- queue_key: str,
- connection: Optional['Redis'] = None,
- job_class: Optional[Type['Job']] = None,
- serializer: Any = None,
- death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
+ cls,
+ queue_key: str,
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
@@ -126,20 +132,25 @@ class Queue:
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
- name = queue_key[len(prefix):]
- return cls(name, connection=connection, job_class=job_class, serializer=serializer,
- death_penalty_class=death_penalty_class)
+ name = queue_key[len(prefix) :]
+ return cls(
+ name,
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
+ )
def __init__(
- self,
- name: str = 'default',
- default_timeout: Optional[int] = None,
- connection: Optional['Redis'] = None,
- is_async: bool = True,
- job_class: Union[str, Type['Job'], None] = None,
- serializer: Any = None,
- death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
- **kwargs,
+ self,
+ name: str = 'default',
+ default_timeout: Optional[int] = None,
+ connection: Optional['Redis'] = None,
+ is_async: bool = True,
+ job_class: Union[str, Type['Job'], None] = None,
+ serializer: Any = None,
+ death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
+ **kwargs,
):
"""Initializes a Queue object.
@@ -207,6 +218,7 @@ class Queue:
@property
def scheduler_pid(self) -> int:
from rq.scheduler import RQScheduler
+
pid = self.connection.get(RQScheduler.get_locking_key(self.name))
return int(pid.decode()) if pid is not None else None
@@ -467,23 +479,23 @@ class Queue:
self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result)
def create_job(
- self,
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- depends_on: Optional['JobDependencyType'] = None,
- job_id: Optional[str] = None,
- meta: Optional[Dict] = None,
- status: JobStatus = JobStatus.QUEUED,
- retry: Optional['Retry'] = None,
- *,
- on_success: Optional[Callable] = None,
- on_failure: Optional[Callable] = None,
+ self,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ depends_on: Optional['JobDependencyType'] = None,
+ job_id: Optional[str] = None,
+ meta: Optional[Dict] = None,
+ status: JobStatus = JobStatus.QUEUED,
+ retry: Optional['Retry'] = None,
+ *,
+ on_success: Optional[Callable] = None,
+ on_failure: Optional[Callable] = None,
) -> Job:
"""Creates a job based on parameters given
@@ -609,23 +621,23 @@ class Queue:
return job
def enqueue_call(
- self,
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- depends_on: Optional['JobDependencyType'] = None,
- job_id: Optional[str] = None,
- at_front: bool = False,
- meta: Optional[Dict] = None,
- retry: Optional['Retry'] = None,
- on_success: Optional[Callable[..., Any]] = None,
- on_failure: Optional[Callable[..., Any]] = None,
- pipeline: Optional['Pipeline'] = None,
+ self,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ depends_on: Optional['JobDependencyType'] = None,
+ job_id: Optional[str] = None,
+ at_front: bool = False,
+ meta: Optional[Dict] = None,
+ retry: Optional['Retry'] = None,
+ on_success: Optional[Callable[..., Any]] = None,
+ on_failure: Optional[Callable[..., Any]] = None,
+ pipeline: Optional['Pipeline'] = None,
) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
@@ -676,20 +688,20 @@ class Queue:
@staticmethod
def prepare_data(
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- job_id: Optional[str] = None,
- at_front: bool = False,
- meta: Optional[Dict] = None,
- retry: Optional['Retry'] = None,
- on_success: Optional[Callable] = None,
- on_failure: Optional[Callable] = None,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ job_id: Optional[str] = None,
+ at_front: bool = False,
+ meta: Optional[Dict] = None,
+ retry: Optional['Retry'] = None,
+ on_success: Optional[Callable] = None,
+ on_failure: Optional[Callable] = None,
) -> EnqueueData:
"""Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
And can keep this logic within EnqueueData
@@ -1001,7 +1013,6 @@ class Queue:
return self._enqueue_job(job, pipeline=pipeline, at_front=at_front)
return job
-
def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
"""Enqueues a job for delayed execution without checking dependencies.
@@ -1071,7 +1082,7 @@ class Queue:
return job
def enqueue_dependents(
- self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
+ self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
):
"""Enqueues all jobs in the given job's dependents set and clears it.
@@ -1108,7 +1119,7 @@ class Queue:
dependent_job_ids, connection=self.connection, serializer=self.serializer
)
if dependent_job
- and dependent_job.dependencies_are_met(
+ and dependent_job.dependencies_are_met(
parent_job=job,
pipeline=pipe,
exclude_job_id=exclude_job_id,
@@ -1208,13 +1219,13 @@ class Queue:
@classmethod
def dequeue_any(
- cls,
- queues: List['Queue'],
- timeout: Optional[int],
- connection: Optional['Redis'] = None,
- job_class: Optional['Job'] = None,
- serializer: Any = None,
- death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
+ cls,
+ queues: List['Queue'],
+ timeout: Optional[int],
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.
@@ -1248,8 +1259,13 @@ class Queue:
if result is None:
return None
queue_key, job_id = map(as_text, result)
- queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class,
- serializer=serializer, death_penalty_class=death_penalty_class)
+ queue = cls.from_queue_key(
+ queue_key,
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
+ )
try:
job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError: