Skip to content

sio_events

SioEvents #

Bases: AsyncClientNamespace

Handle all SocketIO events received by the Copilot (namespace `/copilot`).

Source code in cogip/tools/copilot/sio_events.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
class SioEvents(socketio.AsyncClientNamespace):
    """
    SocketIO client namespace of the Copilot, registered on "/copilot".

    Every `on_<event>` coroutine handles one event coming from cogip-server.
    Apart from the connection handlers, each of them hands a message over to
    `self.copilot.pbcom.send_can_message()`.
    """

    def __init__(self, copilot: "copilot.Copilot"):
        super().__init__("/copilot")
        # Owner of `pbcom` and of the shared memory used by the handlers below.
        self.copilot = copilot
        # Switched to True at the very end of `on_connect()`.
        self.connected = False

    async def on_connect(self):
        """
        On connection to cogip-server.

        Wait until the client reports itself as connected, announce it with
        a "connected" event, then create the shared memory and set
        `self.connected`.
        """

        def client_is_connected() -> bool:
            return self.client.connected is True

        # The poll is blocking, so it runs in a worker thread; it has no timeout.
        await asyncio.to_thread(polling2.poll, client_is_connected, step=1, poll_forever=True)
        logger.info("Connected to cogip-server")
        await self.emit("connected")

        self.copilot.create_shared_memory()
        self.connected = True

    async def on_disconnect(self) -> None:
        """
        On disconnection from cogip-server.

        Clear `self.connected`, then delete the shared memory.
        """
        logger.info("Disconnected from cogip-server")
        # The flag is cleared before the (awaited) shared memory cleanup.
        self.connected = False
        await self.copilot.delete_shared_memory()

    async def on_connect_error(self, data: dict[str, Any]) -> None:
        """
        On connection error, log the failure.

        The logged text is `data["message"]` when `data` is a dict holding
        that key, otherwise `data` itself. Nothing else is done here.
        """
        message = data["message"] if isinstance(data, dict) and "message" in data else data
        logger.error(f"Connection to cogip-server failed: {message}")

    async def on_pose_start(self, data: dict[str, Any]):
        """
        Callback on pose start (from planner).

        Validate `data` as a `models.PathPose`, copy it into a `PB_PathPose`
        and forward it to mcu-firmware.
        """
        logger.info(f"[SIO] Pose start: {data}")
        pose = models.PathPose.model_validate(data)
        pb_pose = PB_PathPose()
        pose.copy_pb(pb_pose)
        send = self.copilot.pbcom.send_can_message
        await send(copilot.pose_start_uuid, pb_pose)

    async def on_pose_order(self, data: dict[str, Any]):
        """
        Callback on pose order (from planner).

        Validate `data` as a `models.PathPose`, copy it into a `PB_PathPose`
        and forward it to mcu-firmware.
        """
        logger.info(f"[SIO] Pose order: {data}")
        pose = models.PathPose.model_validate(data)
        # Copilots whose id is greater than 1 always get a forward-only order.
        if self.copilot.id > 1:
            pose.motion_direction = models.MotionDirection.FORWARD_ONLY
        pb_pose = PB_PathPose()
        pose.copy_pb(pb_pose)
        send = self.copilot.pbcom.send_can_message
        await send(copilot.pose_order_uuid, pb_pose)

    async def on_speed_order(self, data: dict[str, Any]):
        """
        Callback on speed order (from calibration tool).

        Validate `data` as a `models.SpeedOrder`, copy it into a
        `PB_SpeedOrder` and forward it to mcu-firmware.
        """
        logger.info(f"[SIO] Speed order: {data}")
        order = models.SpeedOrder.model_validate(data)
        pb_order = PB_SpeedOrder()
        order.copy_pb(pb_order)
        send = self.copilot.pbcom.send_can_message
        await send(copilot.speed_order_uuid, pb_order)

    async def on_actuators_start(self):
        """
        Callback on actuators_start (from dashboard).

        Forward to mcu-firmware as a message without payload.
        """
        send = self.copilot.pbcom.send_can_message
        await send(copilot.actuators_thread_start_uuid, None)

    async def on_actuators_stop(self):
        """
        Callback on actuators_stop (from dashboard).

        Forward to mcu-firmware as a message without payload.
        """
        send = self.copilot.pbcom.send_can_message
        await send(copilot.actuators_thread_stop_uuid, None)

    async def on_actuator_command(self, data: dict[str, Any]):
        """
        Callback on actuator_command (from dashboard).

        Validate `data` as an `ActuatorCommand` and forward the matching
        `PB_ActuatorCommand` to mcu-firmware.
        """
        logger.info(f"[SIO] Actuator command: {data}")
        adapter = TypeAdapter(ActuatorCommand)
        command = adapter.validate_python(data)

        pb_message = PB_ActuatorCommand()
        # Only positional commands are copied; otherwise the message is sent as created.
        if isinstance(command, PositionalActuatorCommand):
            command.pb_copy(pb_message.positional_actuator)
        send = self.copilot.pbcom.send_can_message
        await send(copilot.actuator_command_uuid, pb_message)

    async def on_actuator_init(self):
        """
        Callback on actuator_init (from dashboard).

        Forward to mcu-firmware as a message without payload.
        """
        logger.info("[SIO] Actuator init")
        send = self.copilot.pbcom.send_can_message
        await send(copilot.actuator_init_uuid, None)

    async def on_set_controller(self, controller: int):
        """
        Callback on set_controller message.

        Wrap `controller` in a `PB_Controller` (as its `id`) and forward it
        to firmware.
        """
        pb_message = PB_Controller()
        pb_message.id = controller
        send = self.copilot.pbcom.send_can_message
        await send(copilot.controller_uuid, pb_message)

    async def on_game_start(self):
        """
        Callback on game_start message.

        Forward to firmware as a message without payload.
        """
        logger.info("[SIO] Game start")
        send = self.copilot.pbcom.send_can_message
        await send(copilot.game_start_uuid, None)

    async def on_game_end(self):
        """
        Callback on game_end message.

        Forward to firmware as a message without payload.
        """
        logger.info("[SIO] Game end")
        send = self.copilot.pbcom.send_can_message
        await send(copilot.game_end_uuid, None)

    async def on_game_reset(self):
        """
        Callback on game_reset message.

        Forward to firmware as a message without payload.
        """
        logger.info("[SIO] Game reset")
        send = self.copilot.pbcom.send_can_message
        await send(copilot.game_reset_uuid, None)

    async def on_brake(self):
        """
        Callback on brake message.

        Forward to firmware as a message without payload.
        """
        logger.info("[SIO] Brake")
        send = self.copilot.pbcom.send_can_message
        await send(copilot.brake_uuid, None)

    async def on_get_parameter_value(self, data: dict[str, Any]):
        """
        Callback on get_parameter_value.

        Validate `data` as a `FirmwareParameter`, copy it into a
        `PB_ParameterGetRequest` and forward it to firmware.
        """
        logger.info(f"[SIO] Get parameter: {data}")

        firmware_parameter = FirmwareParameter.model_validate(data)
        pb_request = PB_ParameterGetRequest()
        firmware_parameter.pb_copy(pb_request)

        send = self.copilot.pbcom.send_can_message
        await send(copilot.parameter_get_uuid, pb_request)

    async def on_set_parameter_value(self, data: dict[str, Any]):
        """
        Callback on set_parameter_value.

        Validate `data` as a `FirmwareParameter`, copy it into a
        `PB_ParameterSetRequest` and forward it to firmware.
        """
        logger.info(f"[SIO] Set parameter: {data}")

        firmware_parameter = FirmwareParameter.model_validate(data)
        pb_request = PB_ParameterSetRequest()
        firmware_parameter.pb_copy(pb_request)

        send = self.copilot.pbcom.send_can_message
        await send(copilot.parameter_set_uuid, pb_request)

    async def on_telemetry_enable(self, data: dict[str, Any] | None = None):
        """
        Callback on telemetry enable message.

        Forward to firmware as a message without payload; `data` is accepted
        but not used.
        """
        logger.info("[SIO] Telemetry enable")
        send = self.copilot.pbcom.send_can_message
        await send(copilot.telemetry_enable_uuid, None)

    async def on_telemetry_disable(self, data: dict[str, Any] | None = None):
        """
        Callback on telemetry disable message.

        Forward to firmware as a message without payload; `data` is accepted
        but not used.
        """
        logger.info("[SIO] Telemetry disable")
        send = self.copilot.pbcom.send_can_message
        await send(copilot.telemetry_disable_uuid, None)

on_actuator_command(data) async #

Callback on actuator_command (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
116
117
118
119
120
121
122
123
124
125
126
127
async def on_actuator_command(self, data: dict[str, Any]):
    """
    Callback on actuator_command (from dashboard).

    Validate `data` as an `ActuatorCommand` and forward the matching
    `PB_ActuatorCommand` to mcu-firmware.
    """
    logger.info(f"[SIO] Actuator command: {data}")
    adapter = TypeAdapter(ActuatorCommand)
    command = adapter.validate_python(data)

    pb_message = PB_ActuatorCommand()
    # Only positional commands are copied; otherwise the message is sent as created.
    if isinstance(command, PositionalActuatorCommand):
        command.pb_copy(pb_message.positional_actuator)
    send = self.copilot.pbcom.send_can_message
    await send(copilot.actuator_command_uuid, pb_message)

on_actuator_init() async #

Callback on actuator_init (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
129
130
131
132
133
134
135
async def on_actuator_init(self):
    """
    Callback on actuator_init (from dashboard).

    Forward to mcu-firmware as a message without payload.
    """
    logger.info("[SIO] Actuator init")
    send = self.copilot.pbcom.send_can_message
    await send(copilot.actuator_init_uuid, None)

on_actuators_start() async #

Callback on actuators_start (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
102
103
104
105
106
107
async def on_actuators_start(self):
    """
    Callback on actuators_start (from dashboard).

    Forward to mcu-firmware as a message without payload.
    """
    send = self.copilot.pbcom.send_can_message
    await send(copilot.actuators_thread_start_uuid, None)

on_actuators_stop() async #

Callback on actuators_stop (from dashboard). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
109
110
111
112
113
114
async def on_actuators_stop(self):
    """
    Callback on actuators_stop (from dashboard).

    Forward to mcu-firmware as a message without payload.
    """
    send = self.copilot.pbcom.send_can_message
    await send(copilot.actuators_thread_stop_uuid, None)

on_brake() async #

Callback on brake message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
170
171
172
173
174
175
176
async def on_brake(self):
    """
    Callback on brake message.

    Forward to firmware as a message without payload.
    """
    logger.info("[SIO] Brake")
    send = self.copilot.pbcom.send_can_message
    await send(copilot.brake_uuid, None)

on_connect() async #

On connection to cogip-server.

Source code in cogip/tools/copilot/sio_events.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
async def on_connect(self):
    """
    On connection to cogip-server.

    Wait until the client reports itself as connected, announce it with
    a "connected" event, then create the shared memory and set
    `self.connected`.
    """

    def client_is_connected() -> bool:
        return self.client.connected is True

    # The poll is blocking, so it runs in a worker thread; it has no timeout.
    await asyncio.to_thread(polling2.poll, client_is_connected, step=1, poll_forever=True)
    logger.info("Connected to cogip-server")
    await self.emit("connected")

    self.copilot.create_shared_memory()
    self.connected = True

on_connect_error(data) async #

On connection error, log the failure: the `message` entry of `data` when `data` is a dict containing one, otherwise `data` itself.

Source code in cogip/tools/copilot/sio_events.py
56
57
58
59
60
61
62
63
64
65
async def on_connect_error(self, data: dict[str, Any]) -> None:
    """
    On connection error, log the failure.

    The logged text is `data["message"]` when `data` is a dict holding
    that key, otherwise `data` itself. Nothing else is done here.
    """
    message = data["message"] if isinstance(data, dict) and "message" in data else data
    logger.error(f"Connection to cogip-server failed: {message}")

on_disconnect() async #

On disconnection from cogip-server.

Source code in cogip/tools/copilot/sio_events.py
48
49
50
51
52
53
54
async def on_disconnect(self) -> None:
    """
    On disconnection from cogip-server.

    Clear `self.connected`, then delete the shared memory.
    """
    logger.info("Disconnected from cogip-server")
    # The flag is cleared before the (awaited) shared memory cleanup.
    self.connected = False
    await self.copilot.delete_shared_memory()

on_game_end() async #

Callback on game_end message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
154
155
156
157
158
159
160
async def on_game_end(self):
    """
    Callback on game_end message.

    Forward to firmware as a message without payload.
    """
    logger.info("[SIO] Game end")
    send = self.copilot.pbcom.send_can_message
    await send(copilot.game_end_uuid, None)

on_game_reset() async #

Callback on game_reset message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
162
163
164
165
166
167
168
async def on_game_reset(self):
    """
    Callback on game_reset message.

    Forward to firmware as a message without payload.
    """
    logger.info("[SIO] Game reset")
    send = self.copilot.pbcom.send_can_message
    await send(copilot.game_reset_uuid, None)

on_game_start() async #

Callback on game_start message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
146
147
148
149
150
151
152
async def on_game_start(self):
    """
    Callback on game_start message.

    Forward to firmware as a message without payload.
    """
    logger.info("[SIO] Game start")
    send = self.copilot.pbcom.send_can_message
    await send(copilot.game_start_uuid, None)

on_get_parameter_value(data) async #

Callback on get_parameter_value. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
178
179
180
181
182
183
184
185
186
187
188
189
async def on_get_parameter_value(self, data: dict[str, Any]):
    """
    Callback on get_parameter_value.

    Validate `data` as a `FirmwareParameter`, copy it into a
    `PB_ParameterGetRequest` and forward it to firmware.
    """
    logger.info(f"[SIO] Get parameter: {data}")

    firmware_parameter = FirmwareParameter.model_validate(data)
    pb_request = PB_ParameterGetRequest()
    firmware_parameter.pb_copy(pb_request)

    send = self.copilot.pbcom.send_can_message
    await send(copilot.parameter_get_uuid, pb_request)

on_pose_order(data) async #

Callback on pose order (from planner). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
78
79
80
81
82
83
84
85
86
87
88
89
async def on_pose_order(self, data: dict[str, Any]):
    """
    Callback on pose order (from planner).

    Validate `data` as a `models.PathPose`, copy it into a `PB_PathPose`
    and forward it to mcu-firmware.
    """
    logger.info(f"[SIO] Pose order: {data}")
    pose = models.PathPose.model_validate(data)
    # Copilots whose id is greater than 1 always get a forward-only order.
    if self.copilot.id > 1:
        pose.motion_direction = models.MotionDirection.FORWARD_ONLY
    pb_pose = PB_PathPose()
    pose.copy_pb(pb_pose)
    send = self.copilot.pbcom.send_can_message
    await send(copilot.pose_order_uuid, pb_pose)

on_pose_start(data) async #

Callback on pose start (from planner). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
67
68
69
70
71
72
73
74
75
76
async def on_pose_start(self, data: dict[str, Any]):
    """
    Callback on pose start (from planner).

    Validate `data` as a `models.PathPose`, copy it into a `PB_PathPose`
    and forward it to mcu-firmware.
    """
    logger.info(f"[SIO] Pose start: {data}")
    pose = models.PathPose.model_validate(data)
    pb_pose = PB_PathPose()
    pose.copy_pb(pb_pose)
    send = self.copilot.pbcom.send_can_message
    await send(copilot.pose_start_uuid, pb_pose)

on_set_controller(controller) async #

Callback on set_controller message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
137
138
139
140
141
142
143
144
async def on_set_controller(self, controller: int):
    """
    Callback on set_controller message.

    Wrap `controller` in a `PB_Controller` (as its `id`) and forward it
    to firmware.
    """
    pb_message = PB_Controller()
    pb_message.id = controller
    send = self.copilot.pbcom.send_can_message
    await send(copilot.controller_uuid, pb_message)

on_set_parameter_value(data) async #

Callback on set_parameter_value. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
191
192
193
194
195
196
197
198
199
200
201
202
async def on_set_parameter_value(self, data: dict[str, Any]):
    """
    Callback on set_parameter_value.

    Validate `data` as a `FirmwareParameter`, copy it into a
    `PB_ParameterSetRequest` and forward it to firmware.
    """
    logger.info(f"[SIO] Set parameter: {data}")

    firmware_parameter = FirmwareParameter.model_validate(data)
    pb_request = PB_ParameterSetRequest()
    firmware_parameter.pb_copy(pb_request)

    send = self.copilot.pbcom.send_can_message
    await send(copilot.parameter_set_uuid, pb_request)

on_speed_order(data) async #

Callback on speed order (from calibration tool). Forward to mcu-firmware.

Source code in cogip/tools/copilot/sio_events.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def on_speed_order(self, data: dict[str, Any]):
    """
    Callback on speed order (from calibration tool).

    Validate `data` as a `models.SpeedOrder`, copy it into a
    `PB_SpeedOrder` and forward it to mcu-firmware.
    """
    logger.info(f"[SIO] Speed order: {data}")
    order = models.SpeedOrder.model_validate(data)
    pb_order = PB_SpeedOrder()
    order.copy_pb(pb_order)
    send = self.copilot.pbcom.send_can_message
    await send(copilot.speed_order_uuid, pb_order)

on_telemetry_disable(data=None) async #

Callback on telemetry disable message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
212
213
214
215
216
217
218
async def on_telemetry_disable(self, data: dict[str, Any] | None = None):
    """
    Callback on telemetry disable message.

    Forward to firmware as a message without payload; `data` is accepted
    but not used.
    """
    logger.info("[SIO] Telemetry disable")
    send = self.copilot.pbcom.send_can_message
    await send(copilot.telemetry_disable_uuid, None)

on_telemetry_enable(data=None) async #

Callback on telemetry enable message. Forward to firmware.

Source code in cogip/tools/copilot/sio_events.py
204
205
206
207
208
209
210
async def on_telemetry_enable(self, data: dict[str, Any] | None = None):
    """
    Callback on telemetry enable message.

    Forward to firmware as a message without payload; `data` is accepted
    but not used.
    """
    logger.info("[SIO] Telemetry enable")
    send = self.copilot.pbcom.send_can_message
    await send(copilot.telemetry_enable_uuid, None)