Skip to content

copilot

CopilotNamespace #

Bases: AsyncNamespace

Handle all SocketIO events related to copilot.

Source code in cogip/tools/server/namespaces/copilot.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class CopilotNamespace(socketio.AsyncNamespace):
    """
    Handle all SocketIO events related to copilot.
    """

    def __init__(self, cogip_server: "server.Server"):
        super().__init__("/copilot")
        self.cogip_server = cogip_server
        self.context = Context()
        self.context.copilot_sid = None

    async def on_connect(self, sid, environ):
        if self.context.copilot_sid:
            message = "A copilot is already connected"
            logger.error(f"Copilot connection refused: {message}")
            raise ConnectionRefusedError(message)

    async def on_connected(self, sid):
        logger.info("Copilot connected.")
        self.context.copilot_sid = sid
        if self.context.planner_sid:
            await self.emit("copilot_connected", namespace="/planner")

    async def on_disconnect(self, sid):
        self.context.copilot_sid = None
        self.context.shell_menu = None
        await self.emit("copilot_disconnected", namespace="/planner")
        logger.info("Copilot disconnected.")

    async def on_reset(self, sid) -> None:
        """
        Callback on reset event.
        """
        logger.info("[copilot => planner] reset.")
        await self.emit("reset", namespace="/planner")

    async def on_register_menu(self, sid, data: dict[str, Any]):
        """
        Callback on register_menu.
        """
        await self.cogip_server.register_menu("copilot", data)

    async def on_pose_reached(self, sid) -> None:
        """
        Callback on pose reached message.
        """
        logger.info("[copilot => planner] Pose reached.")
        await self.emit("pose_reached", namespace="/planner")
        if self.context.calibration_sid:
            await self.emit("pose_reached", namespace="/calibration")

    async def on_intermediate_pose_reached(self, sid) -> None:
        """
        Callback on intermediate pose reached message.
        """
        logger.info("[copilot => planner] Intermediate pose reached.")
        await self.emit("intermediate_pose_reached", namespace="/planner")

    async def on_blocked(self, sid) -> None:
        """
        Callback on blocked message.
        """
        logger.info("[copilot => planner] Blocked.")
        await self.emit("blocked", namespace="/planner")

    async def on_menu(self, sid, menu):
        """
        Callback on menu event.
        """
        self.context.shell_menu = models.ShellMenu.model_validate(menu)
        await self.emit("shell_menu", (self.context.robot_id, menu), namespace="/dashboard")

    async def on_state(self, sid, state):
        """
        Callback on state event.
        """
        await self.emit("state", (self.context.robot_id, state), namespace="/dashboard")

    async def on_actuator_state(self, sid, actuator_state: dict[str, Any]):
        """
        Callback on actuator_state message.
        """
        await self.emit("actuator_state", actuator_state, namespace="/planner")
        await self.emit("actuator_state", actuator_state, namespace="/dashboard")

    async def on_pid(self, sid, pid: dict[str, Any]):
        """
        Callback on pid message.
        """
        await self.emit("pid", pid, namespace="/dashboard")

    async def on_config(self, sid, config: dict[str, Any]):
        """
        Callback on config message.
        """
        await self.emit("config", config, namespace="/dashboard")

    async def on_game_end(self, sid) -> None:
        """
        Callback on game end message.
        """
        logger.info("[copilot => planner] Game end.")
        await self.emit("game_end", namespace="/planner")

    async def on_get_parameter_response(self, sid, response: dict[str, Any]) -> None:
        """
        Callback on get_parameter_response message from copilot.
        Forward to firmware parameter manager.
        """
        logger.info(f"[copilot => parameters] Get response: {response}")
        await self.emit("get_parameter_response", response, namespace="/parameters")

    async def on_set_parameter_response(self, sid, response: dict[str, Any]) -> None:
        """
        Callback on set_parameter_response message from copilot.
        Forward to firmware parameter manager.
        """
        logger.info(f"[copilot => parameters] Set response: {response}")
        await self.emit("set_parameter_response", response, namespace="/parameters")

    async def on_telemetry_data(self, sid, telemetry: dict[str, Any]) -> None:
        """
        Callback on telemetry_data message from copilot.
        Forward to telemetry clients.
        """
        logger.debug(f"[copilot => telemetry] Telemetry Data: {telemetry}")
        await self.emit("telemetry_data", telemetry, namespace="/telemetry")

on_actuator_state(sid, actuator_state) async #

Callback on actuator_state message.

Source code in cogip/tools/server/namespaces/copilot.py
89
90
91
92
93
94
async def on_actuator_state(self, sid, actuator_state: dict[str, Any]):
    """
    Callback on actuator_state message.
    """
    await self.emit("actuator_state", actuator_state, namespace="/planner")
    await self.emit("actuator_state", actuator_state, namespace="/dashboard")

on_blocked(sid) async #

Callback on blocked message.

Source code in cogip/tools/server/namespaces/copilot.py
69
70
71
72
73
74
async def on_blocked(self, sid) -> None:
    """
    Callback on blocked message.
    """
    logger.info("[copilot => planner] Blocked.")
    await self.emit("blocked", namespace="/planner")

on_config(sid, config) async #

Callback on config message.

Source code in cogip/tools/server/namespaces/copilot.py
102
103
104
105
106
async def on_config(self, sid, config: dict[str, Any]):
    """
    Callback on config message.
    """
    await self.emit("config", config, namespace="/dashboard")

on_game_end(sid) async #

Callback on game end message.

Source code in cogip/tools/server/namespaces/copilot.py
108
109
110
111
112
113
async def on_game_end(self, sid) -> None:
    """
    Callback on game end message.
    """
    logger.info("[copilot => planner] Game end.")
    await self.emit("game_end", namespace="/planner")

on_get_parameter_response(sid, response) async #

Callback on get_parameter_response message from copilot. Forward to firmware parameter manager.

Source code in cogip/tools/server/namespaces/copilot.py
115
116
117
118
119
120
121
async def on_get_parameter_response(self, sid, response: dict[str, Any]) -> None:
    """
    Callback on get_parameter_response message from copilot.
    Forward to firmware parameter manager.
    """
    logger.info(f"[copilot => parameters] Get response: {response}")
    await self.emit("get_parameter_response", response, namespace="/parameters")

on_intermediate_pose_reached(sid) async #

Callback on intermediate pose reached message.

Source code in cogip/tools/server/namespaces/copilot.py
62
63
64
65
66
67
async def on_intermediate_pose_reached(self, sid) -> None:
    """
    Callback on intermediate pose reached message.
    """
    logger.info("[copilot => planner] Intermediate pose reached.")
    await self.emit("intermediate_pose_reached", namespace="/planner")

on_menu(sid, menu) async #

Callback on menu event.

Source code in cogip/tools/server/namespaces/copilot.py
76
77
78
79
80
81
async def on_menu(self, sid, menu):
    """
    Callback on menu event.
    """
    self.context.shell_menu = models.ShellMenu.model_validate(menu)
    await self.emit("shell_menu", (self.context.robot_id, menu), namespace="/dashboard")

on_pid(sid, pid) async #

Callback on pid message.

Source code in cogip/tools/server/namespaces/copilot.py
 96
 97
 98
 99
100
async def on_pid(self, sid, pid: dict[str, Any]):
    """
    Callback on pid message.
    """
    await self.emit("pid", pid, namespace="/dashboard")

on_pose_reached(sid) async #

Callback on pose reached message.

Source code in cogip/tools/server/namespaces/copilot.py
53
54
55
56
57
58
59
60
async def on_pose_reached(self, sid) -> None:
    """
    Callback on pose reached message.
    """
    logger.info("[copilot => planner] Pose reached.")
    await self.emit("pose_reached", namespace="/planner")
    if self.context.calibration_sid:
        await self.emit("pose_reached", namespace="/calibration")

on_register_menu(sid, data) async #

Callback on register_menu.

Source code in cogip/tools/server/namespaces/copilot.py
47
48
49
50
51
async def on_register_menu(self, sid, data: dict[str, Any]):
    """
    Callback on register_menu.
    """
    await self.cogip_server.register_menu("copilot", data)

on_reset(sid) async #

Callback on reset event.

Source code in cogip/tools/server/namespaces/copilot.py
40
41
42
43
44
45
async def on_reset(self, sid) -> None:
    """
    Callback on reset event.
    """
    logger.info("[copilot => planner] reset.")
    await self.emit("reset", namespace="/planner")

on_set_parameter_response(sid, response) async #

Callback on set_parameter_response message from copilot. Forward to firmware parameter manager.

Source code in cogip/tools/server/namespaces/copilot.py
123
124
125
126
127
128
129
async def on_set_parameter_response(self, sid, response: dict[str, Any]) -> None:
    """
    Callback on set_parameter_response message from copilot.
    Forward to firmware parameter manager.
    """
    logger.info(f"[copilot => parameters] Set response: {response}")
    await self.emit("set_parameter_response", response, namespace="/parameters")

on_state(sid, state) async #

Callback on state event.

Source code in cogip/tools/server/namespaces/copilot.py
83
84
85
86
87
async def on_state(self, sid, state):
    """
    Callback on state event.
    """
    await self.emit("state", (self.context.robot_id, state), namespace="/dashboard")

on_telemetry_data(sid, telemetry) async #

Callback on telemetry_data message from copilot. Forward to telemetry clients.

Source code in cogip/tools/server/namespaces/copilot.py
131
132
133
134
135
136
137
async def on_telemetry_data(self, sid, telemetry: dict[str, Any]) -> None:
    """
    Callback on telemetry_data message from copilot.
    Forward to telemetry clients.
    """
    logger.debug(f"[copilot => telemetry] Telemetry Data: {telemetry}")
    await self.emit("telemetry_data", telemetry, namespace="/telemetry")