Skip to content

sio_events

SioEvents #

Bases: AsyncClientNamespace

Handle all SocketIO events received by FirmwareTelemetryManager.

Source code in cogip/tools/firmware_telemetry/sio_events.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class SioEvents(socketio.AsyncClientNamespace):
    """
    SocketIO namespace client used by FirmwareTelemetryManager.

    Registers on the "/telemetry" namespace, tracks the connection state in
    `connected`, and hands every received telemetry point to the manager's
    data store and to an optional user callback.
    """

    def __init__(self, manager: "FirmwareTelemetryManager"):
        super().__init__("/telemetry")
        # No callback is registered until set_telemetry_callback() is called.
        self._telemetry_callback: Callable[[TelemetryData], None] | None = None
        self.connected = False
        self.manager = manager

    def set_telemetry_callback(self, callback: Callable[[TelemetryData], None] | None) -> None:
        """
        Register (or remove) the per-data-point telemetry callback.

        Args:
            callback: Callable receiving each TelemetryData; pass None to
                unregister the current callback.
        """
        self._telemetry_callback = callback

    async def on_connect(self):
        """
        Handler for the connection to cogip-server.

        Waits until the client reports itself as connected, then emits the
        "connected" event and sets the `connected` flag.
        """

        def client_is_connected() -> bool:
            return self.client.connected is True

        # The polling call is run through asyncio.to_thread; it re-checks the
        # predicate with step=1 and never times out (poll_forever=True).
        await asyncio.to_thread(polling2.poll, client_is_connected, step=1, poll_forever=True)
        logger.info("Connected to cogip-server")
        await self.emit("connected")

        self.connected = True

    async def on_disconnect(self) -> None:
        """
        Handler for the disconnection from cogip-server.

        Logs the event and clears the `connected` flag.
        """
        logger.info("Disconnected from cogip-server")
        self.connected = False

    async def on_connect_error(self, data: dict[str, Any]) -> None:
        """
        Handler for a failed connection attempt.

        Logs the "message" entry when `data` is a dict that has one,
        otherwise logs `data` itself.
        """
        message = data["message"] if isinstance(data, dict) and "message" in data else data
        logger.error(f"Connection to cogip-server failed: {message}")

    async def on_telemetry_data(self, data: dict[str, Any]):
        """
        Handler for the telemetry_data event from copilot.

        Converts the received dict to a TelemetryData (through its protobuf
        message), updates the manager's data with it, then calls the
        registered callback, if any.
        """
        parsed = ParseDict(data, PB_TelemetryData())
        point = TelemetryData.from_protobuf(parsed)
        self.manager.data.update(point)

        callback = self._telemetry_callback
        if not callback:
            return
        callback(point)

on_connect() async #

On connection to cogip-server.

Source code in cogip/tools/firmware_telemetry/sio_events.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
async def on_connect(self):
    """
    Handler for the connection to cogip-server.

    Waits until the client reports itself as connected, then emits the
    "connected" event and sets the `connected` flag.
    """

    def client_is_connected() -> bool:
        return self.client.connected is True

    # The polling call is run through asyncio.to_thread; it re-checks the
    # predicate with step=1 and never times out (poll_forever=True).
    await asyncio.to_thread(polling2.poll, client_is_connected, step=1, poll_forever=True)
    logger.info("Connected to cogip-server")
    await self.emit("connected")

    self.connected = True

on_connect_error(data) async #

On connection error.

Source code in cogip/tools/firmware_telemetry/sio_events.py
59
60
61
62
63
64
65
66
67
async def on_connect_error(self, data: dict[str, Any]) -> None:
    """
    Handler for a failed connection attempt.

    Logs the "message" entry when `data` is a dict that has one,
    otherwise logs `data` itself.
    """
    message = data["message"] if isinstance(data, dict) and "message" in data else data
    logger.error(f"Connection to cogip-server failed: {message}")

on_disconnect() async #

On disconnection from cogip-server.

Source code in cogip/tools/firmware_telemetry/sio_events.py
52
53
54
55
56
57
async def on_disconnect(self) -> None:
    """
    Handler for the disconnection from cogip-server.

    Logs the event and clears the `connected` flag.
    """
    logger.info("Disconnected from cogip-server")
    self.connected = False

on_telemetry_data(data) async #

Handle telemetry_data from copilot. Store the telemetry data point and call callback if registered.

Source code in cogip/tools/firmware_telemetry/sio_events.py
69
70
71
72
73
74
75
76
77
78
79
async def on_telemetry_data(self, data: dict[str, Any]):
    """
    Handler for the telemetry_data event from copilot.

    Converts the received dict to a TelemetryData (through its protobuf
    message), updates the manager's data with it, then calls the
    registered callback, if any.
    """
    parsed = ParseDict(data, PB_TelemetryData())
    point = TelemetryData.from_protobuf(parsed)
    self.manager.data.update(point)

    callback = self._telemetry_callback
    if not callback:
        return
    callback(point)

set_telemetry_callback(callback) #

Set callback to be called for each telemetry data point.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `callback` | `Callable[[TelemetryData], None] \| None` | Function to call with each TelemetryData, or None to clear. | required |

Source code in cogip/tools/firmware_telemetry/sio_events.py
28
29
30
31
32
33
34
35
def set_telemetry_callback(self, callback: Callable[[TelemetryData], None] | None) -> None:
    """
    Register (or remove) the per-data-point telemetry callback.

    Args:
        callback: Callable receiving each TelemetryData; pass None to
            unregister the current callback.
    """
    self._telemetry_callback = callback