Skip to content

action

Action #

This class represents an action of the game. It contains a list of Pose to reach in order. A function can be executed before the action starts and after it ends.

Source code in cogip/tools/planner/actions/action.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class Action:
    """
    This class represents an action of the game.
    It contains a list of Pose to reach in order.
    A function can be executed before the action starts and after it ends.
    """

    logger = logger

    def __init__(
        self,
        name: str,
        planner: "Planner",
        strategy: "Strategy",
        interruptable: bool = True,
        recyclable: bool = True,
    ):
        self.name = name
        self.planner = planner
        self.strategy = strategy
        self.interruptable = interruptable
        self.recyclable = recyclable
        self.poses: list[Pose] = []
        self.before_action_func: Callable[[], Awaitable[None]] | None = None
        self.after_action_func: Callable[[], Awaitable[None]] | None = None
        self.on_blocked_func: Callable[[], Awaitable[None]] | None = None
        self.recycled: bool = False

    def weight(self) -> float:
        """
        Weight of the action.
        It can be used to choose the next action to select.
        This is the generic implementation.
        """
        raise NotImplementedError

    @final
    async def act_before_action(self):
        """
        Function executed before the action starts.
        """
        if self.before_action_func:
            await self.before_action_func()

    @final
    async def act_after_action(self):
        """
        Function executed after the action ends.
        """
        if self.after_action_func:
            await self.after_action_func()

        # Re-enable all actions after a successful action
        for action in self.strategy:
            action.recycled = False

    @final
    async def act_on_blocked(self):
        """
        Function executed when the action is blocked.
        """
        if self.on_blocked_func:
            await self.on_blocked_func()

    async def recycle(self):
        """
        Function called if the action is blocked and put back in the actions list
        """
        self.recycled = True

    @property
    def pose_current(self) -> models.Pose:
        return self.planner.pose_current

    async def evaluate(self):
        # Average robot speed in mm/s.
        # This is just an empirical value found by testing that gives a good enough
        # estimation of the time needed to perform the action.
        # This could be improved later by using target speed and acceleration.
        average_speed = 100

        await self.act_before_action()

        # Update countdown
        self.planner.game_context.countdown -= asyncio.sleep.total_sleep
        asyncio.sleep.reset()

        while len(self.poses) and self.planner.game_context.countdown > 0:
            pose = self.poses.pop(0)

            await pose.act_before_pose()
            await pose.act_after_pose()

            # Update countdown
            distance = math.dist(
                (self.planner.pose_current.x, self.planner.pose_current.y),
                (pose.x, pose.y),
            )
            self.planner.game_context.countdown -= asyncio.sleep.total_sleep + distance / average_speed
            asyncio.sleep.reset()

            # Update pose_current
            self.planner.pose_current.x = pose.x
            self.planner.pose_current.y = pose.y
            self.planner.pose_current.O = pose.O

        await self.act_after_action()

        # Update countdown
        self.planner.game_context.countdown -= asyncio.sleep.total_sleep
        asyncio.sleep.reset()

    def __str__(self) -> str:
        return f"Action[0x{id(self):x}]({self.name})"

    def __repr__(self) -> str:
        return str(self)

act_after_action() async #

Function executed after the action ends.

Source code in cogip/tools/planner/actions/action.py
59
60
61
62
63
64
65
66
67
68
69
@final
async def act_after_action(self):
    """
    Function executed after the action ends.
    """
    if self.after_action_func:
        await self.after_action_func()

    # Re-enable all actions after a successful action
    for action in self.strategy:
        action.recycled = False

act_before_action() async #

Function executed before the action starts.

Source code in cogip/tools/planner/actions/action.py
51
52
53
54
55
56
57
@final
async def act_before_action(self):
    """
    Function executed before the action starts.
    """
    if self.before_action_func:
        await self.before_action_func()

act_on_blocked() async #

Function executed when the action is blocked.

Source code in cogip/tools/planner/actions/action.py
71
72
73
74
75
76
77
@final
async def act_on_blocked(self):
    """
    Function executed when the action is blocked.
    """
    if self.on_blocked_func:
        await self.on_blocked_func()

recycle() async #

Function called if the action is blocked and put back in the actions list

Source code in cogip/tools/planner/actions/action.py
79
80
81
82
83
async def recycle(self):
    """
    Function called if the action is blocked and put back in the actions list
    """
    self.recycled = True

weight() #

Weight of the action. It can be used to choose the next action to select. This is the generic implementation.

Source code in cogip/tools/planner/actions/action.py
43
44
45
46
47
48
49
def weight(self) -> float:
    """
    Weight of the action.
    It can be used to choose the next action to select.
    This is the generic implementation.
    """
    raise NotImplementedError