Skip to content

detector

DetectorNamespace #

Bases: AsyncNamespace

Handle all SocketIO events related to the detector.

Source code in cogip/tools/server/namespaces/detector.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class DetectorNamespace(socketio.AsyncNamespace):
    """
    Handle all SocketIO events related to the detector.

    The namespace is registered on "/detector". The session id of the
    connected detector is stored in `context.detector_sid`, and a new
    connection is refused while that value is set.
    """

    def __init__(self, cogip_server: "server.Server"):
        super().__init__("/detector")
        self.cogip_server = cogip_server
        self.context = Context()
        self.recorder = GameRecorder()

    async def on_connect(self, sid, environ):
        """
        Callback on connect.

        Refuse the connection if a detector sid is already stored in the
        context, otherwise store `sid` as the connected detector.

        Raises:
            ConnectionRefusedError: a detector is already connected.
        """
        if self.context.detector_sid:
            message = "A detector is already connected"
            logger.error(f"Detector connection refused: {message}")
            raise ConnectionRefusedError(message)

        self.context.detector_sid = sid

    async def on_connected(self, sid):
        """
        Callback on connected message.

        When the context is flagged as virtual, ask the "/monitor" namespace
        to start sensors emulation for the current robot id.
        """
        logger.info("Detector connected.")
        if self.context.virtual:
            await self.emit("start_sensors_emulation", self.context.robot_id, namespace="/monitor")

    async def on_disconnect(self, sid):
        """
        Callback on disconnect.

        When the context is flagged as virtual, ask the "/monitor" namespace
        to stop sensors emulation for the current robot id, then clear the
        stored detector sid.
        """
        try:
            if self.context.virtual:
                await self.emit("stop_sensors_emulation", self.context.robot_id, namespace="/monitor")
        finally:
            # Always release the detector slot, even if notifying the monitor
            # failed: on_connect refuses new detectors while the sid is set.
            self.context.detector_sid = None
        logger.info("Detector disconnected.")

    async def on_register_menu(self, sid, data: dict[str, Any]):
        """
        Callback on register_menu.

        Pass the menu data to the server's `register_menu`, under the
        "detector" name.
        """
        await self.cogip_server.register_menu("detector", data)

    async def on_obstacles(self, sid, obstacles: list[dict[str, Any]]):
        """
        Callback on obstacles message.

        Receive a list of obstacles, computed from sensors data by the Detector.
        These obstacles are sent to the planner to compute an avoidance path,
        then handed to the game recorder under the "obstacles" key.
        """
        await self.emit("obstacles", obstacles, namespace="/planner")
        await self.recorder.async_record({"obstacles": obstacles})

    async def on_config(self, sid, config: dict[str, Any]):
        """
        Callback on config message.

        Relay the received config, unmodified, to the "/dashboard" namespace.
        """
        await self.emit("config", config, namespace="/dashboard")

on_config(sid, config) async #

Callback on config message.

Source code in cogip/tools/server/namespaces/detector.py
56
57
58
59
60
async def on_config(self, sid, config: dict[str, Any]):
    """
    Handle a config message.

    The received config is relayed, unmodified, to the "/dashboard" namespace.
    """
    target_namespace = "/dashboard"
    await self.emit("config", config, namespace=target_namespace)

on_obstacles(sid, obstacles) async #

Callback on obstacles message.

Receive a list of obstacles, computed from sensors data by the Detector. These obstacles are sent to the planner to compute an avoidance path.

Source code in cogip/tools/server/namespaces/detector.py
46
47
48
49
50
51
52
53
54
async def on_obstacles(self, sid, obstacles: list[dict[str, Any]]):
    """
    Handle an obstacles message.

    The obstacle list, computed from sensors data by the Detector, is first
    forwarded to the "/planner" namespace so that the planner can compute an
    avoidance path. It is then handed to the game recorder under the
    "obstacles" key.
    """
    record = {"obstacles": obstacles}
    await self.emit("obstacles", obstacles, namespace="/planner")
    await self.recorder.async_record(record)

on_register_menu(sid, data) async #

Callback on register_menu.

Source code in cogip/tools/server/namespaces/detector.py
40
41
42
43
44
async def on_register_menu(self, sid, data: dict[str, Any]):
    """
    Handle a register_menu message.

    The menu data is passed to the server's `register_menu`, under the
    "detector" name.
    """
    server = self.cogip_server
    await server.register_menu("detector", data)