Skip to content

threadloop

ThreadLoop #

This class creates a thread that executes a function in a loop, waiting after each call until the defined loop interval has elapsed. A warning is emitted if the function takes longer than the loop interval.

Source code in cogip/utils/threadloop.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class ThreadLoop:
    """
    This class creates a thread to execute a function in loop and wait after
    the function until the defined loop interval is reached.
    A warning is emitted if the function duration is longer than the loop
    interval.
    """

    def __init__(
        self,
        name: str,
        interval: float,
        func: Callable,
        logger: bool | logging.Logger = False,
        args: list[Any] | None = None,
        kwargs: dict[str, Any] | None = None,
    ):
        """
        Class constructor.

        Arguments:
            name: Name to identify the thread in the logs
            interval: time between each iteration of the loop, in seconds
            func: function to execute in the loop
            logger: an optional custom logger
            args: arguments of the function
            kwargs: named arguments of the function
        """
        self._name = name
        self._interval = interval
        self._func = func
        self._args = args or []
        self._kwargs = kwargs or {}
        self._thread = threading.Thread(target=self.repeat)
        self._cancel = False
        self._logger = logging.getLogger(f"ThreadLoop: {name}")

        if not isinstance(logger, bool):
            self._logger = logger
        else:
            if self._logger.level == logging.NOTSET:
                if logger:
                    self._logger.setLevel(logging.INFO)
                else:
                    self._logger.setLevel(logging.ERROR)
                self._logger.addHandler(logging.StreamHandler())

    @property
    def interval(self) -> float:
        return self._interval

    @interval.setter
    def interval(self, value: float) -> None:
        self._interval = value

    def repeat(self) -> None:
        """
        Loop function executed in the thread.
        """
        while not self._cancel:
            start = time.time()
            self._func(*self._args, **self._kwargs)
            now = time.time()
            duration = now - start
            if duration > self._interval:
                self._logger.warning(f"Function too long: {duration} > {self._interval}")
            else:
                wait = self._interval - duration
                time.sleep(wait)

    def start(self) -> None:
        """
        Start the thread loop.
        """
        if self._thread.is_alive():
            self._logger.warning(f"Already {'canceled' if self._cancel else 'running'}")
            return
        if self._cancel:
            self._thread = threading.Thread(target=self.repeat)
            self._cancel = False
        self._thread.start()

    def stop(self) -> None:
        """
        Stop the thread loop.
        """
        self._logger.debug("Stopping...")
        if self._thread.is_alive():
            self._cancel = True
            try:
                self._thread.join()
            except KeyboardInterrupt:
                pass
            self._logger.debug("Stopped.")

__init__(name, interval, func, logger=False, args=None, kwargs=None) #

Class constructor.

Parameters:

Name Type Description Default
name str

Name to identify the thread in the logs

required
interval float

time between each iteration of the loop, in seconds

required
func Callable

function to execute in the loop

required
logger bool | Logger

an optional custom logger

False
args list[Any] | None

arguments of the function

None
kwargs dict[str, Any] | None

named arguments of the function

None
Source code in cogip/utils/threadloop.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    name: str,
    interval: float,
    func: Callable,
    logger: bool | logging.Logger = False,
    args: list[Any] | None = None,
    kwargs: dict[str, Any] | None = None,
):
    """
    Class constructor.

    Arguments:
        name: Name to identify the thread in the logs
        interval: time between each iteration of the loop, in seconds
        func: function to execute in the loop
        logger: an optional custom logger
        args: arguments of the function
        kwargs: named arguments of the function
    """
    self._name = name
    self._interval = interval
    self._func = func
    self._args = args or []
    self._kwargs = kwargs or {}
    self._thread = threading.Thread(target=self.repeat)
    self._cancel = False
    self._logger = logging.getLogger(f"ThreadLoop: {name}")

    if not isinstance(logger, bool):
        self._logger = logger
    else:
        if self._logger.level == logging.NOTSET:
            if logger:
                self._logger.setLevel(logging.INFO)
            else:
                self._logger.setLevel(logging.ERROR)
            self._logger.addHandler(logging.StreamHandler())

repeat() #

Loop function executed in the thread.

Source code in cogip/utils/threadloop.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def repeat(self) -> None:
    """
    Loop function executed in the thread.

    Calls the function with the stored arguments until the loop is
    cancelled. After each call, sleeps for the remainder of the interval,
    or emits a warning (and does not sleep) if the call took longer than
    the interval. Exceptions raised by the function are not caught here.
    """
    while not self._cancel:
        # Snapshot the interval: it can be changed from another thread, and
        # reading it twice could yield a negative sleep duration.
        interval = self._interval
        # Monotonic clock: unaffected by system clock adjustments.
        start = time.monotonic()
        self._func(*self._args, **self._kwargs)
        duration = time.monotonic() - start
        if duration > interval:
            self._logger.warning("Function too long: %s > %s", duration, interval)
        else:
            time.sleep(interval - duration)

start() #

Start the thread loop.

Source code in cogip/utils/threadloop.py
78
79
80
81
82
83
84
85
86
87
88
def start(self) -> None:
    """
    Start the thread loop.

    Only emits a warning if the loop thread is still alive. Otherwise the
    loop is (re)started, clearing any previous cancellation.
    """
    if self._thread.is_alive():
        self._logger.warning(f"Already {'canceled' if self._cancel else 'running'}")
        return
    # A Thread object can only be started once. Create a new one whenever the
    # current one was already used: after a stop, or when the thread ended on
    # its own (ident is set as soon as a thread has been started).
    if self._cancel or self._thread.ident is not None:
        self._thread = threading.Thread(target=self.repeat)
        self._cancel = False
    self._thread.start()

stop() #

Stop the thread loop.

Source code in cogip/utils/threadloop.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def stop(self) -> None:
    """
    Stop the thread loop.

    Does nothing (besides a debug log) if the loop thread is not alive.
    Otherwise requests cancellation and waits for the thread to end. When
    called from the loop thread itself, only the cancellation is requested.
    """
    self._logger.debug("Stopping...")
    if not self._thread.is_alive():
        return
    self._cancel = True
    if self._thread is threading.current_thread():
        # Joining the current thread raises RuntimeError; the loop exits by
        # itself once the current iteration is over.
        return
    try:
        self._thread.join()
    except KeyboardInterrupt:
        # Deliberate best-effort: let Ctrl-C abort the wait silently.
        pass
    self._logger.debug("Stopped.")