Skip to content

sio_events

SioEvents #

Bases: AsyncClientNamespace

Handle all SocketIO events received by FirmwareTelemetryManager.

Source code in cogip/tools/firmware_telemetry/sio_events.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class SioEvents(socketio.AsyncClientNamespace):
    """
    Handle all SocketIO events received by FirmwareTelemetryManager.
    """

    def __init__(self, manager: "FirmwareTelemetryManager"):
        super().__init__("/telemetry")
        self.manager = manager
        self.connected = False

    async def on_connect(self):
        """
        On connection to cogip-server.
        """
        await asyncio.to_thread(
            polling2.poll,
            lambda: self.client.connected is True,
            step=1,
            poll_forever=True,
        )
        logger.info("Connected to cogip-server")
        await self.emit("connected")

        self.connected = True

    async def on_disconnect(self) -> None:
        """
        On disconnection from cogip-server.
        """
        logger.info("Disconnected from cogip-server")
        self.connected = False

    async def on_connect_error(self, data: dict[str, Any]) -> None:
        """
        On connection error.
        """
        if isinstance(data, dict) and "message" in data:
            message = data["message"]
        else:
            message = data
        logger.error(f"Connection to cogip-server failed: {message}")

    async def on_telemetry_data(self, data: dict[str, Any]):
        """
        Handle telemetry_data from copilot.
        Store the telemetry data point.
        """
        telemetry = ParseDict(data, PB_TelemetryData())
        telemetry_data = TelemetryData.from_protobuf(telemetry)
        self.manager.store.update(telemetry_data)

on_connect() async #

On connection to cogip-server.

Source code in cogip/tools/firmware_telemetry/sio_events.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def on_connect(self):
    """
    On connection to cogip-server.
    """
    await asyncio.to_thread(
        polling2.poll,
        lambda: self.client.connected is True,
        step=1,
        poll_forever=True,
    )
    logger.info("Connected to cogip-server")
    await self.emit("connected")

    self.connected = True

on_connect_error(data) async #

On connection error.

Source code in cogip/tools/firmware_telemetry/sio_events.py
48
49
50
51
52
53
54
55
56
async def on_connect_error(self, data: dict[str, Any]) -> None:
    """
    On connection error.
    """
    if isinstance(data, dict) and "message" in data:
        message = data["message"]
    else:
        message = data
    logger.error(f"Connection to cogip-server failed: {message}")

on_disconnect() async #

On disconnection from cogip-server.

Source code in cogip/tools/firmware_telemetry/sio_events.py
41
42
43
44
45
46
async def on_disconnect(self) -> None:
    """
    On disconnection from cogip-server.
    """
    logger.info("Disconnected from cogip-server")
    self.connected = False

on_telemetry_data(data) async #

Handle telemetry_data from copilot. Store the telemetry data point.

Source code in cogip/tools/firmware_telemetry/sio_events.py
58
59
60
61
62
63
64
65
async def on_telemetry_data(self, data: dict[str, Any]):
    """
    Handle telemetry_data from copilot.
    Store the telemetry data point.
    """
    telemetry = ParseDict(data, PB_TelemetryData())
    telemetry_data = TelemetryData.from_protobuf(telemetry)
    self.manager.store.update(telemetry_data)