Skip to content

planner

PlannerNamespace #

Bases: AsyncNamespace

Handle all SocketIO events related to planner.

Source code in cogip/tools/server/namespaces/planner.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class PlannerNamespace(socketio.AsyncNamespace):
    """
    Handle all SocketIO events related to planner.
    """

    def __init__(self, cogip_server: "server.Server"):
        super().__init__("/planner")
        self.cogip_server = cogip_server
        self.context = Context()
        self.connected = False
        self.recorder = GameRecorder()
        self.context.planner_sid = None

    async def on_connect(self, sid, environ):
        if self.context.planner_sid:
            logger.error("Planner connection refused: a planner is already connected")
            raise ConnectionRefusedError("A planner is already connected")
        self.context.planner_sid = sid

    async def on_connected(self, sid):
        logger.info("Planner connected.")
        if self.context.copilot_sid:
            await self.emit("copilot_connected", namespace="/planner")

    def on_disconnect(self, sid):
        self.context.planner_sid = None
        logger.info("Planner disconnected.")

    async def on_register_menu(self, sid, data: dict[str, Any]):
        """
        Callback on register_menu.
        """
        await self.cogip_server.register_menu("planner", data)

    async def on_pose_start(self, sid, pose: dict[str, Any]):
        """
        Callback on pose start.
        Forward to pose to copilot.
        """
        await self.emit("pose_start", pose, namespace="/copilot")

    async def on_pose_order(self, sid, pose: dict[str, Any]):
        """
        Callback on pose order.
        Forward to pose to copilot and dashboards.
        """
        await self.emit("pose_order", pose, namespace="/copilot")
        await self.emit("pose_order", (self.context.robot_id, pose), namespace="/dashboard")
        await self.recorder.async_record({"pose_order": pose})

    async def on_obstacles(self, sid, obstacles: list[dict[str, Any]]):
        """
        Callback on obstacles message.

        Receive a list of all obstacles.
        These obstacles are sent to planner to monitor/dashboards for display.
        """
        await self.emit("obstacles", obstacles, namespace="/dashboard")

    async def on_wizard(self, sid, message: list[dict[str, Any]]):
        """
        Callback on wizard message.
        Forward to dashboard.
        """
        message["namespace"] = "/planner"
        await self.emit("wizard", message, namespace="/dashboard")

    async def on_set_controller(self, sid, controller: int):
        """
        Callback on set_controller message.
        Forward to copilot.
        """
        await self.emit("set_controller", controller, namespace="/copilot")

    async def on_path(self, sid, path: list[dict[str, float]]):
        """
        Callback on robot path.
        Forward the path to dashboard.
        """
        await self.emit("path", (self.context.robot_id, path), namespace="/dashboard")
        await self.recorder.async_record({"pose_order": path})

    async def on_config(self, sid, config: dict[str, Any]):
        """
        Callback on config message.
        """
        await self.emit("config", config, namespace="/dashboard")

    async def on_cmd_reset(self, sid):
        """
        Callback on cmd_reset message.
        """
        await self.emit("cmd_reset", namespace="/monitor")

    async def on_starter_changed(self, sid, pushed: bool):
        """
        Callback on starter_pushed message.
        """
        await self.emit("starter_changed", (self.context.robot_id, pushed), namespace="/monitor")

    async def on_close_wizard(self, sid):
        """
        Callback on close_wizard message.
        """
        await self.emit("close_wizard", namespace="/dashboard")

    async def on_game_start(self, sid):
        """
        Callback on game_start message.
        """
        await self.emit("game_start", namespace="/copilot")

    async def on_game_end(self, sid):
        """
        Callback on game_end message.
        """
        await self.emit("game_end", namespace="/copilot")

    async def on_robot_end(self, sid):
        """
        Callback on robot_end message.
        """
        await self.emit("game_end", namespace="/copilot")

    async def on_game_reset(self, sid):
        """
        Callback on game_reset message.
        """
        await self.emit("game_reset", namespace="/copilot")

    async def on_score(self, sid, score: int):
        """
        Callback on score message.
        """
        await self.emit("score", score, namespace="/dashboard")

    async def on_actuator_command(self, sid, data):
        """
        Callback on actuator_command message.
        """
        await self.emit("actuator_command", data, namespace="/copilot")

    async def on_start_video_record(self, sid):
        """
        Callback on start_video_record message.
        """
        await self.emit("start_video_record", namespace="/robotcam")

    async def on_stop_video_record(self, sid):
        """
        Callback on stop_video_record message.
        """
        await self.emit("stop_video_record", namespace="/robotcam")

    async def on_brake(self, sid):
        """
        Callback on brake message.
        """
        await self.emit("brake", namespace="/copilot")

    async def on_pami_reset(self, sid):
        """
        Callback on pami_reset message.
        """
        await self.emit("pami_reset", namespace="/beacon")

    async def on_pami_camp(self, sid, data):
        """
        Callback on pami_camp message.
        """
        await self.emit("pami_camp", data, namespace="/beacon")

    async def on_pami_table(self, sid, data):
        """
        Callback on pami_table message.
        """
        await self.emit("pami_table", data, namespace="/beacon")

    async def on_pami_play(self, sid):
        """
        Callback on pami_play message.
        """
        await self.emit("pami_play", namespace="/beacon")

on_actuator_command(sid, data) async #

Callback on actuator_command message.

Source code in cogip/tools/server/namespaces/planner.py
146
147
148
149
150
async def on_actuator_command(self, sid, data):
    """
    Callback on actuator_command message.
    """
    await self.emit("actuator_command", data, namespace="/copilot")

on_brake(sid) async #

Callback on brake message.

Source code in cogip/tools/server/namespaces/planner.py
164
165
166
167
168
async def on_brake(self, sid):
    """
    Callback on brake message.
    """
    await self.emit("brake", namespace="/copilot")

on_close_wizard(sid) async #

Callback on close_wizard message.

Source code in cogip/tools/server/namespaces/planner.py
110
111
112
113
114
async def on_close_wizard(self, sid):
    """
    Callback on close_wizard message.
    """
    await self.emit("close_wizard", namespace="/dashboard")

on_cmd_reset(sid) async #

Callback on cmd_reset message.

Source code in cogip/tools/server/namespaces/planner.py
 98
 99
100
101
102
async def on_cmd_reset(self, sid):
    """
    Callback on cmd_reset message.
    """
    await self.emit("cmd_reset", namespace="/monitor")

on_config(sid, config) async #

Callback on config message.

Source code in cogip/tools/server/namespaces/planner.py
92
93
94
95
96
async def on_config(self, sid, config: dict[str, Any]):
    """
    Callback on config message.
    """
    await self.emit("config", config, namespace="/dashboard")

on_game_end(sid) async #

Callback on game_end message.

Source code in cogip/tools/server/namespaces/planner.py
122
123
124
125
126
async def on_game_end(self, sid):
    """
    Callback on game_end message.
    """
    await self.emit("game_end", namespace="/copilot")

on_game_reset(sid) async #

Callback on game_reset message.

Source code in cogip/tools/server/namespaces/planner.py
134
135
136
137
138
async def on_game_reset(self, sid):
    """
    Callback on game_reset message.
    """
    await self.emit("game_reset", namespace="/copilot")

on_game_start(sid) async #

Callback on game_start message.

Source code in cogip/tools/server/namespaces/planner.py
116
117
118
119
120
async def on_game_start(self, sid):
    """
    Callback on game_start message.
    """
    await self.emit("game_start", namespace="/copilot")

on_obstacles(sid, obstacles) async #

Callback on obstacles message.

Receive a list of all obstacles. These obstacles are sent to planner to monitor/dashboards for display.

Source code in cogip/tools/server/namespaces/planner.py
60
61
62
63
64
65
66
67
async def on_obstacles(self, sid, obstacles: list[dict[str, Any]]):
    """
    Callback on obstacles message.

    Receive a list of all obstacles.
    These obstacles are sent to planner to monitor/dashboards for display.
    """
    await self.emit("obstacles", obstacles, namespace="/dashboard")

on_pami_camp(sid, data) async #

Callback on pami_camp message.

Source code in cogip/tools/server/namespaces/planner.py
176
177
178
179
180
async def on_pami_camp(self, sid, data):
    """
    Callback on pami_camp message.
    """
    await self.emit("pami_camp", data, namespace="/beacon")

on_pami_play(sid) async #

Callback on pami_play message.

Source code in cogip/tools/server/namespaces/planner.py
188
189
190
191
192
async def on_pami_play(self, sid):
    """
    Callback on pami_play message.
    """
    await self.emit("pami_play", namespace="/beacon")

on_pami_reset(sid) async #

Callback on pami_reset message.

Source code in cogip/tools/server/namespaces/planner.py
170
171
172
173
174
async def on_pami_reset(self, sid):
    """
    Callback on pami_reset message.
    """
    await self.emit("pami_reset", namespace="/beacon")

on_pami_table(sid, data) async #

Callback on pami_table message.

Source code in cogip/tools/server/namespaces/planner.py
182
183
184
185
186
async def on_pami_table(self, sid, data):
    """
    Callback on pami_table message.
    """
    await self.emit("pami_table", data, namespace="/beacon")

on_path(sid, path) async #

Callback on robot path. Forward the path to dashboard.

Source code in cogip/tools/server/namespaces/planner.py
84
85
86
87
88
89
90
async def on_path(self, sid, path: list[dict[str, float]]):
    """
    Callback on robot path.
    Forward the path to dashboard.
    """
    await self.emit("path", (self.context.robot_id, path), namespace="/dashboard")
    await self.recorder.async_record({"pose_order": path})

on_pose_order(sid, pose) async #

Callback on pose order. Forward to pose to copilot and dashboards.

Source code in cogip/tools/server/namespaces/planner.py
51
52
53
54
55
56
57
58
async def on_pose_order(self, sid, pose: dict[str, Any]):
    """
    Callback on pose order.
    Forward to pose to copilot and dashboards.
    """
    await self.emit("pose_order", pose, namespace="/copilot")
    await self.emit("pose_order", (self.context.robot_id, pose), namespace="/dashboard")
    await self.recorder.async_record({"pose_order": pose})

on_pose_start(sid, pose) async #

Callback on pose start. Forward to pose to copilot.

Source code in cogip/tools/server/namespaces/planner.py
44
45
46
47
48
49
async def on_pose_start(self, sid, pose: dict[str, Any]):
    """
    Callback on pose start.
    Forward to pose to copilot.
    """
    await self.emit("pose_start", pose, namespace="/copilot")

on_register_menu(sid, data) async #

Callback on register_menu.

Source code in cogip/tools/server/namespaces/planner.py
38
39
40
41
42
async def on_register_menu(self, sid, data: dict[str, Any]):
    """
    Callback on register_menu.
    """
    await self.cogip_server.register_menu("planner", data)

on_robot_end(sid) async #

Callback on robot_end message.

Source code in cogip/tools/server/namespaces/planner.py
128
129
130
131
132
async def on_robot_end(self, sid):
    """
    Callback on robot_end message.
    """
    await self.emit("game_end", namespace="/copilot")

on_score(sid, score) async #

Callback on score message.

Source code in cogip/tools/server/namespaces/planner.py
140
141
142
143
144
async def on_score(self, sid, score: int):
    """
    Callback on score message.
    """
    await self.emit("score", score, namespace="/dashboard")

on_set_controller(sid, controller) async #

Callback on set_controller message. Forward to copilot.

Source code in cogip/tools/server/namespaces/planner.py
77
78
79
80
81
82
async def on_set_controller(self, sid, controller: int):
    """
    Callback on set_controller message.
    Forward to copilot.
    """
    await self.emit("set_controller", controller, namespace="/copilot")

on_start_video_record(sid) async #

Callback on start_video_record message.

Source code in cogip/tools/server/namespaces/planner.py
152
153
154
155
156
async def on_start_video_record(self, sid):
    """
    Callback on start_video_record message.
    """
    await self.emit("start_video_record", namespace="/robotcam")

on_starter_changed(sid, pushed) async #

Callback on starter_pushed message.

Source code in cogip/tools/server/namespaces/planner.py
104
105
106
107
108
async def on_starter_changed(self, sid, pushed: bool):
    """
    Callback on starter_pushed message.
    """
    await self.emit("starter_changed", (self.context.robot_id, pushed), namespace="/monitor")

on_stop_video_record(sid) async #

Callback on stop_video_record message.

Source code in cogip/tools/server/namespaces/planner.py
158
159
160
161
162
async def on_stop_video_record(self, sid):
    """
    Callback on stop_video_record message.
    """
    await self.emit("stop_video_record", namespace="/robotcam")

on_wizard(sid, message) async #

Callback on wizard message. Forward to dashboard.

Source code in cogip/tools/server/namespaces/planner.py
69
70
71
72
73
74
75
async def on_wizard(self, sid, message: list[dict[str, Any]]):
    """
    Callback on wizard message.
    Forward to dashboard.
    """
    message["namespace"] = "/planner"
    await self.emit("wizard", message, namespace="/dashboard")