blob: 41ba025b28563952ad3f39a6e8e023c7836d35e7 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
import copy
import time
import kafka.common as Errors
class Heartbeat(object):
    """Bookkeeping for heartbeat timing.

    Records when a heartbeat was last sent, when one was last received and
    when the session timeout was last reset, and answers timing questions
    based on those wall-clock timestamps (``time.time()``).

    Keyword Arguments:
        heartbeat_interval_ms: interval between heartbeats, in
            milliseconds. Default: 3000.
        session_timeout_ms: time without a received heartbeat after which
            the session counts as expired, in milliseconds. Default: 30000.

    Keyword arguments that are not keys of ``DEFAULT_CONFIG`` are ignored.

    Raises:
        Errors.IllegalArgumentError: if ``heartbeat_interval_ms`` is
            greater than ``session_timeout_ms``.
    """

    DEFAULT_CONFIG = {
        'heartbeat_interval_ms': 3000,
        'session_timeout_ms': 30000,
    }

    def __init__(self, **configs):
        # Start from a shallow copy of the defaults so the class-level dict
        # is never mutated, then overlay only the recognised options.
        settings = copy.copy(self.DEFAULT_CONFIG)
        settings.update(
            (name, configs[name])
            for name in self.DEFAULT_CONFIG
            if name in configs
        )
        self.config = settings

        interval_ms = settings['heartbeat_interval_ms']
        timeout_ms = settings['session_timeout_ms']
        if interval_ms > timeout_ms:
            raise Errors.IllegalArgumentError("Heartbeat interval must be set"
                                              " lower than the session timeout")

        # Both durations are kept in seconds to match time.time().
        self.interval = interval_ms / 1000.0
        self.timeout = timeout_ms / 1000.0

        # 0 means "never happened"; the reset timestamp starts at "now".
        self.last_send = 0
        self.last_receive = 0
        self.last_reset = time.time()

    def sent_heartbeat(self):
        """Record the current time as the moment a heartbeat was sent."""
        self.last_send = time.time()

    def received_heartbeat(self):
        """Record the current time as the moment a heartbeat was received."""
        self.last_receive = time.time()

    def ttl(self):
        """Return the seconds left until the next heartbeat is due.

        The countdown starts at the later of the last send and the last
        reset; the result is 0 once the interval has elapsed.
        """
        next_due = max(self.last_send, self.last_reset) + self.interval
        remaining = next_due - time.time()
        if remaining > 0:
            return remaining
        return 0

    def should_heartbeat(self):
        """Return True when no time is left before the next heartbeat."""
        remaining = self.ttl()
        return remaining == 0

    def session_expired(self):
        """Return True if more than ``timeout`` seconds have passed since
        the later of the last received heartbeat and the last reset.
        """
        reference = max(self.last_receive, self.last_reset)
        elapsed = time.time() - reference
        return elapsed > self.timeout

    def reset_session_timeout(self):
        """Restart the timing window by setting the reset time to now."""
        self.last_reset = time.time()
|