Skip to content

planner

PlannerNamespace #

Bases: AsyncNamespace

Handle all SocketIO events related to planner.

Source code in cogip/tools/server/namespaces/planner.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class PlannerNamespace(socketio.AsyncNamespace):
    """
    Handle all SocketIO events related to planner.

    The namespace is registered on "/planner". Most handlers simply relay the
    received event to another namespace ("/copilot", "/dashboard", "/monitor",
    "/robotcam" or "/beacon"); a few also record the payload through the
    game recorder.
    """

    def __init__(self, cogip_server: "server.Server"):
        """
        Bind the namespace to "/planner" and keep a reference to the server.

        Arguments:
            cogip_server: server object, used by `on_register_menu`.
        """
        super().__init__("/planner")
        self.cogip_server = cogip_server
        self.context = Context()
        self.connected = False
        self.recorder = GameRecorder()
        # Start with no planner registered on the context.
        self.context.planner_sid = None

    async def on_connect(self, sid, environ):
        """
        Callback on connection request.
        Only one planner is accepted at a time: the sid is stored in the
        context, and any further connection is refused while it is set.

        Raises:
            ConnectionRefusedError: a planner sid is already registered.
        """
        if self.context.planner_sid:
            logger.error("Planner connection refused: a planner is already connected")
            raise ConnectionRefusedError("A planner is already connected")
        self.context.planner_sid = sid

    async def on_connected(self, sid):
        """
        Callback on connected message.
        If a copilot sid is registered in the context, notify the planner
        with a "copilot_connected" event.
        """
        logger.info("Planner connected.")
        if self.context.copilot_sid:
            await self.emit("copilot_connected", namespace="/planner")

    def on_disconnect(self, sid):
        """
        Callback on disconnection.
        Clear the planner sid stored in the context.
        """
        self.context.planner_sid = None
        logger.info("Planner disconnected.")

    async def on_register_menu(self, sid, data: dict[str, Any]):
        """
        Callback on register_menu.
        Register the menu on the server under the "planner" name.
        """
        await self.cogip_server.register_menu("planner", data)

    async def on_pose_start(self, sid, pose: dict[str, Any]):
        """
        Callback on pose start.
        Forward the pose to copilot.
        """
        await self.emit("pose_start", pose, namespace="/copilot")

    async def on_pose_order(self, sid, pose: dict[str, Any]):
        """
        Callback on pose order.
        Forward the pose to copilot and dashboards, then record it.
        """
        await self.emit("pose_order", pose, namespace="/copilot")
        await self.emit("pose_order", (self.context.robot_id, pose), namespace="/dashboard")
        await self.recorder.async_record({"pose_order": pose})

    async def on_wizard(self, sid, message: dict[str, Any]):
        """
        Callback on wizard message.
        Tag the message with the "/planner" namespace and forward it
        to dashboard.
        """
        # The message is indexed by string key, so it must be a dict
        # (it is modified in place before being forwarded).
        message["namespace"] = "/planner"
        await self.emit("wizard", message, namespace="/dashboard")

    async def on_set_controller(self, sid, controller: int):
        """
        Callback on set_controller message.
        Forward to copilot.
        """
        await self.emit("set_controller", controller, namespace="/copilot")

    async def on_path(self, sid, path: list[dict[str, float]]):
        """
        Callback on robot path.
        Forward the path to dashboard, then record it.
        """
        await self.emit("path", (self.context.robot_id, path), namespace="/dashboard")
        # NOTE(review): the path is recorded under the "pose_order" key, the same
        # key used by on_pose_order -- confirm this is intended.
        await self.recorder.async_record({"pose_order": path})

    async def on_config(self, sid, config: dict[str, Any]):
        """
        Callback on config message.
        Forward to dashboard.
        """
        await self.emit("config", config, namespace="/dashboard")

    async def on_cmd_reset(self, sid):
        """
        Callback on cmd_reset message.
        Forward to monitor.
        """
        await self.emit("cmd_reset", namespace="/monitor")

    async def on_starter_changed(self, sid, pushed: bool):
        """
        Callback on starter_changed message.
        Forward the starter state, with the robot id, to dashboard.
        """
        await self.emit("starter_changed", (self.context.robot_id, pushed), namespace="/dashboard")

    async def on_close_wizard(self, sid):
        """
        Callback on close_wizard message.
        Forward to dashboard.
        """
        await self.emit("close_wizard", namespace="/dashboard")

    async def on_game_start(self, sid):
        """
        Callback on game_start message.
        Forward to copilot.
        """
        await self.emit("game_start", namespace="/copilot")

    async def on_game_end(self, sid):
        """
        Callback on game_end message.
        Forward to copilot.
        """
        await self.emit("game_end", namespace="/copilot")

    async def on_robot_end(self, sid):
        """
        Callback on robot_end message.
        Send a "game_end" event to copilot.
        """
        # NOTE(review): the emitted event is "game_end", not "robot_end" --
        # confirm this is intended.
        await self.emit("game_end", namespace="/copilot")

    async def on_game_reset(self, sid):
        """
        Callback on game_reset message.
        Forward to copilot.
        """
        await self.emit("game_reset", namespace="/copilot")

    async def on_score(self, sid, score: int):
        """
        Callback on score message.
        Forward to dashboard.
        """
        await self.emit("score", score, namespace="/dashboard")

    async def on_actuator_command(self, sid, data):
        """
        Callback on actuator_command message.
        Forward to copilot.
        """
        await self.emit("actuator_command", data, namespace="/copilot")

    async def on_start_video_record(self, sid):
        """
        Callback on start_video_record message.
        Forward to robotcam.
        """
        await self.emit("start_video_record", namespace="/robotcam")

    async def on_stop_video_record(self, sid):
        """
        Callback on stop_video_record message.
        Forward to robotcam.
        """
        await self.emit("stop_video_record", namespace="/robotcam")

    async def on_brake(self, sid):
        """
        Callback on brake message.
        Forward to copilot.
        """
        await self.emit("brake", namespace="/copilot")

    async def on_pami_reset(self, sid):
        """
        Callback on pami_reset message.
        Forward to beacon.
        """
        await self.emit("pami_reset", namespace="/beacon")

    async def on_pami_camp(self, sid, data):
        """
        Callback on pami_camp message.
        Forward to beacon.
        """
        await self.emit("pami_camp", data, namespace="/beacon")

    async def on_pami_table(self, sid, data):
        """
        Callback on pami_table message.
        Forward to beacon.
        """
        await self.emit("pami_table", data, namespace="/beacon")

    async def on_pami_play(self, sid):
        """
        Callback on pami_play message.
        Forward to beacon.
        """
        await self.emit("pami_play", namespace="/beacon")

on_actuator_command(sid, data) async #

Callback on actuator_command message.

Source code in cogip/tools/server/namespaces/planner.py
137
138
139
140
141
async def on_actuator_command(self, sid, data):
    """
    Relay an actuator_command payload received from the planner to copilot.
    """
    destination = "/copilot"
    await self.emit("actuator_command", data, namespace=destination)

on_brake(sid) async #

Callback on brake message.

Source code in cogip/tools/server/namespaces/planner.py
155
156
157
158
159
async def on_brake(self, sid):
    """
    Relay a brake request received from the planner to copilot.
    """
    destination = "/copilot"
    await self.emit("brake", namespace=destination)

on_close_wizard(sid) async #

Callback on close_wizard message.

Source code in cogip/tools/server/namespaces/planner.py
101
102
103
104
105
async def on_close_wizard(self, sid):
    """
    Relay a close_wizard request received from the planner to dashboard.
    """
    destination = "/dashboard"
    await self.emit("close_wizard", namespace=destination)

on_cmd_reset(sid) async #

Callback on cmd_reset message.

Source code in cogip/tools/server/namespaces/planner.py
89
90
91
92
93
async def on_cmd_reset(self, sid):
    """
    Relay a cmd_reset request received from the planner to monitor.
    """
    destination = "/monitor"
    await self.emit("cmd_reset", namespace=destination)

on_config(sid, config) async #

Callback on config message.

Source code in cogip/tools/server/namespaces/planner.py
83
84
85
86
87
async def on_config(self, sid, config: dict[str, Any]):
    """
    Relay a config payload received from the planner to dashboard.
    """
    destination = "/dashboard"
    await self.emit("config", config, namespace=destination)

on_game_end(sid) async #

Callback on game_end message.

Source code in cogip/tools/server/namespaces/planner.py
113
114
115
116
117
async def on_game_end(self, sid):
    """
    Relay a game_end notification received from the planner to copilot.
    """
    destination = "/copilot"
    await self.emit("game_end", namespace=destination)

on_game_reset(sid) async #

Callback on game_reset message.

Source code in cogip/tools/server/namespaces/planner.py
125
126
127
128
129
async def on_game_reset(self, sid):
    """
    Relay a game_reset notification received from the planner to copilot.
    """
    destination = "/copilot"
    await self.emit("game_reset", namespace=destination)

on_game_start(sid) async #

Callback on game_start message.

Source code in cogip/tools/server/namespaces/planner.py
107
108
109
110
111
async def on_game_start(self, sid):
    """
    Relay a game_start notification received from the planner to copilot.
    """
    destination = "/copilot"
    await self.emit("game_start", namespace=destination)

on_pami_camp(sid, data) async #

Callback on pami_camp message.

Source code in cogip/tools/server/namespaces/planner.py
167
168
169
170
171
async def on_pami_camp(self, sid, data):
    """
    Relay a pami_camp payload received from the planner to beacon.
    """
    destination = "/beacon"
    await self.emit("pami_camp", data, namespace=destination)

on_pami_play(sid) async #

Callback on pami_play message.

Source code in cogip/tools/server/namespaces/planner.py
179
180
181
182
183
async def on_pami_play(self, sid):
    """
    Relay a pami_play request received from the planner to beacon.
    """
    destination = "/beacon"
    await self.emit("pami_play", namespace=destination)

on_pami_reset(sid) async #

Callback on pami_reset message.

Source code in cogip/tools/server/namespaces/planner.py
161
162
163
164
165
async def on_pami_reset(self, sid):
    """
    Relay a pami_reset request received from the planner to beacon.
    """
    destination = "/beacon"
    await self.emit("pami_reset", namespace=destination)

on_pami_table(sid, data) async #

Callback on pami_table message.

Source code in cogip/tools/server/namespaces/planner.py
173
174
175
176
177
async def on_pami_table(self, sid, data):
    """
    Relay a pami_table payload received from the planner to beacon.
    """
    destination = "/beacon"
    await self.emit("pami_table", data, namespace=destination)

on_path(sid, path) async #

Callback on robot path. Forward the path to dashboard.

Source code in cogip/tools/server/namespaces/planner.py
75
76
77
78
79
80
81
async def on_path(self, sid, path: list[dict[str, float]]):
    """
    Handle a robot path sent by the planner.
    The path is sent to dashboard together with the robot id, then
    handed to the recorder.
    """
    dashboard_payload = (self.context.robot_id, path)
    await self.emit("path", dashboard_payload, namespace="/dashboard")

    # NOTE(review): the path is recorded under the "pose_order" key, the same
    # key used by on_pose_order -- confirm this is intended.
    record = {"pose_order": path}
    await self.recorder.async_record(record)

on_pose_order(sid, pose) async #

Callback on pose order. Forward the pose to copilot and dashboards.

Source code in cogip/tools/server/namespaces/planner.py
51
52
53
54
55
56
57
58
async def on_pose_order(self, sid, pose: dict[str, Any]):
    """
    Handle a pose order sent by the planner.
    The pose is sent to copilot as-is, to dashboard together with the
    robot id, and finally handed to the recorder.
    """
    event = "pose_order"
    await self.emit(event, pose, namespace="/copilot")

    dashboard_payload = (self.context.robot_id, pose)
    await self.emit(event, dashboard_payload, namespace="/dashboard")

    record = {"pose_order": pose}
    await self.recorder.async_record(record)

on_pose_start(sid, pose) async #

Callback on pose start. Forward the pose to copilot.

Source code in cogip/tools/server/namespaces/planner.py
44
45
46
47
48
49
async def on_pose_start(self, sid, pose: dict[str, Any]):
    """
    Handle a start pose sent by the planner.
    The pose is forwarded unchanged to copilot.
    """
    destination = "/copilot"
    await self.emit("pose_start", pose, namespace=destination)

on_register_menu(sid, data) async #

Callback on register_menu.

Source code in cogip/tools/server/namespaces/planner.py
38
39
40
41
42
async def on_register_menu(self, sid, data: dict[str, Any]):
    """
    Register the menu sent by the planner on the server, under the "planner" name.
    """
    owner = "planner"
    await self.cogip_server.register_menu(owner, data)

on_robot_end(sid) async #

Callback on robot_end message.

Source code in cogip/tools/server/namespaces/planner.py
119
120
121
122
123
async def on_robot_end(self, sid):
    """
    Handle a robot_end message sent by the planner.
    A "game_end" event is sent to copilot.
    """
    # NOTE(review): the emitted event is "game_end", not "robot_end" --
    # confirm this is intended.
    destination = "/copilot"
    await self.emit("game_end", namespace=destination)

on_score(sid, score) async #

Callback on score message.

Source code in cogip/tools/server/namespaces/planner.py
131
132
133
134
135
async def on_score(self, sid, score: int):
    """
    Relay the score received from the planner to dashboard.
    """
    destination = "/dashboard"
    await self.emit("score", score, namespace=destination)

on_set_controller(sid, controller) async #

Callback on set_controller message. Forward to copilot.

Source code in cogip/tools/server/namespaces/planner.py
68
69
70
71
72
73
async def on_set_controller(self, sid, controller: int):
    """
    Handle a set_controller message sent by the planner.
    The controller value is forwarded unchanged to copilot.
    """
    destination = "/copilot"
    await self.emit("set_controller", controller, namespace=destination)

on_start_video_record(sid) async #

Callback on start_video_record message.

Source code in cogip/tools/server/namespaces/planner.py
143
144
145
146
147
async def on_start_video_record(self, sid):
    """
    Relay a start_video_record request received from the planner to robotcam.
    """
    destination = "/robotcam"
    await self.emit("start_video_record", namespace=destination)

on_starter_changed(sid, pushed) async #

Callback on starter_changed message.

Source code in cogip/tools/server/namespaces/planner.py
95
96
97
98
99
async def on_starter_changed(self, sid, pushed: bool):
    """
    Handle a starter_changed message sent by the planner.
    The starter state is sent to dashboard together with the robot id.
    """
    dashboard_payload = (self.context.robot_id, pushed)
    await self.emit("starter_changed", dashboard_payload, namespace="/dashboard")

on_stop_video_record(sid) async #

Callback on stop_video_record message.

Source code in cogip/tools/server/namespaces/planner.py
149
150
151
152
153
async def on_stop_video_record(self, sid):
    """
    Relay a stop_video_record request received from the planner to robotcam.
    """
    destination = "/robotcam"
    await self.emit("stop_video_record", namespace=destination)

on_wizard(sid, message) async #

Callback on wizard message. Forward to dashboard.

Source code in cogip/tools/server/namespaces/planner.py
60
61
62
63
64
65
66
async def on_wizard(self, sid, message: dict[str, Any]):
    """
    Callback on wizard message.
    Tag the message with the "/planner" namespace and forward it
    to dashboard.
    """
    # The message is indexed by string key, so it must be a dict
    # (it is modified in place before being forwarded).
    message["namespace"] = "/planner"
    await self.emit("wizard", message, namespace="/dashboard")