summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2017-08-13 08:14:48 +0700
committerSelwin Ong <selwin.ong@gmail.com>2017-08-13 08:14:48 +0700
commite4ab9fd1c4d4e7ffe4b85252e94df43be962113a (patch)
tree9450257e05f882a4c08a4807697496adf2ab09d8
parent04258d994640a2fa65c78205e325defb5baff62d (diff)
downloadrq-worker-stats.tar.gz
Worker now keeps track of its last heartbeat.worker-stats
-rw-r--r--rq/worker.py17
-rw-r--r--tests/test_worker.py16
2 files changed, 29 insertions, 4 deletions
diff --git a/rq/worker.py b/rq/worker.py
index d163082..272f300 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -12,7 +12,7 @@ import sys
import time
import traceback
import warnings
-from datetime import timedelta
+from datetime import datetime, timedelta
try:
from signal import SIGKILL
@@ -132,10 +132,13 @@ class Worker(object):
connection=connection,
job_class=job_class,
queue_class=queue_class)
- queues, state, job_id = connection.hmget(worker.key, 'queues', 'state', 'current_job')
+ queues, state, job_id, last_heartbeat = connection.hmget(worker.key, 'queues', 'state', 'current_job',
+ 'last_heartbeat')
queues = as_text(queues)
worker._state = as_text(state or '?')
worker._job_id = job_id or None
+ if last_heartbeat:
+ worker._last_heartbeat = float(last_heartbeat)
if queues:
worker.queues = [worker.queue_class(queue,
connection=connection,
@@ -179,6 +182,7 @@ class Worker(object):
self.failed_queue = get_failed_queue(connection=self.connection,
job_class=self.job_class)
self.last_cleaned_at = None
+ self._last_heartbeat = None
# By default, push the "move-to-failed-queue" exception handler onto
# the stack
@@ -253,6 +257,13 @@ class Worker(object):
"""
setprocname('rq: {0}'.format(message))
+ @property
+ def last_heartbeat(self):
+ """Returns last heartbeat in Python's datetime object."""
+ if self._last_heartbeat:
+ return datetime.fromtimestamp(self._last_heartbeat)
+
+
def register_birth(self):
"""Registers its own birth."""
self.log.debug('Registering birth of worker {0}'.format(self.name))
@@ -525,6 +536,8 @@ class Worker(object):
timeout = max(timeout, self.default_worker_ttl)
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
+ self._last_heartbeat = time.time()
+ connection.hset(self.key, 'last_heartbeat', self._last_heartbeat)
self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout))
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 19d44a8..49af464 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import os
import shutil
-from datetime import timedelta
+from datetime import date, timedelta
from time import sleep
import signal
import time
@@ -106,16 +106,28 @@ class TestWorker(RQTestCase):
)
def test_find_by_key(self):
- """Worker.find_by_key restores queues, state and job_id."""
+ """Worker.find_by_key restores worker state."""
queues = [Queue('foo'), Queue('bar')]
w = Worker(queues)
w.register_death()
w.register_birth()
w.set_state(WorkerStatus.STARTED)
+
worker = Worker.find_by_key(w.key)
self.assertEqual(worker.queues, queues)
self.assertEqual(worker.get_state(), WorkerStatus.STARTED)
self.assertEqual(worker._job_id, None)
+ self.assertEqual(worker._last_heartbeat, None)
+ self.assertEqual(worker.last_heartbeat, None)
+
+ # last_heartbeat should be restored after successful heartbeat
+ timestamp = time.time()
+ w.heartbeat(timeout=2)
+ worker = Worker.find_by_key(w.key)
+ self.assertEqual(worker.get_state(), WorkerStatus.STARTED)
+ self.assertEqual(worker.last_heartbeat.date(), date.today())
+ self.assertTrue(timestamp - 1 < worker._last_heartbeat < timestamp + 1)
+
w.register_death()
def test_worker_ttl(self):