Skip to content

asyncloop

AsyncLoop #

This class creates an async task that executes a function in a loop, waiting after each call until the defined loop interval has elapsed. A warning is emitted if a call takes longer than the loop interval.

Source code in cogip/utils/asyncloop.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class AsyncLoop:
    """
    This class creates a async task to execute a function in loop and wait after
    the function until the defined loop interval is reached.
    A warning is emitted if the function duration is longer than the loop
    interval.
    """

    def __init__(
        self,
        name: str,
        interval: float,
        func: Callable,
        logger: bool | logging.Logger = False,
        args: list[Any] | None = None,
        kwargs: dict[str, Any] | None = None,
    ):
        """
        Class constructor.

        Arguments:
            name: Name to identify the thread in the logs
            interval: time between each iteration of the loop, in seconds
            func: function to execute in the loop
            logger: an optional custom logger
            args: arguments of the function
            kwargs: named arguments of the function
        """
        self._name = name
        self.interval = interval
        self._func = func
        self._args = args or []
        self._kwargs = kwargs or {}
        self._logger = logging.getLogger(f"AsyncLoop: {name}")
        self._task: asyncio.Task | None = None
        self.exit: bool = False

        if not isinstance(logger, bool):
            self._logger = logger
        else:
            if self._logger.level == logging.NOTSET:
                if logger:
                    self._logger.setLevel(logging.DEBUG)
                else:
                    self._logger.setLevel(logging.INFO)

    async def task(self) -> None:
        """
        Loop function executed in the task.
        """
        self._logger.info("Task started")

        try:
            while not self.exit:
                start = time.time()
                await self._func(*self._args, **self._kwargs)
                now = time.time()
                duration = now - start
                if duration > self.interval:
                    self._logger.warning(f"Function too long: {duration} > {self.interval}")
                else:
                    wait = self.interval - duration
                    await asyncio.sleep(wait)
        except asyncio.CancelledError:
            self._logger.info("Task cancelled")
            raise

    def start(self):
        if self._task:
            self._logger.warning("Already started")
            return

        self.exit = False
        self._task = asyncio.create_task(self.task(), name=self._name)

    async def stop(self):
        if not self._task:
            self._logger.warning("Not running")
            return

        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            self._logger.info("Task cancelled")

        self._task = None

__init__(name, interval, func, logger=False, args=None, kwargs=None) #

Class constructor.

Parameters:

Name Type Description Default
name str

Name to identify the task in the logs

required
interval float

time between each iteration of the loop, in seconds

required
func Callable

function to execute in the loop

required
logger bool | Logger

an optional custom logger

False
args list[Any] | None

arguments of the function

None
kwargs dict[str, Any] | None

named arguments of the function

None
Source code in cogip/utils/asyncloop.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(
    self,
    name: str,
    interval: float,
    func: Callable,
    logger: bool | logging.Logger = False,
    args: list[Any] | None = None,
    kwargs: dict[str, Any] | None = None,
):
    """
    Class constructor.

    Arguments:
        name: Name to identify the thread in the logs
        interval: time between each iteration of the loop, in seconds
        func: function to execute in the loop
        logger: an optional custom logger
        args: arguments of the function
        kwargs: named arguments of the function
    """
    self._name = name
    self.interval = interval
    self._func = func
    self._args = args or []
    self._kwargs = kwargs or {}
    self._logger = logging.getLogger(f"AsyncLoop: {name}")
    self._task: asyncio.Task | None = None
    self.exit: bool = False

    if not isinstance(logger, bool):
        self._logger = logger
    else:
        if self._logger.level == logging.NOTSET:
            if logger:
                self._logger.setLevel(logging.DEBUG)
            else:
                self._logger.setLevel(logging.INFO)

task() async #

Loop function executed in the task.

Source code in cogip/utils/asyncloop.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def task(self) -> None:
    """
    Loop function executed in the task.

    Awaits the configured function repeatedly until `exit` is set, sleeping
    after each call for the remainder of the interval. A warning is logged
    when a call lasts longer than the interval. Cancellation is logged and
    re-raised.
    """
    self._logger.info("Task started")

    try:
        while not self.exit:
            # Monotonic clock: wall-clock adjustments must not distort the
            # measured duration.
            start = time.monotonic()
            await self._func(*self._args, **self._kwargs)
            duration = time.monotonic() - start
            if duration > self.interval:
                self._logger.warning(f"Function too long: {duration} > {self.interval}")
                # Still hand control back to the event loop, otherwise a
                # function without real suspension points would starve every
                # other task.
                await asyncio.sleep(0)
            else:
                await asyncio.sleep(self.interval - duration)
    except asyncio.CancelledError:
        self._logger.info("Task cancelled")
        raise