Skip to content

sio_events

SioEvents #

Bases: AsyncClientNamespace

Handle all SocketIO events received by Planner.

Source code in cogip/tools/copilot/sio_events.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class SioEvents(socketio.AsyncClientNamespace):
    """
    Handle all SocketIO events received by Planner.
    """

    def __init__(self, copilot: "copilot.Copilot"):
        super().__init__("/copilot")
        self.copilot = copilot
        self.connected = False

    async def on_connect(self):
        """
        On connection to cogip-server.
        """
        await asyncio.to_thread(
            polling2.poll,
            lambda: self.client.connected is True,
            step=1,
            poll_forever=True,
        )
        logger.info("Connected to cogip-server")
        await self.emit("connected")

        self.copilot.create_shared_memory()
        self.connected = True

        if self.copilot.shell_menu:
            await self.emit("menu", self.copilot.shell_menu.model_dump(exclude_defaults=True, exclude_unset=True))
        await self.emit("register_menu", {"name": "copilot", "menu": menu.model_dump()})

    async def on_disconnect(self) -> None:
        """
        On disconnection from cogip-server.
        """
        logger.info("Disconnected from cogip-server")
        self.connected = False
        await self.copilot.delete_shared_memory()

    async def on_connect_error(self, data: dict[str, Any]) -> None:
        """
        On connection error, check if a Planner is already connected and exit,
        or retry connection.
        """
        if isinstance(data, dict) and "message" in data:
            message = data["message"]
        else:
            message = data
        logger.error(f"Connection to cogip-server failed: {message}")

    async def on_command(self, data):
        """
        Callback on tool command message.
        """
        cmd, _, _ = data.partition(" ")
        pid_id = PB_Pid_Id()
        match cmd:
            case "angular_speed_pid_config":
                # Request angular speed pid state
                pid_id.id = PB_PidEnum.ANGULAR_SPEED_PID
                await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)
            case "linear_speed_pid_config":
                # Request linear_speed pid state
                pid_id.id = PB_PidEnum.LINEAR_SPEED_PID
                await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)
            case "angular_position_pid_config":
                # Request angular position pid state
                pid_id.id = PB_PidEnum.ANGULAR_POSE_PID
                await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)
            case "linear_position_pid_config":
                # Request linear position pid state
                pid_id.id = PB_PidEnum.LINEAR_POSE_PID
                await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)
            case _:
                logger.warning(f"Unknown command: {cmd}")

    async def on_pose_start(self, data: dict[str, Any]):
        """
        Callback on pose start (from planner).
        Forward to mcu-firmware.
        """
        logger.info(f"[SIO] Pose start: {data}")
        start_pose = models.PathPose.model_validate(data)
        pb_start_pose = PB_PathPose()
        start_pose.copy_pb(pb_start_pose)
        await self.copilot.pbcom.send_can_message(copilot.pose_start_uuid, pb_start_pose)

    async def on_pose_order(self, data: dict[str, Any]):
        """
        Callback on pose order (from planner).
        Forward to mcu-firmware.
        """
        logger.info(f"[SIO] Pose order: {data}")
        pose_order = models.PathPose.model_validate(data)
        if self.copilot.id > 1:
            pose_order.motion_direction = models.MotionDirection.FORWARD_ONLY
        pb_pose_order = PB_PathPose()
        pose_order.copy_pb(pb_pose_order)
        await self.copilot.pbcom.send_can_message(copilot.pose_order_uuid, pb_pose_order)

    async def on_actuators_start(self):
        """
        Callback on actuators_start (from dashboard).
        Forward to mcu-firmware.
        """
        await self.copilot.pbcom.send_can_message(copilot.actuators_thread_start_uuid, None)

    async def on_actuators_stop(self):
        """
        Callback on actuators_stop (from dashboard).
        Forward to mcu-firmware.
        """
        await self.copilot.pbcom.send_can_message(copilot.actuators_thread_stop_uuid, None)

    async def on_actuator_command(self, data: dict[str, Any]):
        """
        Callback on actuator_command (from dashboard).
        Forward to mcu-firmware.
        """
        logger.info(f"[SIO] Actuator command: {data}")
        command = TypeAdapter(ActuatorCommand).validate_python(data)

        pb_command = PB_ActuatorCommand()
        if isinstance(command, PositionalActuatorCommand):
            command.pb_copy(pb_command.positional_actuator)
        await self.copilot.pbcom.send_can_message(copilot.actuator_command_uuid, pb_command)

    async def on_actuator_init(self):
        """
        Callback on actuator_init (from dashboard).
        Forward to mcu-firmware.
        """
        logger.info("[SIO] Actuator init")
        await self.copilot.pbcom.send_can_message(copilot.actuator_init_uuid, None)

    async def on_config_updated(self, config: dict[str, Any]) -> None:
        """
        Callback on config_updated from dashboard.
        Update pid PB message and send it back to firmware.
        """
        pid_id, _, name = config["name"].partition("-")
        if pid_id and name:
            setattr(self.copilot.pb_pids[int(pid_id)], name, config["value"])
            await self.copilot.pbcom.send_can_message(copilot.pid_uuid, self.copilot.pb_pids[int(pid_id)])

    async def on_set_controller(self, controller: int):
        """
        Callback on set_controller message.
        Forward to firmware.
        """
        pb_controller = PB_Controller()
        pb_controller.id = controller
        await self.copilot.pbcom.send_can_message(copilot.controller_uuid, pb_controller)

    async def on_game_start(self):
        """
        Callback on game_start message.
        Forward to firmware.
        """
        logger.info("[SIO] Game start")
        await self.copilot.pbcom.send_can_message(copilot.game_start_uuid, None)

    async def on_game_end(self):
        """
        Callback on game_end message.
        Forward to firmware.
        """
        logger.info("[SIO] Game end")
        await self.copilot.pbcom.send_can_message(copilot.game_end_uuid, None)

    async def on_game_reset(self):
        """
        Callback on game_reset message.
        Forward to firmware.
        """
        logger.info("[SIO] Game reset")
        await self.copilot.pbcom.send_can_message(copilot.game_reset_uuid, None)

    async def on_brake(self):
        """
        Callback on brake message.
        Forward to firmware.
        """
        logger.info("[SIO] Brake")
        await self.copilot.pbcom.send_can_message(copilot.brake_uuid, None)

    async def on_get_parameter_value(self, data: dict[str, Any]):
        """
        Callback on get_parameter_value.
        Forward to firmware.
        """
        logger.info(f"[SIO] Get parameter: {data}")

        parameter = FirmwareParameter.model_validate(data)
        pb_get_request = PB_ParameterGetRequest()
        parameter.pb_copy(pb_get_request)

        await self.copilot.pbcom.send_can_message(copilot.parameter_get_uuid, pb_get_request)

    async def on_set_parameter_value(self, data: dict[str, Any]):
        """
        Callback on set_parameter_value.
        Forward to firmware.
        """
        logger.info(f"[SIO] Set parameter: {data}")

        parameter = FirmwareParameter.model_validate(data)
        pb_set_request = PB_ParameterSetRequest()
        parameter.pb_copy(pb_set_request)

        await self.copilot.pbcom.send_can_message(copilot.parameter_set_uuid, pb_set_request)

    async def on_telemetry_enable(self, data: dict[str, Any] | None = None):
        """
        Callback on telemetry enable message.
        Forward to firmware.
        """
        logger.info("[SIO] Telemetry enable")
        await self.copilot.pbcom.send_can_message(copilot.telemetry_enable_uuid, None)

    async def on_telemetry_disable(self, data: dict[str, Any] | None = None):
        """
        Callback on telemetry disable message.
        Forward to firmware.
        """
        logger.info("[SIO] Telemetry disable")
        await self.copilot.pbcom.send_can_message(copilot.telemetry_disable_uuid, None)

on_actuator_command(data) async #

Callback on actuator_command (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
137
138
139
140
141
142
143
144
145
146
147
148
async def on_actuator_command(self, data: dict[str, Any]):
    """
    Callback on actuator_command (from dashboard).
    Forward to mcu-firmware.
    """
    logger.info(f"[SIO] Actuator command: {data}")
    command = TypeAdapter(ActuatorCommand).validate_python(data)

    pb_command = PB_ActuatorCommand()
    if isinstance(command, PositionalActuatorCommand):
        command.pb_copy(pb_command.positional_actuator)
    await self.copilot.pbcom.send_can_message(copilot.actuator_command_uuid, pb_command)

on_actuator_init() async #

Callback on actuator_init (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
150
151
152
153
154
155
156
async def on_actuator_init(self):
    """
    Callback on actuator_init (from dashboard).
    Forward to mcu-firmware.
    """
    logger.info("[SIO] Actuator init")
    await self.copilot.pbcom.send_can_message(copilot.actuator_init_uuid, None)

on_actuators_start() async #

Callback on actuators_start (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
123
124
125
126
127
128
async def on_actuators_start(self):
    """
    Callback on actuators_start (from dashboard).
    Forward to mcu-firmware.
    """
    await self.copilot.pbcom.send_can_message(copilot.actuators_thread_start_uuid, None)

on_actuators_stop() async #

Callback on actuators_stop (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
130
131
132
133
134
135
async def on_actuators_stop(self):
    """
    Callback on actuators_stop (from dashboard).
    Forward to mcu-firmware.
    """
    await self.copilot.pbcom.send_can_message(copilot.actuators_thread_stop_uuid, None)

on_brake() async #

Callback on brake message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
201
202
203
204
205
206
207
async def on_brake(self):
    """
    Callback on brake message.
    Forward to firmware.
    """
    logger.info("[SIO] Brake")
    await self.copilot.pbcom.send_can_message(copilot.brake_uuid, None)

on_command(data) async #

Callback on tool command message.

Source code in cogip/tools/copilot/sio_events.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
async def on_command(self, data):
    """
    Callback on tool command message.
    """
    cmd, _, _ = data.partition(" ")
    pid_id = PB_Pid_Id()
    match cmd:
        case "angular_speed_pid_config":
            # Request angular speed pid state
            pid_id.id = PB_PidEnum.ANGULAR_SPEED_PID
            await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)
        case "linear_speed_pid_config":
            # Request linear_speed pid state
            pid_id.id = PB_PidEnum.LINEAR_SPEED_PID
            await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)
        case "angular_position_pid_config":
            # Request angular position pid state
            pid_id.id = PB_PidEnum.ANGULAR_POSE_PID
            await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)
        case "linear_position_pid_config":
            # Request linear position pid state
            pid_id.id = PB_PidEnum.LINEAR_POSE_PID
            await self.copilot.pbcom.send_can_message(copilot.pid_request_uuid, pid_id)
        case _:
            logger.warning(f"Unknown command: {cmd}")

on_config_updated(config) async #

Callback on config_updated from dashboard. Update pid PB message and send it back to firmware.

Source code in cogip/tools/copilot/sio_events.py
158
159
160
161
162
163
164
165
166
async def on_config_updated(self, config: dict[str, Any]) -> None:
    """
    Callback on config_updated from dashboard.
    Update pid PB message and send it back to firmware.
    """
    pid_id, _, name = config["name"].partition("-")
    if pid_id and name:
        setattr(self.copilot.pb_pids[int(pid_id)], name, config["value"])
        await self.copilot.pbcom.send_can_message(copilot.pid_uuid, self.copilot.pb_pids[int(pid_id)])

on_connect() async #

On connection to cogip-server.

Source code in cogip/tools/copilot/sio_events.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
async def on_connect(self):
    """
    On connection to cogip-server.
    """
    await asyncio.to_thread(
        polling2.poll,
        lambda: self.client.connected is True,
        step=1,
        poll_forever=True,
    )
    logger.info("Connected to cogip-server")
    await self.emit("connected")

    self.copilot.create_shared_memory()
    self.connected = True

    if self.copilot.shell_menu:
        await self.emit("menu", self.copilot.shell_menu.model_dump(exclude_defaults=True, exclude_unset=True))
    await self.emit("register_menu", {"name": "copilot", "menu": menu.model_dump()})

on_connect_error(data) async #

On connection error, check if a Planner is already connected and exit, or retry connection.

Source code in cogip/tools/copilot/sio_events.py
62
63
64
65
66
67
68
69
70
71
async def on_connect_error(self, data: dict[str, Any]) -> None:
    """
    On connection error, check if a Planner is already connected and exit,
    or retry connection.
    """
    if isinstance(data, dict) and "message" in data:
        message = data["message"]
    else:
        message = data
    logger.error(f"Connection to cogip-server failed: {message}")

on_disconnect() async #

On disconnection from cogip-server.

Source code in cogip/tools/copilot/sio_events.py
54
55
56
57
58
59
60
async def on_disconnect(self) -> None:
    """
    On disconnection from cogip-server.
    """
    logger.info("Disconnected from cogip-server")
    self.connected = False
    await self.copilot.delete_shared_memory()

on_game_end() async #

Callback on game_end message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
185
186
187
188
189
190
191
async def on_game_end(self):
    """
    Callback on game_end message.
    Forward to firmware.
    """
    logger.info("[SIO] Game end")
    await self.copilot.pbcom.send_can_message(copilot.game_end_uuid, None)

on_game_reset() async #

Callback on game_reset message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
193
194
195
196
197
198
199
async def on_game_reset(self):
    """
    Callback on game_reset message.
    Forward to firmware.
    """
    logger.info("[SIO] Game reset")
    await self.copilot.pbcom.send_can_message(copilot.game_reset_uuid, None)

on_game_start() async #

Callback on game_start message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
177
178
179
180
181
182
183
async def on_game_start(self):
    """
    Callback on game_start message.
    Forward to firmware.
    """
    logger.info("[SIO] Game start")
    await self.copilot.pbcom.send_can_message(copilot.game_start_uuid, None)

on_get_parameter_value(data) async #

Callback on get_parameter_value. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
209
210
211
212
213
214
215
216
217
218
219
220
async def on_get_parameter_value(self, data: dict[str, Any]):
    """
    Callback on get_parameter_value.
    Forward to firmware.
    """
    logger.info(f"[SIO] Get parameter: {data}")

    parameter = FirmwareParameter.model_validate(data)
    pb_get_request = PB_ParameterGetRequest()
    parameter.pb_copy(pb_get_request)

    await self.copilot.pbcom.send_can_message(copilot.parameter_get_uuid, pb_get_request)

on_pose_order(data) async #

Callback on pose order (from planner). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
110
111
112
113
114
115
116
117
118
119
120
121
async def on_pose_order(self, data: dict[str, Any]):
    """
    Callback on pose order (from planner).
    Forward to mcu-firmware.
    """
    logger.info(f"[SIO] Pose order: {data}")
    pose_order = models.PathPose.model_validate(data)
    if self.copilot.id > 1:
        pose_order.motion_direction = models.MotionDirection.FORWARD_ONLY
    pb_pose_order = PB_PathPose()
    pose_order.copy_pb(pb_pose_order)
    await self.copilot.pbcom.send_can_message(copilot.pose_order_uuid, pb_pose_order)

on_pose_start(data) async #

Callback on pose start (from planner). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
 99
100
101
102
103
104
105
106
107
108
async def on_pose_start(self, data: dict[str, Any]):
    """
    Callback on pose start (from planner).
    Forward to mcu-firmware.
    """
    logger.info(f"[SIO] Pose start: {data}")
    start_pose = models.PathPose.model_validate(data)
    pb_start_pose = PB_PathPose()
    start_pose.copy_pb(pb_start_pose)
    await self.copilot.pbcom.send_can_message(copilot.pose_start_uuid, pb_start_pose)

on_set_controller(controller) async #

Callback on set_controller message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
168
169
170
171
172
173
174
175
async def on_set_controller(self, controller: int):
    """
    Callback on set_controller message.
    Forward to firmware.
    """
    pb_controller = PB_Controller()
    pb_controller.id = controller
    await self.copilot.pbcom.send_can_message(copilot.controller_uuid, pb_controller)

on_set_parameter_value(data) async #

Callback on set_parameter_value. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
222
223
224
225
226
227
228
229
230
231
232
233
async def on_set_parameter_value(self, data: dict[str, Any]):
    """
    Callback on set_parameter_value.
    Forward to firmware.
    """
    logger.info(f"[SIO] Set parameter: {data}")

    parameter = FirmwareParameter.model_validate(data)
    pb_set_request = PB_ParameterSetRequest()
    parameter.pb_copy(pb_set_request)

    await self.copilot.pbcom.send_can_message(copilot.parameter_set_uuid, pb_set_request)

on_telemetry_disable(data=None) async #

Callback on telemetry disable message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
243
244
245
246
247
248
249
async def on_telemetry_disable(self, data: dict[str, Any] | None = None):
    """
    Callback on telemetry disable message.
    Forward to firmware.
    """
    logger.info("[SIO] Telemetry disable")
    await self.copilot.pbcom.send_can_message(copilot.telemetry_disable_uuid, None)

on_telemetry_enable(data=None) async #

Callback on telemetry enable message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
235
236
237
238
239
240
241
async def on_telemetry_enable(self, data: dict[str, Any] | None = None):
    """
    Callback on telemetry enable message.
    Forward to firmware.
    """
    logger.info("[SIO] Telemetry enable")
    await self.copilot.pbcom.send_can_message(copilot.telemetry_enable_uuid, None)