diff options
| author | Selwin Ong <selwin.ong@gmail.com> | 2017-08-13 08:14:48 +0700 |
|---|---|---|
| committer | Selwin Ong <selwin.ong@gmail.com> | 2017-08-13 08:14:48 +0700 |
| commit | e4ab9fd1c4d4e7ffe4b85252e94df43be962113a (patch) | |
| tree | 9450257e05f882a4c08a4807697496adf2ab09d8 | |
| parent | 04258d994640a2fa65c78205e325defb5baff62d (diff) | |
| download | rq-worker-stats.tar.gz | |
Worker now keeps track of its last heartbeat.worker-stats
| -rw-r--r-- | rq/worker.py | 17 | ||||
| -rw-r--r-- | tests/test_worker.py | 16 |
2 files changed, 29 insertions, 4 deletions
diff --git a/rq/worker.py b/rq/worker.py index d163082..272f300 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -12,7 +12,7 @@ import sys import time import traceback import warnings -from datetime import timedelta +from datetime import datetime, timedelta try: from signal import SIGKILL @@ -132,10 +132,13 @@ class Worker(object): connection=connection, job_class=job_class, queue_class=queue_class) - queues, state, job_id = connection.hmget(worker.key, 'queues', 'state', 'current_job') + queues, state, job_id, last_heartbeat = connection.hmget(worker.key, 'queues', 'state', 'current_job', + 'last_heartbeat') queues = as_text(queues) worker._state = as_text(state or '?') worker._job_id = job_id or None + if last_heartbeat: + worker._last_heartbeat = float(last_heartbeat) if queues: worker.queues = [worker.queue_class(queue, connection=connection, @@ -179,6 +182,7 @@ class Worker(object): self.failed_queue = get_failed_queue(connection=self.connection, job_class=self.job_class) self.last_cleaned_at = None + self._last_heartbeat = None # By default, push the "move-to-failed-queue" exception handler onto # the stack @@ -253,6 +257,13 @@ class Worker(object): """ setprocname('rq: {0}'.format(message)) + @property + def last_heartbeat(self): + """Returns last heartbeat in Python's datetime object.""" + if self._last_heartbeat: + return datetime.fromtimestamp(self._last_heartbeat) + + def register_birth(self): """Registers its own birth.""" self.log.debug('Registering birth of worker {0}'.format(self.name)) @@ -525,6 +536,8 @@ class Worker(object): timeout = max(timeout, self.default_worker_ttl) connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, timeout) + self._last_heartbeat = time.time() + connection.hset(self.key, 'last_heartbeat', self._last_heartbeat) self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within {0} seconds.'.format(timeout)) diff --git a/tests/test_worker.py b/tests/test_worker.py index 19d44a8..49af464 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function, import os import shutil -from datetime import timedelta +from datetime import date, timedelta from time import sleep import signal import time @@ -106,16 +106,28 @@ class TestWorker(RQTestCase): ) def test_find_by_key(self): - """Worker.find_by_key restores queues, state and job_id.""" + """Worker.find_by_key restores worker state.""" queues = [Queue('foo'), Queue('bar')] w = Worker(queues) w.register_death() w.register_birth() w.set_state(WorkerStatus.STARTED) + worker = Worker.find_by_key(w.key) self.assertEqual(worker.queues, queues) self.assertEqual(worker.get_state(), WorkerStatus.STARTED) self.assertEqual(worker._job_id, None) + self.assertEqual(worker._last_heartbeat, None) + self.assertEqual(worker.last_heartbeat, None) + + # last_heartbeat should be restored after successful heartbeat + timestamp = time.time() + w.heartbeat(timeout=2) + worker = Worker.find_by_key(w.key) + self.assertEqual(worker.get_state(), WorkerStatus.STARTED) + self.assertEqual(worker.last_heartbeat.date(), date.today()) + self.assertTrue(timestamp - 1 < worker._last_heartbeat < timestamp + 1) + w.register_death() def test_worker_ttl(self): |
