Skip to content

event_manager

EventManager #

Manages asynchronous events and tasks for the Planner.

Source code in cogip/tools/planner/event_manager.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class EventManager:
    """
    Manages asynchronous events and tasks for the Planner.
    """

    def __init__(self, planner: "Planner"):
        self.planner = planner
        self.blocked_event_task: asyncio.Task | None = None
        self.new_path_event_task: asyncio.Task | None = None
        self.countdown_task: asyncio.Task | None = None

    async def start_loops(self):
        """Start all async event loops."""
        self.blocked_event_task = asyncio.create_task(
            self.blocked_event_loop(),
            name="Planner: Task Blocked Event Watcher Loop",
        )
        self.new_path_event_task = asyncio.create_task(
            self.new_path_event_loop(),
            name="Planner: Task New Path Event Watcher Loop",
        )
        await self.countdown_start()

    async def stop_loops(self):
        """Stop all async event loops."""
        await self.countdown_stop()

        if self.blocked_event_task:
            self.blocked_event_task.cancel()
            try:
                await self.blocked_event_task
            except asyncio.CancelledError:
                logger.info("Planner: Task Blocked Event Watcher Loop stopped")
            except Exception as exc:
                logger.warning(f"Planner: Unexpected exception {exc}")
            self.blocked_event_task = None

        if self.new_path_event_task:
            self.new_path_event_task.cancel()
            try:
                await self.new_path_event_task
            except asyncio.CancelledError:
                logger.info("Planner: Task New Path Event Watcher Loop stopped")
            except Exception as exc:
                logger.warning(f"Planner: Unexpected exception {exc}")
                traceback.print_exc()
            self.new_path_event_task = None

    async def blocked_event_loop(self):
        """Watches for robot blocked events."""
        logger.info("Planner: Task Blocked Event Watcher Loop started")
        try:
            while True:
                await asyncio.to_thread(self.planner.shared_avoidance_blocked_lock.wait_update)
                if self.planner.sio.connected:
                    await self.planner.sio_ns.emit("brake")
                self.planner.blocked_counter += 1
                if self.planner.blocked_counter > 10:
                    self.planner.blocked_counter = 0
                    await self.planner.blocked()
        except asyncio.CancelledError:
            logger.info("Planner: Task Blocked Event Watcher Loop cancelled")
            raise
        except Exception as exc:  # noqa
            logger.warning(f"Planner: Task Blocked Event Watcher Loop: Unknown exception {exc}")
            traceback.print_exc()
            raise

    async def new_path_event_loop(self):
        """Watches for new path events."""
        logger.info("Planner: Task New Path Event Watcher Loop started")
        try:
            while True:
                await asyncio.to_thread(self.planner.shared_avoidance_path_lock.wait_update)
                self.planner.blocked_counter = 0
                if self.planner.pose_order:
                    await self.planner.pose_order.act_intermediate_pose()
        except asyncio.CancelledError:
            logger.info("Planner: Task New Path Event Watcher Loop cancelled")
            raise
        except Exception as exc:  # noqa
            logger.warning(f"Planner: Task New Path Event Watcher Loop: Unknown exception {exc}")
            traceback.print_exc()
            raise

    async def countdown_loop(self):
        """Manages the game countdown."""
        logger.info("Planner: Task Countdown started")
        try:
            self.planner.game_context.last_countdown = self.planner.game_context.countdown
            while True:
                await asyncio.sleep(0.2)

                if not self.planner.playing:
                    continue

                now = datetime.now(UTC)
                self.planner.game_context.countdown = (
                    self.planner.game_context.game_duration
                    - (now - self.planner.countdown_start_timestamp).total_seconds()
                )

                logger.info(f"Planner: countdown = {self.planner.game_context.countdown: 3.2f}")
                if (
                    self.planner.robot_id > 1
                    and self.planner.game_context.countdown < 15
                    and self.planner.game_context.last_countdown > 15
                ):
                    logger.info("Planner: countdown==15: start PAMI")
                    self.planner.pami_event.set()
                if (
                    self.planner.robot_id == 1
                    and self.planner.game_context.countdown < 7
                    and self.planner.game_context.last_countdown > 7
                ):
                    logger.info("Planner: countdown==7: disable GOAP, force blocked")
                    self.planner.strategy.goap_allowed = False
                    asyncio.create_task(self.planner.blocked())
                if self.planner.game_context.countdown < 0 and self.planner.game_context.last_countdown > 0:
                    logger.info("Planner: countdown==0: final action")
                    await self.planner.final_action()
                if self.planner.game_context.countdown < -5 and self.planner.game_context.last_countdown > -5:
                    await self.planner.sio_ns.emit("stop_video_record")
                self.planner.game_context.last_countdown = self.planner.game_context.countdown
        except asyncio.CancelledError:
            logger.info("Planner: Task Countdown cancelled")
            raise
        except Exception as exc:  # noqa
            logger.warning(f"Planner: Unknown exception {exc}")
            raise

    async def countdown_start(self):
        """Starts the countdown task."""
        await self.countdown_stop()
        self.countdown_task = asyncio.create_task(self.countdown_loop())

    async def countdown_stop(self):
        """Stops the countdown task."""
        if self.countdown_task is None:
            return

        self.countdown_task.cancel()
        try:
            await self.countdown_task
        except asyncio.CancelledError:
            logger.info("Planner: Task Countdown stopped")
        except Exception as exc:
            logger.warning(f"Planner: Unexpected exception {exc}")

        self.countdown_task = None

blocked_event_loop() async #

Watches for robot blocked events.

Source code in cogip/tools/planner/event_manager.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def blocked_event_loop(self):
    """Watches for robot blocked events."""
    logger.info("Planner: Task Blocked Event Watcher Loop started")
    try:
        while True:
            await asyncio.to_thread(self.planner.shared_avoidance_blocked_lock.wait_update)
            if self.planner.sio.connected:
                await self.planner.sio_ns.emit("brake")
            self.planner.blocked_counter += 1
            if self.planner.blocked_counter > 10:
                self.planner.blocked_counter = 0
                await self.planner.blocked()
    except asyncio.CancelledError:
        logger.info("Planner: Task Blocked Event Watcher Loop cancelled")
        raise
    except Exception as exc:  # noqa
        logger.warning(f"Planner: Task Blocked Event Watcher Loop: Unknown exception {exc}")
        traceback.print_exc()
        raise

countdown_loop() async #

Manages the game countdown.

Source code in cogip/tools/planner/event_manager.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
async def countdown_loop(self):
    """Manages the game countdown."""
    logger.info("Planner: Task Countdown started")
    try:
        self.planner.game_context.last_countdown = self.planner.game_context.countdown
        while True:
            await asyncio.sleep(0.2)

            if not self.planner.playing:
                continue

            now = datetime.now(UTC)
            self.planner.game_context.countdown = (
                self.planner.game_context.game_duration
                - (now - self.planner.countdown_start_timestamp).total_seconds()
            )

            logger.info(f"Planner: countdown = {self.planner.game_context.countdown: 3.2f}")
            if (
                self.planner.robot_id > 1
                and self.planner.game_context.countdown < 15
                and self.planner.game_context.last_countdown > 15
            ):
                logger.info("Planner: countdown==15: start PAMI")
                self.planner.pami_event.set()
            if (
                self.planner.robot_id == 1
                and self.planner.game_context.countdown < 7
                and self.planner.game_context.last_countdown > 7
            ):
                logger.info("Planner: countdown==7: disable GOAP, force blocked")
                self.planner.strategy.goap_allowed = False
                asyncio.create_task(self.planner.blocked())
            if self.planner.game_context.countdown < 0 and self.planner.game_context.last_countdown > 0:
                logger.info("Planner: countdown==0: final action")
                await self.planner.final_action()
            if self.planner.game_context.countdown < -5 and self.planner.game_context.last_countdown > -5:
                await self.planner.sio_ns.emit("stop_video_record")
            self.planner.game_context.last_countdown = self.planner.game_context.countdown
    except asyncio.CancelledError:
        logger.info("Planner: Task Countdown cancelled")
        raise
    except Exception as exc:  # noqa
        logger.warning(f"Planner: Unknown exception {exc}")
        raise

countdown_start() async #

Starts the countdown task.

Source code in cogip/tools/planner/event_manager.py
143
144
145
146
async def countdown_start(self):
    """Starts the countdown task."""
    await self.countdown_stop()
    self.countdown_task = asyncio.create_task(self.countdown_loop())

countdown_stop() async #

Stops the countdown task.

Source code in cogip/tools/planner/event_manager.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def countdown_stop(self):
    """Stops the countdown task."""
    if self.countdown_task is None:
        return

    self.countdown_task.cancel()
    try:
        await self.countdown_task
    except asyncio.CancelledError:
        logger.info("Planner: Task Countdown stopped")
    except Exception as exc:
        logger.warning(f"Planner: Unexpected exception {exc}")

    self.countdown_task = None

new_path_event_loop() async #

Watches for new path events.

Source code in cogip/tools/planner/event_manager.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async def new_path_event_loop(self):
    """Watches for new path events."""
    logger.info("Planner: Task New Path Event Watcher Loop started")
    try:
        while True:
            await asyncio.to_thread(self.planner.shared_avoidance_path_lock.wait_update)
            self.planner.blocked_counter = 0
            if self.planner.pose_order:
                await self.planner.pose_order.act_intermediate_pose()
    except asyncio.CancelledError:
        logger.info("Planner: Task New Path Event Watcher Loop cancelled")
        raise
    except Exception as exc:  # noqa
        logger.warning(f"Planner: Task New Path Event Watcher Loop: Unknown exception {exc}")
        traceback.print_exc()
        raise

start_loops() async #

Start all async event loops.

Source code in cogip/tools/planner/event_manager.py
23
24
25
26
27
28
29
30
31
32
33
async def start_loops(self):
    """Start all async event loops."""
    self.blocked_event_task = asyncio.create_task(
        self.blocked_event_loop(),
        name="Planner: Task Blocked Event Watcher Loop",
    )
    self.new_path_event_task = asyncio.create_task(
        self.new_path_event_loop(),
        name="Planner: Task New Path Event Watcher Loop",
    )
    await self.countdown_start()

stop_loops() async #

Stop all async event loops.

Source code in cogip/tools/planner/event_manager.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
async def stop_loops(self):
    """Stop all async event loops."""
    await self.countdown_stop()

    if self.blocked_event_task:
        self.blocked_event_task.cancel()
        try:
            await self.blocked_event_task
        except asyncio.CancelledError:
            logger.info("Planner: Task Blocked Event Watcher Loop stopped")
        except Exception as exc:
            logger.warning(f"Planner: Unexpected exception {exc}")
        self.blocked_event_task = None

    if self.new_path_event_task:
        self.new_path_event_task.cancel()
        try:
            await self.new_path_event_task
        except asyncio.CancelledError:
            logger.info("Planner: Task New Path Event Watcher Loop stopped")
        except Exception as exc:
            logger.warning(f"Planner: Unexpected exception {exc}")
            traceback.print_exc()
        self.new_path_event_task = None