diff options
Diffstat (limited to 'rq/queue.py')
-rw-r--r-- | rq/queue.py | 180 |
1 files changed, 98 insertions, 82 deletions
diff --git a/rq/queue.py b/rq/queue.py index 77a6f3e..43b31eb 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -71,8 +71,11 @@ class Queue: @classmethod def all( - cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, - serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None + cls, + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + serializer=None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> List['Queue']: """Returns an iterable of all Queues. @@ -89,8 +92,11 @@ class Queue: def to_queue(queue_key): return cls.from_queue_key( - as_text(queue_key), connection=connection, job_class=job_class, - serializer=serializer, death_penalty_class=death_penalty_class + as_text(queue_key), + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, ) all_registerd_queues = connection.smembers(cls.redis_queues_keys) @@ -99,12 +105,12 @@ class Queue: @classmethod def from_queue_key( - cls, - queue_key: str, - connection: Optional['Redis'] = None, - job_class: Optional[Type['Job']] = None, - serializer: Any = None, - death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, + cls, + queue_key: str, + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their @@ -126,20 +132,25 @@ class Queue: prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) - name = queue_key[len(prefix):] - return cls(name, connection=connection, job_class=job_class, serializer=serializer, - death_penalty_class=death_penalty_class) + name = queue_key[len(prefix) :] + return cls( + name, + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, + ) def __init__( - self, - name: str = 'default', - default_timeout: Optional[int] = None, - connection: Optional['Redis'] = None, - is_async: bool = True, - job_class: Union[str, Type['Job'], None] = None, - serializer: Any = None, - death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty, - **kwargs, + self, + name: str = 'default', + default_timeout: Optional[int] = None, + connection: Optional['Redis'] = None, + is_async: bool = True, + job_class: Union[str, Type['Job'], None] = None, + serializer: Any = None, + death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty, + **kwargs, ): """Initializes a Queue object. @@ -207,6 +218,7 @@ class Queue: @property def scheduler_pid(self) -> int: from rq.scheduler import RQScheduler + pid = self.connection.get(RQScheduler.get_locking_key(self.name)) return int(pid.decode()) if pid is not None else None @@ -467,23 +479,23 @@ class Queue: self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result) def create_job( - self, - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, - meta: Optional[Dict] = None, - status: JobStatus = JobStatus.QUEUED, - retry: Optional['Retry'] = None, - *, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + meta: Optional[Dict] = None, + status: JobStatus = JobStatus.QUEUED, + retry: Optional['Retry'] = None, + *, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, ) -> Job: """Creates a job based on parameters given @@ -609,23 +621,23 @@ class Queue: return job def enqueue_call( - self, - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, - at_front: bool = False, - meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, - on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None, - pipeline: Optional['Pipeline'] = None, + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None, + pipeline: Optional['Pipeline'] = None, ) -> Job: """Creates a job to represent the delayed function call and enqueues it. @@ -676,20 +688,20 @@ class Queue: @staticmethod def prepare_data( - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - job_id: Optional[str] = None, - at_front: bool = False, - meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, ) -> EnqueueData: """Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples And can keep this logic within EnqueueData @@ -1001,7 +1013,6 @@ class Queue: return self._enqueue_job(job, pipeline=pipeline, at_front=at_front) return job - def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: """Enqueues a job for delayed execution without checking dependencies. @@ -1071,7 +1082,7 @@ class Queue: return job def enqueue_dependents( - self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None + self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None ): """Enqueues all jobs in the given job's dependents set and clears it. @@ -1108,7 +1119,7 @@ class Queue: dependent_job_ids, connection=self.connection, serializer=self.serializer ) if dependent_job - and dependent_job.dependencies_are_met( + and dependent_job.dependencies_are_met( parent_job=job, pipeline=pipe, exclude_job_id=exclude_job_id, @@ -1208,13 +1219,13 @@ class Queue: @classmethod def dequeue_any( - cls, - queues: List['Queue'], - timeout: Optional[int], - connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, - serializer: Any = None, - death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, + cls, + queues: List['Queue'], + timeout: Optional[int], + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. @@ -1248,8 +1259,13 @@ class Queue: if result is None: return None queue_key, job_id = map(as_text, result) - queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, - serializer=serializer, death_penalty_class=death_penalty_class) + queue = cls.from_queue_key( + queue_key, + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, + ) try: job = job_class.fetch(job_id, connection=connection, serializer=serializer) except NoSuchJobError: |