blob: cedd64c1a85564c7e0e912c44a0b38839b72ba68 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
from __future__ import absolute_import
import heapq
import threading
class PunctuationQueue(object):
def __init__(self):
self._pq = []
self._lock = threading.Lock()
def schedule(self, sched):
with self._lock:
heapq.heappush(self._pq, sched)
def close(self):
with self._lock:
self._pq = []
def may_punctuate(self, timestamp, punctuator):
with self._lock:
punctuated = False
while (self._pq and self._pq[0][0] <= timestamp):
old_ts, node, interval_ms = heapq.heappop(self._pq)
if old_ts == 0:
old_ts = timestamp
punctuator.punctuate(node, timestamp)
sched = (old_ts + interval_ms, node, interval_ms)
heapq.heappush(self._pq, sched)
punctuated = True
return punctuated
|