Manages asynchronous events and tasks for the Planner.
Source code in cogip/tools/planner/event_manager.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 | class EventManager:
"""
Manages asynchronous events and tasks for the Planner.
"""
def __init__(self, planner: "Planner"):
self.planner = planner
self.blocked_event_task: asyncio.Task | None = None
self.new_path_event_task: asyncio.Task | None = None
self.countdown_task: asyncio.Task | None = None
async def start_loops(self):
"""Start all async event loops."""
self.blocked_event_task = asyncio.create_task(
self.blocked_event_loop(),
name="Planner: Task Blocked Event Watcher Loop",
)
self.new_path_event_task = asyncio.create_task(
self.new_path_event_loop(),
name="Planner: Task New Path Event Watcher Loop",
)
await self.countdown_start()
async def stop_loops(self):
"""Stop all async event loops."""
await self.countdown_stop()
if self.blocked_event_task:
self.blocked_event_task.cancel()
try:
await self.blocked_event_task
except asyncio.CancelledError:
logger.info("Planner: Task Blocked Event Watcher Loop stopped")
except Exception as exc:
logger.warning(f"Planner: Unexpected exception {exc}")
self.blocked_event_task = None
if self.new_path_event_task:
self.new_path_event_task.cancel()
try:
await self.new_path_event_task
except asyncio.CancelledError:
logger.info("Planner: Task New Path Event Watcher Loop stopped")
except Exception as exc:
logger.warning(f"Planner: Unexpected exception {exc}")
traceback.print_exc()
self.new_path_event_task = None
async def blocked_event_loop(self):
"""Watches for robot blocked events."""
logger.info("Planner: Task Blocked Event Watcher Loop started")
try:
while True:
await asyncio.to_thread(self.planner.shared_avoidance_blocked_lock.wait_update)
if self.planner.sio.connected:
await self.planner.sio_ns.emit("brake")
self.planner.blocked_counter += 1
if self.planner.blocked_counter > 10:
self.planner.blocked_counter = 0
await self.planner.blocked()
except asyncio.CancelledError:
logger.info("Planner: Task Blocked Event Watcher Loop cancelled")
raise
except Exception as exc: # noqa
logger.warning(f"Planner: Task Blocked Event Watcher Loop: Unknown exception {exc}")
traceback.print_exc()
raise
async def new_path_event_loop(self):
"""Watches for new path events."""
logger.info("Planner: Task New Path Event Watcher Loop started")
try:
while True:
await asyncio.to_thread(self.planner.shared_avoidance_path_lock.wait_update)
self.planner.blocked_counter = 0
if self.planner.pose_order:
await self.planner.pose_order.act_intermediate_pose()
except asyncio.CancelledError:
logger.info("Planner: Task New Path Event Watcher Loop cancelled")
raise
except Exception as exc: # noqa
logger.warning(f"Planner: Task New Path Event Watcher Loop: Unknown exception {exc}")
traceback.print_exc()
raise
async def countdown_loop(self):
"""Manages the game countdown."""
logger.info("Planner: Task Countdown started")
try:
self.planner.game_context.last_countdown = self.planner.game_context.countdown
while True:
await asyncio.sleep(0.2)
if not self.planner.playing:
continue
now = datetime.now(UTC)
self.planner.game_context.countdown = (
self.planner.game_context.game_duration
- (now - self.planner.countdown_start_timestamp).total_seconds()
)
logger.info(f"Planner: countdown = {self.planner.game_context.countdown: 3.2f}")
if (
self.planner.robot_id > 1
and self.planner.game_context.countdown < 15
and self.planner.game_context.last_countdown > 15
):
logger.info("Planner: countdown==15: start PAMI")
self.planner.pami_event.set()
if (
self.planner.robot_id == 1
and self.planner.game_context.countdown < 7
and self.planner.game_context.last_countdown > 7
):
logger.info("Planner: countdown==7: disable GOAP, force blocked")
self.planner.strategy.goap_allowed = False
asyncio.create_task(self.planner.blocked())
if self.planner.game_context.countdown < 0 and self.planner.game_context.last_countdown > 0:
logger.info("Planner: countdown==0: final action")
await self.planner.final_action()
if self.planner.game_context.countdown < -5 and self.planner.game_context.last_countdown > -5:
await self.planner.sio_ns.emit("stop_video_record")
self.planner.game_context.last_countdown = self.planner.game_context.countdown
except asyncio.CancelledError:
logger.info("Planner: Task Countdown cancelled")
raise
except Exception as exc: # noqa
logger.warning(f"Planner: Unknown exception {exc}")
raise
async def countdown_start(self):
"""Starts the countdown task."""
await self.countdown_stop()
self.countdown_task = asyncio.create_task(self.countdown_loop())
async def countdown_stop(self):
"""Stops the countdown task."""
if self.countdown_task is None:
return
self.countdown_task.cancel()
try:
await self.countdown_task
except asyncio.CancelledError:
logger.info("Planner: Task Countdown stopped")
except Exception as exc:
logger.warning(f"Planner: Unexpected exception {exc}")
self.countdown_task = None
|
Watches for robot blocked events.
Source code in cogip/tools/planner/event_manager.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 | async def blocked_event_loop(self):
"""Watches for robot blocked events."""
logger.info("Planner: Task Blocked Event Watcher Loop started")
try:
while True:
await asyncio.to_thread(self.planner.shared_avoidance_blocked_lock.wait_update)
if self.planner.sio.connected:
await self.planner.sio_ns.emit("brake")
self.planner.blocked_counter += 1
if self.planner.blocked_counter > 10:
self.planner.blocked_counter = 0
await self.planner.blocked()
except asyncio.CancelledError:
logger.info("Planner: Task Blocked Event Watcher Loop cancelled")
raise
except Exception as exc: # noqa
logger.warning(f"Planner: Task Blocked Event Watcher Loop: Unknown exception {exc}")
traceback.print_exc()
raise
|
Manages the game countdown.
Source code in cogip/tools/planner/event_manager.py
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 | async def countdown_loop(self):
"""Manages the game countdown."""
logger.info("Planner: Task Countdown started")
try:
self.planner.game_context.last_countdown = self.planner.game_context.countdown
while True:
await asyncio.sleep(0.2)
if not self.planner.playing:
continue
now = datetime.now(UTC)
self.planner.game_context.countdown = (
self.planner.game_context.game_duration
- (now - self.planner.countdown_start_timestamp).total_seconds()
)
logger.info(f"Planner: countdown = {self.planner.game_context.countdown: 3.2f}")
if (
self.planner.robot_id > 1
and self.planner.game_context.countdown < 15
and self.planner.game_context.last_countdown > 15
):
logger.info("Planner: countdown==15: start PAMI")
self.planner.pami_event.set()
if (
self.planner.robot_id == 1
and self.planner.game_context.countdown < 7
and self.planner.game_context.last_countdown > 7
):
logger.info("Planner: countdown==7: disable GOAP, force blocked")
self.planner.strategy.goap_allowed = False
asyncio.create_task(self.planner.blocked())
if self.planner.game_context.countdown < 0 and self.planner.game_context.last_countdown > 0:
logger.info("Planner: countdown==0: final action")
await self.planner.final_action()
if self.planner.game_context.countdown < -5 and self.planner.game_context.last_countdown > -5:
await self.planner.sio_ns.emit("stop_video_record")
self.planner.game_context.last_countdown = self.planner.game_context.countdown
except asyncio.CancelledError:
logger.info("Planner: Task Countdown cancelled")
raise
except Exception as exc: # noqa
logger.warning(f"Planner: Unknown exception {exc}")
raise
|
Starts the countdown task.
Source code in cogip/tools/planner/event_manager.py
| async def countdown_start(self):
"""Starts the countdown task."""
await self.countdown_stop()
self.countdown_task = asyncio.create_task(self.countdown_loop())
|
Stops the countdown task.
Source code in cogip/tools/planner/event_manager.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161 | async def countdown_stop(self):
"""Stops the countdown task."""
if self.countdown_task is None:
return
self.countdown_task.cancel()
try:
await self.countdown_task
except asyncio.CancelledError:
logger.info("Planner: Task Countdown stopped")
except Exception as exc:
logger.warning(f"Planner: Unexpected exception {exc}")
self.countdown_task = None
|
Watches for new path events.
Source code in cogip/tools/planner/event_manager.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 | async def new_path_event_loop(self):
"""Watches for new path events."""
logger.info("Planner: Task New Path Event Watcher Loop started")
try:
while True:
await asyncio.to_thread(self.planner.shared_avoidance_path_lock.wait_update)
self.planner.blocked_counter = 0
if self.planner.pose_order:
await self.planner.pose_order.act_intermediate_pose()
except asyncio.CancelledError:
logger.info("Planner: Task New Path Event Watcher Loop cancelled")
raise
except Exception as exc: # noqa
logger.warning(f"Planner: Task New Path Event Watcher Loop: Unknown exception {exc}")
traceback.print_exc()
raise
|
Start all async event loops.
Source code in cogip/tools/planner/event_manager.py
23
24
25
26
27
28
29
30
31
32
33 | async def start_loops(self):
"""Start all async event loops."""
self.blocked_event_task = asyncio.create_task(
self.blocked_event_loop(),
name="Planner: Task Blocked Event Watcher Loop",
)
self.new_path_event_task = asyncio.create_task(
self.new_path_event_loop(),
name="Planner: Task New Path Event Watcher Loop",
)
await self.countdown_start()
|
Stop all async event loops.
Source code in cogip/tools/planner/event_manager.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 | async def stop_loops(self):
"""Stop all async event loops."""
await self.countdown_stop()
if self.blocked_event_task:
self.blocked_event_task.cancel()
try:
await self.blocked_event_task
except asyncio.CancelledError:
logger.info("Planner: Task Blocked Event Watcher Loop stopped")
except Exception as exc:
logger.warning(f"Planner: Unexpected exception {exc}")
self.blocked_event_task = None
if self.new_path_event_task:
self.new_path_event_task.cancel()
try:
await self.new_path_event_task
except asyncio.CancelledError:
logger.info("Planner: Task New Path Event Watcher Loop stopped")
except Exception as exc:
logger.warning(f"Planner: Unexpected exception {exc}")
traceback.print_exc()
self.new_path_event_task = None
|