Skip to content

threadloop

ThreadLoop #

This class creates a thread that executes a function in a loop, waiting after each call until the defined loop interval is reached. A warning is emitted if the function takes longer than the loop interval.

Source code in cogip/utils/threadloop.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class ThreadLoop:
    """
    This class creates a thread to execute a function in loop and wait after
    the function until the defined loop interval is reached.
    A warning is emitted if the function duration is longer than the loop
    interval.
    """

    def __init__(
        self,
        name: str,
        interval: float,
        func: Callable,
        logger: bool | logging.Logger = False,
        args: list[Any] | None = None,
        kwargs: dict[str, Any] | None = None,
    ):
        """
        Class constructor.

        Arguments:
            name: Name to identify the thread in the logs
            interval: time between each iteration of the loop, in seconds
                      if 0, the function is supposed to have its own sleep time.
                      if < 0, like 0 but always display a loop duration
            func: function to execute in the loop
            logger: an optional custom logger
            args: arguments of the function
            kwargs: named arguments of the function
        """
        self._name = name
        self._interval = interval
        self._func = func
        self._args = args or []
        self._kwargs = kwargs or {}
        self._thread = threading.Thread(target=self.repeat)
        self._cancel = False
        self._logger = logging.getLogger(f"ThreadLoop: {name}")

        if not isinstance(logger, bool):
            self._logger = logger
        else:
            if self._logger.level == logging.NOTSET:
                if logger:
                    self._logger.setLevel(logging.INFO)
                else:
                    self._logger.setLevel(logging.ERROR)
                self._logger.addHandler(logging.StreamHandler())

    @property
    def interval(self) -> float:
        return self._interval

    @interval.setter
    def interval(self, value: float) -> None:
        self._interval = value

    def repeat(self) -> None:
        """
        Loop function executed in the thread.
        """
        while not self._cancel:
            start = time.time()
            self._func(*self._args, **self._kwargs)
            now = time.time()
            duration = now - start
            if self._interval > 0:
                if duration > self._interval:
                    self._logger.warning(f"Function too long: {duration} > {self._interval}")
                else:
                    wait = self._interval - duration
                    time.sleep(wait)
            elif self._interval < 0:
                self._logger.info(f"Function duration: {duration}")

    def start(self) -> None:
        """
        Start the thread loop.
        """
        if self._thread.is_alive():
            self._logger.warning(f"Already {'canceled' if self._cancel else 'running'}")
            return
        if self._cancel:
            self._thread = threading.Thread(target=self.repeat)
            self._cancel = False
        self._thread.start()

    def stop(self) -> None:
        """
        Stop the thread loop.
        """
        self._logger.debug("Stopping...")
        if self._thread.is_alive():
            self._cancel = True
            try:
                self._thread.join()
            except KeyboardInterrupt:
                pass
            self._logger.debug("Stopped.")

__init__(name, interval, func, logger=False, args=None, kwargs=None) #

Class constructor.

Parameters:

Name Type Description Default
name str

Name to identify the thread in the logs

required
interval float

Time between each iteration of the loop, in seconds. If 0, the function is expected to handle its own sleep time. If < 0, same as 0, but the loop duration is always logged.

required
func Callable

function to execute in the loop

required
logger bool | Logger

an optional custom logger

False
args list[Any] | None

arguments of the function

None
kwargs dict[str, Any] | None

named arguments of the function

None
Source code in cogip/utils/threadloop.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(
    self,
    name: str,
    interval: float,
    func: Callable,
    logger: bool | logging.Logger = False,
    args: list[Any] | None = None,
    kwargs: dict[str, Any] | None = None,
):
    """
    Class constructor.

    Arguments:
        name: Name to identify the thread in the logs
        interval: time between each iteration of the loop, in seconds
                  if 0, the function is supposed to have its own sleep time.
                  if < 0, like 0 but always display a loop duration
        func: function to execute in the loop
        logger: an optional custom logger
        args: arguments of the function
        kwargs: named arguments of the function
    """
    self._name = name
    self._interval = interval
    self._func = func
    self._args = args or []
    self._kwargs = kwargs or {}
    self._thread = threading.Thread(target=self.repeat)
    self._cancel = False
    self._logger = logging.getLogger(f"ThreadLoop: {name}")

    if not isinstance(logger, bool):
        self._logger = logger
    else:
        if self._logger.level == logging.NOTSET:
            if logger:
                self._logger.setLevel(logging.INFO)
            else:
                self._logger.setLevel(logging.ERROR)
            self._logger.addHandler(logging.StreamHandler())

repeat() #

Loop function executed in the thread.

Source code in cogip/utils/threadloop.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def repeat(self) -> None:
    """
    Loop function executed in the thread.

    Calls the function with the stored arguments until cancellation is
    requested. With a positive interval, sleeps for the remainder of the
    interval, or logs a warning if the call took longer than the interval.
    With a negative interval, logs the duration of every call.
    """
    while not self._cancel:
        # Monotonic clock: a system clock adjustment must not distort the
        # measured duration.
        start = time.monotonic()
        self._func(*self._args, **self._kwargs)
        duration = time.monotonic() - start
        # Read the interval once per iteration: it can be changed from
        # another thread, and mixing two values could produce a negative
        # sleep time (ValueError).
        interval = self._interval
        if interval > 0:
            if duration > interval:
                self._logger.warning(f"Function too long: {duration} > {interval}")
            else:
                time.sleep(interval - duration)
        elif interval < 0:
            self._logger.info(f"Function duration: {duration}")

start() #

Start the thread loop.

Source code in cogip/utils/threadloop.py
83
84
85
86
87
88
89
90
91
92
93
def start(self) -> None:
    """
    Start the thread loop.

    Does nothing except logging a warning if the thread is still alive.
    """
    if self._thread.is_alive():
        self._logger.warning(f"Already {'canceled' if self._cancel else 'running'}")
        return
    # A Thread object can only be started once: build a new one after a
    # cancellation, or when the previous thread already ran and ended by
    # itself (for example because the function raised).
    if self._cancel or self._thread.ident is not None:
        self._thread = threading.Thread(target=self.repeat)
        self._cancel = False
    self._thread.start()

stop() #

Stop the thread loop.

Source code in cogip/utils/threadloop.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def stop(self) -> None:
    """
    Stop the thread loop.

    Requests cancellation and blocks until the thread has exited, unless
    called from the loop thread itself, in which case the loop ends once the
    current function call returns. Does nothing if the thread is not alive.
    """
    self._logger.debug("Stopping...")
    if self._thread.is_alive():
        self._cancel = True
        if self._thread is threading.current_thread():
            # A thread cannot join itself (RuntimeError), so only request
            # the cancellation when called from inside the looped function.
            return
        try:
            self._thread.join()
        except KeyboardInterrupt:
            # Deliberate: an interrupt while waiting must not propagate.
            pass
        self._logger.debug("Stopped.")