This class creates a thread to execute a function in loop and wait after
the function until the defined loop interval is reached.
A warning is emitted if the function duration is longer than the loop
interval.
Source code in cogip/utils/threadloop.py
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 | class ThreadLoop:
"""
This class creates a thread to execute a function in loop and wait after
the function until the defined loop interval is reached.
A warning is emitted if the function duration is longer than the loop
interval.
"""
def __init__(
self,
name: str,
interval: float,
func: Callable,
logger: bool | logging.Logger = False,
args: list[Any] | None = None,
kwargs: dict[str, Any] | None = None,
):
"""
Class constructor.
Arguments:
name: Name to identify the thread in the logs
interval: time between each iteration of the loop, in seconds
func: function to execute in the loop
logger: an optional custom logger
args: arguments of the function
kwargs: named arguments of the function
"""
self._name = name
self._interval = interval
self._func = func
self._args = args or []
self._kwargs = kwargs or {}
self._thread = threading.Thread(target=self.repeat)
self._cancel = False
self._logger = logging.getLogger(f"ThreadLoop: {name}")
if not isinstance(logger, bool):
self._logger = logger
else:
if self._logger.level == logging.NOTSET:
if logger:
self._logger.setLevel(logging.INFO)
else:
self._logger.setLevel(logging.ERROR)
self._logger.addHandler(logging.StreamHandler())
@property
def interval(self) -> float:
return self._interval
@interval.setter
def interval(self, value: float) -> None:
self._interval = value
def repeat(self) -> None:
"""
Loop function executed in the thread.
"""
while not self._cancel:
start = time.time()
self._func(*self._args, **self._kwargs)
now = time.time()
duration = now - start
if duration > self._interval:
self._logger.warning(f"Function too long: {duration} > {self._interval}")
else:
wait = self._interval - duration
time.sleep(wait)
def start(self) -> None:
"""
Start the thread loop.
"""
if self._thread.is_alive():
self._logger.warning(f"Already {'canceled' if self._cancel else 'running'}")
return
if self._cancel:
self._thread = threading.Thread(target=self.repeat)
self._cancel = False
self._thread.start()
def stop(self) -> None:
"""
Stop the thread loop.
"""
self._logger.debug("Stopping...")
if self._thread.is_alive():
self._cancel = True
try:
self._thread.join()
except KeyboardInterrupt:
pass
self._logger.debug("Stopped.")
|
__init__(name, interval, func, logger=False, args=None, kwargs=None)
Class constructor.
Parameters:
Name |
Type |
Description |
Default |
name |
str
|
Name to identify the thread in the logs
|
required
|
interval |
float
|
time between each iteration of the loop, in seconds
|
required
|
func |
Callable
|
function to execute in the loop
|
required
|
logger |
bool | Logger
|
an optional custom logger
|
False
|
args |
list[Any] | None
|
arguments of the function
|
None
|
kwargs |
dict[str, Any] | None
|
named arguments of the function
|
None
|
Source code in cogip/utils/threadloop.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 | def __init__(
self,
name: str,
interval: float,
func: Callable,
logger: bool | logging.Logger = False,
args: list[Any] | None = None,
kwargs: dict[str, Any] | None = None,
):
"""
Class constructor.
Arguments:
name: Name to identify the thread in the logs
interval: time between each iteration of the loop, in seconds
func: function to execute in the loop
logger: an optional custom logger
args: arguments of the function
kwargs: named arguments of the function
"""
self._name = name
self._interval = interval
self._func = func
self._args = args or []
self._kwargs = kwargs or {}
self._thread = threading.Thread(target=self.repeat)
self._cancel = False
self._logger = logging.getLogger(f"ThreadLoop: {name}")
if not isinstance(logger, bool):
self._logger = logger
else:
if self._logger.level == logging.NOTSET:
if logger:
self._logger.setLevel(logging.INFO)
else:
self._logger.setLevel(logging.ERROR)
self._logger.addHandler(logging.StreamHandler())
|
repeat()
Loop function executed in the thread.
Source code in cogip/utils/threadloop.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76 | def repeat(self) -> None:
"""
Loop function executed in the thread.
"""
while not self._cancel:
start = time.time()
self._func(*self._args, **self._kwargs)
now = time.time()
duration = now - start
if duration > self._interval:
self._logger.warning(f"Function too long: {duration} > {self._interval}")
else:
wait = self._interval - duration
time.sleep(wait)
|
start()
Start the thread loop.
Source code in cogip/utils/threadloop.py
78
79
80
81
82
83
84
85
86
87
88 | def start(self) -> None:
"""
Start the thread loop.
"""
if self._thread.is_alive():
self._logger.warning(f"Already {'canceled' if self._cancel else 'running'}")
return
if self._cancel:
self._thread = threading.Thread(target=self.repeat)
self._cancel = False
self._thread.start()
|
stop()
Stop the thread loop.
Source code in cogip/utils/threadloop.py
90
91
92
93
94
95
96
97
98
99
100
101 | def stop(self) -> None:
"""
Stop the thread loop.
"""
self._logger.debug("Stopping...")
if self._thread.is_alive():
self._cancel = True
try:
self._thread.join()
except KeyboardInterrupt:
pass
self._logger.debug("Stopped.")
|