Skip to content

sio_events

SioEvents #

Bases: ClientNamespace

Handle all SocketIO events received by Detector.

Source code in cogip/tools/detector_pami/sio_events.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class SioEvents(socketio.ClientNamespace):
    """
    Handle all SocketIO events received by Detector.

    All handlers are registered on the `/detector` namespace and forward
    the received events to the detector object given at construction.
    """

    def __init__(self, detector: "detector.Detector"):
        """
        Register on the `/detector` namespace and keep a reference to the detector.

        Arguments:
            detector: object driven by the events handled in this namespace
        """
        super().__init__("/detector")
        self._detector = detector

    def on_connect(self) -> None:
        """
        On connection to cogip-server, start detector threads.
        """
        # Wait, without timeout, until the client itself reports the connection
        # as established before emitting anything on it.
        polling2.poll(lambda: self.client.connected is True, step=0.2, poll_forever=True)
        logger.info("Connected to cogip-server")
        self.emit("connected")
        self.emit("register_menu", {"name": "detector", "menu": menu.model_dump()})
        self._detector.start()

    def on_disconnect(self) -> None:
        """
        On disconnection from cogip-server, stop detector threads.
        """
        logger.info("Disconnected from cogip-server")
        self._detector.stop()

    def on_connect_error(self, data: dict[str, Any]) -> None:
        """
        On connection error, log it and check if a Detector is already connected.

        When the error payload is a dict whose `message` is
        "A detector is already connected", the detector's `retry_connection`
        flag is cleared. Any other error is only logged by this handler.
        """
        logger.error(f"Connect to cogip-server error: {data = }")
        if (
            data
            and isinstance(data, dict)
            and (message := data.get("message"))
            and message == "A detector is already connected"
        ):
            logger.error(message)
            self._detector.retry_connection = False

    def on_command(self, cmd: str) -> None:
        """
        Callback on command message from dashboard.

        The only supported command is "config": it emits a "config" event
        carrying the JSON Schema of the detector properties, extended with
        the namespace and the current value of each property.
        Any other command is logged as a warning.
        """
        if cmd == "config":
            # Get JSON Schema
            schema = self._detector.properties.model_json_schema()
            # Add namespace in JSON Schema
            schema["namespace"] = "/detector"
            # Add current values in JSON Schema
            for prop, value in self._detector.properties.model_dump().items():
                schema["properties"][prop]["value"] = value
            # Send config
            self.emit("config", schema)
        else:
            logger.warning(f"Unknown command: {cmd}")

    def on_config_updated(self, config: dict[str, Any]) -> None:
        """
        Callback on config update message.

        Store `config["value"]` in the detector property named `config["name"]`.
        When the updated property is "refresh_interval", also ask the detector
        to update its refresh interval.
        """
        name = config["name"]
        # Use the builtin rather than calling the dunder method directly.
        setattr(self._detector.properties, name, config["value"])
        if name == "refresh_interval":
            self._detector.update_refresh_interval()

    def on_pose_current(self, data: dict[str, Any]) -> None:
        """
        Callback on robot pose message.

        The payload is validated as a `models.Pose` before being stored
        as the detector's `robot_pose`.
        """
        self._detector.robot_pose = models.Pose.model_validate(data)

    def on_sensors_data(self, data: list[int]) -> None:
        """
        Callback on sensors data.

        The received data is passed unchanged to the detector.
        """
        logger.debug("Received sensors data")
        self._detector.update_sensors_data(data)

on_command(cmd) #

Callback on command message from dashboard.

Source code in cogip/tools/detector_pami/sio_events.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def on_command(self, cmd: str) -> None:
    """
    Handle a command message sent by the dashboard.

    Only "config" is supported: the JSON Schema of the detector properties,
    completed with the namespace and the current property values, is emitted
    in a "config" event. Other commands are reported with a warning.
    """
    if cmd != "config":
        logger.warning(f"Unknown command: {cmd}")
        return

    properties = self._detector.properties

    # Start from the JSON Schema of the properties and tag it with the namespace
    payload = properties.model_json_schema()
    payload["namespace"] = "/detector"

    # Attach the current value of each property to its schema entry
    current_values = properties.model_dump()
    for key in current_values:
        payload["properties"][key]["value"] = current_values[key]

    self.emit("config", payload)

on_connect() #

On connection to cogip-server, start detector threads.

Source code in cogip/tools/detector_pami/sio_events.py
20
21
22
23
24
25
26
27
28
def on_connect(self):
    """
    Handle the connection to cogip-server and start the detector threads.
    """

    def client_is_connected():
        # Strict identity check on the client's own connection flag
        return self.client.connected is True

    # Wait, without timeout, until the client reports an established connection
    polling2.poll(client_is_connected, step=0.2, poll_forever=True)
    logger.info("Connected to cogip-server")

    self.emit("connected")
    menu_registration = {"name": "detector", "menu": menu.model_dump()}
    self.emit("register_menu", menu_registration)

    self._detector.start()

on_connect_error(data) #

On connection error, check if a Detector is already connected and exit, or retry connection.

Source code in cogip/tools/detector_pami/sio_events.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def on_connect_error(self, data: dict[str, Any]) -> None:
    """
    Handle a connection error reported for cogip-server.

    The error is always logged. If its payload is a dict whose "message"
    says that a detector is already connected, the detector's
    `retry_connection` flag is cleared.
    """
    logger.error(f"Connect to cogip-server error: {data = }")

    # Only dict payloads can carry the message checked below
    if not isinstance(data, dict):
        return

    message = data.get("message")
    if message == "A detector is already connected":
        logger.error(message)
        self._detector.retry_connection = False

on_disconnect() #

On disconnection from cogip-server, stop detector threads.

Source code in cogip/tools/detector_pami/sio_events.py
30
31
32
33
34
35
def on_disconnect(self) -> None:
    """
    Handle the disconnection from cogip-server by stopping the detector threads.
    """
    logger.info("Disconnected from cogip-server")
    detector = self._detector
    detector.stop()

on_pose_current(data) #

Callback on robot pose message.

Source code in cogip/tools/detector_pami/sio_events.py
75
76
77
78
79
def on_pose_current(self, data: dict[str, Any]) -> None:
    """
    Handle a robot pose message.

    The payload is validated as a `models.Pose` and stored as the
    detector's `robot_pose`.
    """
    pose = models.Pose.model_validate(data)
    self._detector.robot_pose = pose

on_sensors_data(data) #

Callback on sensors data.

Source code in cogip/tools/detector_pami/sio_events.py
81
82
83
84
85
86
def on_sensors_data(self, data: list[int]) -> None:
    """
    Handle a sensors data message by passing the data to the detector.
    """
    logger.debug("Received sensors data")
    detector = self._detector
    detector.update_sensors_data(data)