This class represents an action of the game.
It contains a list of Pose to reach in order.
A function can be executed before the action starts and after it ends.
Source code in cogip/tools/planner/actions/action.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 | class Action:
"""
This class represents an action of the game.
It contains a list of Pose to reach in order.
A function can be executed before the action starts and after it ends.
"""
logger = logger
def __init__(
self,
name: str,
planner: "Planner",
strategy: "Strategy",
interruptable: bool = True,
recyclable: bool = True,
):
self.name = name
self.planner = planner
self.strategy = strategy
self.interruptable = interruptable
self.recyclable = recyclable
self.poses: list[Pose] = []
self.before_action_func: Callable[[], Awaitable[None]] | None = None
self.after_action_func: Callable[[], Awaitable[None]] | None = None
self.on_blocked_func: Callable[[], Awaitable[None]] | None = None
self.recycled: bool = False
def weight(self) -> float:
"""
Weight of the action.
It can be used to choose the next action to select.
This is the generic implementation.
"""
raise NotImplementedError
@final
async def act_before_action(self):
"""
Function executed before the action starts.
"""
if self.before_action_func:
await self.before_action_func()
@final
async def act_after_action(self):
"""
Function executed after the action ends.
"""
if self.after_action_func:
await self.after_action_func()
# Re-enable all actions after a successful action
for action in self.strategy:
action.recycled = False
@final
async def act_on_blocked(self):
"""
Function executed when the action is blocked.
"""
if self.on_blocked_func:
await self.on_blocked_func()
async def recycle(self):
"""
Function called if the action is blocked and put back in the actions list
"""
self.recycled = True
@property
def pose_current(self) -> models.Pose:
return self.planner.pose_current
async def evaluate(self):
# Average robot speed in mm/s.
# This is just an empirical value found by testing that gives a good enough
# estimation of the time needed to perform the action.
# This could be improved later by using target speed and acceleration.
average_speed = 100
await self.act_before_action()
# Update countdown
self.planner.game_context.countdown -= asyncio.sleep.total_sleep
asyncio.sleep.reset()
while len(self.poses) and self.planner.game_context.countdown > 0:
pose = self.poses.pop(0)
await pose.act_before_pose()
await pose.act_after_pose()
# Update countdown
distance = math.dist(
(self.planner.pose_current.x, self.planner.pose_current.y),
(pose.x, pose.y),
)
self.planner.game_context.countdown -= asyncio.sleep.total_sleep + distance / average_speed
asyncio.sleep.reset()
# Update pose_current
self.planner.pose_current.x = pose.x
self.planner.pose_current.y = pose.y
self.planner.pose_current.O = pose.O
await self.act_after_action()
# Update countdown
self.planner.game_context.countdown -= asyncio.sleep.total_sleep
asyncio.sleep.reset()
def __str__(self) -> str:
return f"Action[0x{id(self):x}]({self.name})"
def __repr__(self) -> str:
return str(self)
|
Function executed after the action ends.
Source code in cogip/tools/planner/actions/action.py
59
60
61
62
63
64
65
66
67
68
69 | @final
async def act_after_action(self):
"""
Function executed after the action ends.
"""
if self.after_action_func:
await self.after_action_func()
# Re-enable all actions after a successful action
for action in self.strategy:
action.recycled = False
|
Function executed before the action starts.
Source code in cogip/tools/planner/actions/action.py
| @final
async def act_before_action(self):
"""
Function executed before the action starts.
"""
if self.before_action_func:
await self.before_action_func()
|
Function executed when the action is blocked.
Source code in cogip/tools/planner/actions/action.py
| @final
async def act_on_blocked(self):
"""
Function executed when the action is blocked.
"""
if self.on_blocked_func:
await self.on_blocked_func()
|
Function called if the action is blocked and put back in the actions list
Source code in cogip/tools/planner/actions/action.py
| async def recycle(self):
"""
Function called if the action is blocked and put back in the actions list
"""
self.recycled = True
|
Weight of the action.
It can be used to choose the next action to select.
This is the generic implementation.
Source code in cogip/tools/planner/actions/action.py
| def weight(self) -> float:
"""
Weight of the action.
It can be used to choose the next action to select.
This is the generic implementation.
"""
raise NotImplementedError
|