Skip to content

sio_events

SioEvents #

Bases: AsyncClientNamespace

Handle all SocketIO events received by Copilot.

Source code in cogip/tools/copilot/sio_events.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
class SioEvents(socketio.AsyncClientNamespace):
    """
    Handle all SocketIO events received by Copilot.

    The namespace is registered on "/copilot". Most handlers build a Protobuf
    message from the event payload and forward it with
    `self.copilot.pbcom.send_can_message`.
    """

    def __init__(self, copilot: "copilot.Copilot"):
        """
        Register the namespace on "/copilot" and keep a reference to the Copilot instance.

        Note: inside this method the `copilot` parameter shadows the `copilot` module
        that the other methods use to read the message UUIDs.
        """
        super().__init__("/copilot")
        self.copilot = copilot

    async def on_connect(self):
        """
        On connection to cogip-server.

        Wait until the client reports being connected, then emit "connected",
        the shell menu (if any) and the copilot tool menu.
        """
        # polling2.poll is blocking, so it runs in a worker thread to keep the event loop free.
        await asyncio.to_thread(
            polling2.poll,
            lambda: self.client.connected is True,
            step=1,
            poll_forever=True,
        )
        logger.info("Connected to cogip-server")
        await self.emit("connected")

        if self.copilot.shell_menu:
            await self.emit("menu", self.copilot.shell_menu.model_dump(exclude_defaults=True, exclude_unset=True))
        await self.emit("register_menu", {"name": "copilot", "menu": menu.model_dump()})

    def on_disconnect(self) -> None:
        """
        On disconnection from cogip-server.
        """
        logger.info("Disconnected from cogip-server")

    async def on_connect_error(self, data: dict[str, Any]) -> None:
        """
        On connection error, log the failure reported by cogip-server.

        If `data` is a dict with a "message" key, only that message is logged;
        otherwise `data` is logged as received.
        """
        if isinstance(data, dict) and "message" in data:
            message = data["message"]
        else:
            message = data
        logger.error(f"Connection to cogip-server failed: {message}")

    async def on_command(self, data):
        """
        Callback on tool command message.

        Only the first word of `data` is used. Each known command requests the
        state of one PID; any other command is logged as unknown.
        """
        cmd, _, _ = data.partition(" ")

        # All known commands send the same request and differ only by the PID they target.
        pid_by_command = {
            "angular_speed_pid_config": PB_PidEnum.ANGULAR_SPEED_PID,
            "linear_speed_pid_config": PB_PidEnum.LINEAR_SPEED_PID,
            "angular_position_pid_config": PB_PidEnum.ANGULAR_POSE_PID,
            "linear_position_pid_config": PB_PidEnum.LINEAR_POSE_PID,
        }
        if cmd not in pid_by_command:
            logger.warning(f"Unknown command: {cmd}")
            return

        pid_id = PB_Pid_Id()
        pid_id.id = pid_by_command[cmd]
        await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)

    async def on_shell_command(self, data):
        """
        Callback on shell command message.

        Build the Protobuf command message:

        * split received string at first space if any.
        * first is the command and goes to `cmd` attribute.
        * second part is arguments, if any, and goes to `desc` attribute.
        """
        response = PB_Command()
        response.cmd, _, response.desc = data.partition(" ")
        await self.copilot.pbcom.send_can_message(copilot.command_uuid, response)

    async def on_pose_start(self, data: dict[str, Any]):
        """
        Callback on pose start (from planner).
        Validate the pose and forward it to mcu-firmware.
        """
        start_pose = models.PathPose.model_validate(data)
        pb_start_pose = PB_PathPose()
        start_pose.copy_pb(pb_start_pose)
        await self.copilot.pbcom.send_can_message(copilot.pose_start_uuid, pb_start_pose)

    async def on_pose_order(self, data: dict[str, Any]):
        """
        Callback on pose order (from planner).
        Validate the pose and forward it to mcu-firmware.
        """
        pose_order = models.PathPose.model_validate(data)
        pb_pose_order = PB_PathPose()
        pose_order.copy_pb(pb_pose_order)
        await self.copilot.pbcom.send_can_message(copilot.pose_order_uuid, pb_pose_order)

    async def on_actuators_start(self):
        """
        Callback on actuators_start (from dashboard).
        Forward to mcu-firmware.
        """
        await self.copilot.pbcom.send_can_message(copilot.actuators_thread_start_uuid, None)

    async def on_actuators_stop(self):
        """
        Callback on actuators_stop (from dashboard).
        Forward to mcu-firmware.
        """
        await self.copilot.pbcom.send_can_message(copilot.actuators_thread_stop_uuid, None)

    async def on_actuator_command(self, data: dict[str, Any]):
        """
        Callback on actuator_command (from dashboard).
        Forward to mcu-firmware.

        The payload is validated as an `ActuatorCommand` and copied into the
        `servo` or `positional_actuator` field of the Protobuf message,
        depending on its concrete type.
        """
        command = TypeAdapter(ActuatorCommand).validate_python(data)

        pb_command = PB_ActuatorCommand()
        if isinstance(command, ServoCommand):
            command.pb_copy(pb_command.servo)
        elif isinstance(command, PositionalActuatorCommand):
            command.pb_copy(pb_command.positional_actuator)
        await self.copilot.pbcom.send_can_message(copilot.actuator_command_uuid, pb_command)

    async def on_config_updated(self, config: dict[str, Any]) -> None:
        """
        Callback on config_updated from dashboard.
        Update pid PB message and send it back to firmware.

        `config["name"]` is expected to look like "<pid id>-<attribute name>".
        Names without both parts are ignored, and names whose PID part is not
        a known PID are logged and ignored instead of raising.
        """
        pid_id, _, name = config["name"].partition("-")
        if not (pid_id and name):
            return

        try:
            pb_pid = self.copilot.pb_pids[int(pid_id)]
        except (ValueError, LookupError):
            # Non-numeric prefix (ValueError) or PID not present in pb_pids (KeyError/IndexError).
            logger.warning(f"Ignoring config update for unknown PID: {config['name']}")
            return

        setattr(pb_pid, name, config["value"])
        await self.copilot.pbcom.send_can_message(copilot.pid_uuid, pb_pid)

    async def on_set_controller(self, controller: int):
        """
        Callback on set_controller message.
        Forward to firmware.
        """
        pb_controller = PB_Controller()
        pb_controller.id = controller
        await self.copilot.pbcom.send_can_message(copilot.controller_uuid, pb_controller)

    async def on_game_start(self):
        """
        Callback on game_start message.
        Forward to firmware.
        """
        await self.copilot.pbcom.send_can_message(copilot.game_start_uuid, None)

    async def on_game_end(self):
        """
        Callback on game_end message.
        Forward to firmware.
        """
        await self.copilot.pbcom.send_can_message(copilot.game_end_uuid, None)

    async def on_game_reset(self):
        """
        Callback on game_reset message.
        Forward to firmware.
        """
        await self.copilot.pbcom.send_can_message(copilot.game_reset_uuid, None)

    async def on_brake(self):
        """
        Callback on brake message.
        Forward to firmware.
        """
        await self.copilot.pbcom.send_can_message(copilot.brake_uuid, None)

on_actuator_command(data) async #

Callback on actuator_command (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
132
133
134
135
136
137
138
139
140
141
142
143
144
async def on_actuator_command(self, data: dict[str, Any]):
    """
    Handle an actuator_command event (from dashboard).

    The payload is validated as an `ActuatorCommand`, copied into the matching
    field of a `PB_ActuatorCommand` message, and the message is forwarded to
    mcu-firmware.
    """
    parsed = TypeAdapter(ActuatorCommand).validate_python(data)
    message = PB_ActuatorCommand()

    # Pick the Protobuf sub-message matching the concrete command type, if any.
    field_by_type = (
        (ServoCommand, "servo"),
        (PositionalActuatorCommand, "positional_actuator"),
    )
    for command_type, field_name in field_by_type:
        if isinstance(parsed, command_type):
            parsed.pb_copy(getattr(message, field_name))
            break

    await self.copilot.pbcom.send_can_message(copilot.actuator_command_uuid, message)

on_actuators_start() async #

Callback on actuators_start (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
118
119
120
121
122
123
async def on_actuators_start(self):
    """
    Handle an actuators_start event (from dashboard).

    Sends the `actuators_thread_start_uuid` message, with no payload, to mcu-firmware.
    """
    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.actuators_thread_start_uuid, None)

on_actuators_stop() async #

Callback on actuators_stop (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
125
126
127
128
129
130
async def on_actuators_stop(self):
    """
    Handle an actuators_stop event (from dashboard).

    Sends the `actuators_thread_stop_uuid` message, with no payload, to mcu-firmware.
    """
    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.actuators_thread_stop_uuid, None)

on_brake() async #

Callback on brake message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
186
187
188
189
190
191
async def on_brake(self):
    """
    Handle a brake event.

    Sends the `brake_uuid` message, with no payload, to the firmware.
    """
    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.brake_uuid, None)

on_command(data) async #

Callback on tool command message.

Source code in cogip/tools/copilot/sio_events.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
async def on_command(self, data):
    """
    Handle a tool command message.

    Only the first word of `data` is considered. Each known command requests
    the state of one PID; anything else is logged as an unknown command.
    """
    cmd = data.partition(" ")[0]
    request = PB_Pid_Id()

    # Every known command sends the same request; only the targeted PID differs.
    pid_by_command = {
        "angular_speed_pid_config": PB_PidEnum.ANGULAR_SPEED_PID,
        "linear_speed_pid_config": PB_PidEnum.LINEAR_SPEED_PID,
        "angular_position_pid_config": PB_PidEnum.ANGULAR_POSE_PID,
        "linear_position_pid_config": PB_PidEnum.LINEAR_POSE_PID,
    }
    if cmd not in pid_by_command:
        logger.warning(f"Unknown command: {cmd}")
        return

    request.id = pid_by_command[cmd]
    await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, request)

on_config_updated(config) async #

Callback on config_updated from dashboard. Update pid PB message and send it back to firmware.

Source code in cogip/tools/copilot/sio_events.py
146
147
148
149
150
151
152
153
154
async def on_config_updated(self, config: dict[str, Any]) -> None:
    """
    Callback on config_updated from dashboard.
    Update pid PB message and send it back to firmware.

    `config["name"]` is expected to look like "<pid id>-<attribute name>".
    Names without both parts are ignored, and names whose PID part is not
    a known PID are logged and ignored instead of raising.
    """
    pid_id, _, name = config["name"].partition("-")
    if not (pid_id and name):
        return

    try:
        pb_pid = self.copilot.pb_pids[int(pid_id)]
    except (ValueError, LookupError):
        # Non-numeric prefix (ValueError) or PID not present in pb_pids (KeyError/IndexError).
        logger.warning(f"Ignoring config update for unknown PID: {config['name']}")
        return

    setattr(pb_pid, name, config["value"])
    await self.copilot.pbcom.send_can_message(copilot.pid_uuid, pb_pid)

on_connect() async #

On connection to cogip-server.

Source code in cogip/tools/copilot/sio_events.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def on_connect(self):
    """
    Handle the connection to cogip-server.

    Waits until the client reports being connected, then emits "connected",
    the shell menu when there is one, and the copilot tool menu.
    """

    def is_connected() -> bool:
        return self.client.connected is True

    # polling2.poll blocks, so it is run in a worker thread.
    await asyncio.to_thread(polling2.poll, is_connected, step=1, poll_forever=True)
    logger.info("Connected to cogip-server")
    await self.emit("connected")

    shell_menu = self.copilot.shell_menu
    if shell_menu:
        shell_menu_payload = shell_menu.model_dump(exclude_defaults=True, exclude_unset=True)
        await self.emit("menu", shell_menu_payload)

    tool_menu_payload = {"name": "copilot", "menu": menu.model_dump()}
    await self.emit("register_menu", tool_menu_payload)

on_connect_error(data) async #

On connection error, log the failure message reported by cogip-server.

Source code in cogip/tools/copilot/sio_events.py
47
48
49
50
51
52
53
54
55
56
async def on_connect_error(self, data: dict[str, Any]) -> None:
    """
    Log a connection failure reported by cogip-server.

    When `data` is a dict holding a "message" key, only that message is logged;
    otherwise `data` itself is logged.
    """
    has_message = isinstance(data, dict) and "message" in data
    message = data["message"] if has_message else data
    logger.error(f"Connection to cogip-server failed: {message}")

on_disconnect() #

On disconnection from cogip-server.

Source code in cogip/tools/copilot/sio_events.py
41
42
43
44
45
def on_disconnect(self) -> None:
    """
    Handle the disconnection from cogip-server by logging it.
    """
    logger.info("Disconnected from cogip-server")
    return None

on_game_end() async #

Callback on game_end message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
172
173
174
175
176
177
async def on_game_end(self):
    """
    Handle a game_end event.

    Sends the `game_end_uuid` message, with no payload, to the firmware.
    """
    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.game_end_uuid, None)

on_game_reset() async #

Callback on game_reset message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
179
180
181
182
183
184
async def on_game_reset(self):
    """
    Handle a game_reset event.

    Sends the `game_reset_uuid` message, with no payload, to the firmware.
    """
    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.game_reset_uuid, None)

on_game_start() async #

Callback on game_start message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
165
166
167
168
169
170
async def on_game_start(self):
    """
    Handle a game_start event.

    Sends the `game_start_uuid` message, with no payload, to the firmware.
    """
    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.game_start_uuid, None)

on_pose_order(data) async #

Callback on pose order (from planner). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
108
109
110
111
112
113
114
115
116
async def on_pose_order(self, data: dict[str, Any]):
    """
    Handle a pose order event (from planner).

    The payload is validated as a `models.PathPose`, copied into a
    `PB_PathPose` message and forwarded to mcu-firmware.
    """
    validated_pose = models.PathPose.model_validate(data)
    message = PB_PathPose()
    validated_pose.copy_pb(message)

    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.pose_order_uuid, message)

on_pose_start(data) async #

Callback on pose start (from planner). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
 98
 99
100
101
102
103
104
105
106
async def on_pose_start(self, data: dict[str, Any]):
    """
    Handle a pose start event (from planner).

    The payload is validated as a `models.PathPose`, copied into a
    `PB_PathPose` message and forwarded to mcu-firmware.
    """
    validated_pose = models.PathPose.model_validate(data)
    message = PB_PathPose()
    validated_pose.copy_pb(message)

    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.pose_start_uuid, message)

on_set_controller(controller) async #

Callback on set_controller message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
156
157
158
159
160
161
162
163
async def on_set_controller(self, controller: int):
    """
    Handle a set_controller event.

    Wraps the controller id in a `PB_Controller` message and forwards it to the firmware.
    """
    message = PB_Controller()
    message.id = controller

    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.controller_uuid, message)

on_shell_command(data) async #

Callback on shell command message.

Build the Protobuf command message:

  • split received string at first space if any.
  • first is the command and goes to cmd attribute.
  • second part is arguments, if any, and goes to desc attribute.
Source code in cogip/tools/copilot/sio_events.py
84
85
86
87
88
89
90
91
92
93
94
95
96
async def on_shell_command(self, data):
    """
    Handle a shell command message.

    The received string is split at its first space, if any:

    * the part before the space is stored in the `cmd` attribute,
    * the part after it (empty when there is no space) is stored in the `desc` attribute.

    The resulting `PB_Command` message is then sent with `command_uuid`.
    """
    message = PB_Command()
    command_name, _, arguments = data.partition(" ")
    message.cmd = command_name
    message.desc = arguments

    pbcom = self.copilot.pbcom
    await pbcom.send_can_message(copilot.command_uuid, message)