
firmware_telemetry_manager

FirmwareTelemetryManager

Manager to receive firmware telemetry data via SocketIO through copilot.

This class is designed to be embedded into another Socket.IO client (e.g., Planner, Monitor, Calibration tool, or any tool that needs firmware telemetry access). It registers its own namespace (/telemetry) on the provided client, allowing the host tool to manage the connection lifecycle while benefiting from telemetry data reception.
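
The embedding pattern described above can be sketched without a network. This is a minimal, self-contained illustration and not the cogip implementation: `RecordingClient` is a hypothetical stand-in for `socketio.AsyncClient` that only records calls, `TelemetryEvents` stands in for the namespace handler, and `MiniTelemetryManager` mirrors just the namespace registration and the two emit calls.

```python
import asyncio


class RecordingClient:
    """Hypothetical stand-in for socketio.AsyncClient; records calls, no network."""

    def __init__(self):
        self.registered = []  # namespace paths registered on this client
        self.emitted = []     # (event, namespace) pairs, in emit order

    def register_namespace(self, handler):
        self.registered.append(handler.namespace)

    async def emit(self, event, data=None, namespace=None):
        self.emitted.append((event, namespace))


class TelemetryEvents:
    """Stand-in for the namespace handler (SioEvents in the real module)."""

    namespace = "/telemetry"


class MiniTelemetryManager:
    """Reduced illustration: registers its namespace on a host-owned client."""

    def __init__(self, sio):
        self.sio = sio
        self.sio_events = TelemetryEvents()
        self.sio.register_namespace(self.sio_events)

    @property
    def namespace(self):
        return self.sio_events.namespace

    async def enable(self):
        await self.sio.emit("telemetry_enable", namespace=self.namespace)

    async def disable(self):
        await self.sio.emit("telemetry_disable", namespace=self.namespace)


async def host_tool():
    client = RecordingClient()              # the host tool owns the client
    manager = MiniTelemetryManager(client)  # the manager only adds its namespace
    # A real host would connect here and include manager.namespace in the
    # namespaces it requests from the server.
    await manager.enable()
    await manager.disable()
    return client, manager


client, manager = asyncio.run(host_tool())
```

With a real python-socketio client, the host would typically pass `manager.namespace` in the `namespaces` argument of `AsyncClient.connect()` alongside its own namespaces, which is why the manager exposes the path as a property.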

Telemetry data is stored in a TelemetryDict and can be accessed by key name or hash.
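
How TelemetryDict maps key names to hashes is not shown on this page. As an illustration only, the sketch below assumes that key names are hashed with 32-bit FNV-1a and that values are stored by hash. `fnv1a_32` and `NameOrHashStore` are hypothetical names, and the real class may work differently.

```python
def fnv1a_32(name: str) -> int:
    """32-bit FNV-1a hash of a UTF-8 string (assumed hash, for illustration)."""
    h = 0x811C9DC5                         # FNV offset basis
    for byte in name.encode("utf-8"):
        h ^= byte
        h = (h * 0x01000193) & 0xFFFFFFFF  # FNV prime, truncated to 32 bits
    return h


class NameOrHashStore:
    """Hypothetical store: values are kept by hash, lookups accept name or hash."""

    def __init__(self):
        self._values = {}

    @staticmethod
    def _to_hash(key):
        # Integers are treated as ready-made hashes, strings are hashed first.
        return key if isinstance(key, int) else fnv1a_32(key)

    def __setitem__(self, key, value):
        self._values[self._to_hash(key)] = value

    def __getitem__(self, key):
        return self._values[self._to_hash(key)]

    def __contains__(self, key):
        return self._to_hash(key) in self._values


store = NameOrHashStore()
store["speed"] = 1.5                # "speed" is a made-up key name
by_name = store["speed"]
by_hash = store[fnv1a_32("speed")]
```

Storing by hash keeps the wire format compact, while accepting names at the lookup site keeps calling code readable.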

Note on concurrent usage: Multiple clients can receive telemetry data simultaneously because the server broadcasts to all connected clients. However, running several consumers at once can make debugging harder, so prefer having a single client handle telemetry at a time.

Source code in cogip/tools/firmware_telemetry/firmware_telemetry_manager.py
class FirmwareTelemetryManager:
    """
    Manager to receive firmware telemetry data via SocketIO through copilot.

    This class is designed to be embedded into another Socket.IO client (e.g., Planner,
    Monitor, Calibration tool, or any tool that needs firmware telemetry access). It registers
    its own namespace (/telemetry) on the provided client, allowing the host tool to manage
    the connection lifecycle while benefiting from telemetry data reception.

    Telemetry data is stored in a TelemetryDict and can be accessed by key name or hash.

    Note on concurrent usage: Multiple clients can receive telemetry data simultaneously
    as the server broadcasts to all connected clients. However, having multiple consumers
    may lead to debugging complexity. Prefer having a single client handle telemetry at a time.
    """

    def __init__(
        self,
        sio: socketio.AsyncClient,
    ):
        """
        Initialize the FirmwareTelemetryManager.

        Args:
            sio: External Socket.IO client on which to register the /telemetry namespace.
                 The host client is responsible for connection management.
        """
        self.sio = sio
        self.data = TelemetryDict()

        self.sio_events = SioEvents(self)
        self.sio.register_namespace(self.sio_events)

    @property
    def namespace(self) -> str:
        """Return the namespace path to include when connecting the host client."""
        return self.sio_events.namespace

    @property
    def is_connected(self) -> bool:
        """Check if the namespace is connected and ready."""
        return self.sio_events.connected

    async def enable(self) -> None:
        """
        Enable telemetry on the firmware.
        Sends telemetry_enable event to copilot via the server.
        """
        await self.sio.emit("telemetry_enable", namespace="/telemetry")

    async def disable(self) -> None:
        """
        Disable telemetry on the firmware.
        Sends telemetry_disable event to copilot via the server.
        """
        await self.sio.emit("telemetry_disable", namespace="/telemetry")

    def get_value(self, key: str, default: float | int = 0) -> float | int:
        """
        Get the latest telemetry value for a key.

        Args:
            key: The telemetry key name.
            default: Default value if key not found.

        Returns:
            The telemetry value, or default if not found.
        """
        if key in self.data:
            return self.data[key]
        return default

    def get_model(self, key: str) -> TelemetryData:
        """
        Get the full telemetry data model for a key.

        Args:
            key: The telemetry key name.

        Returns:
            TelemetryData containing key_hash, timestamp_ms, and value.

        Raises:
            KeyError: If the key is not found in the store.
        """
        return self.data.get_model(key)

is_connected property

Check if the namespace is connected and ready.
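
A host tool may want to wait for the namespace to be ready before calling enable(). One possible approach, sketched here under assumptions, is to poll the property with a timeout. `wait_until_connected` and `FakeManager` are hypothetical helpers and not part of cogip.

```python
import asyncio


async def wait_until_connected(manager, timeout=5.0, poll_interval=0.05):
    """Poll manager.is_connected until it is True or the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not manager.is_connected:
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(poll_interval)
    return True


class FakeManager:
    """Hypothetical stand-in exposing only an is_connected attribute."""

    def __init__(self):
        self.is_connected = False


async def demo():
    manager = FakeManager()

    async def connect_later():
        await asyncio.sleep(0.1)
        manager.is_connected = True  # simulates the namespace connecting

    task = asyncio.create_task(connect_later())
    became_ready = await wait_until_connected(manager, timeout=2.0)
    await task

    # A manager that never connects makes the helper give up after the timeout.
    never_ready = await wait_until_connected(FakeManager(), timeout=0.2)
    return became_ready, never_ready


became_ready, never_ready = asyncio.run(demo())
```

Returning a boolean rather than raising lets the caller decide whether a missing telemetry namespace is fatal for the tool.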

namespace property

Return the namespace path to include when connecting the host client.

__init__(sio)

Initialize the FirmwareTelemetryManager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sio | AsyncClient | External Socket.IO client on which to register the /telemetry namespace. The host client is responsible for connection management. | required |

Source code in cogip/tools/firmware_telemetry/firmware_telemetry_manager.py
def __init__(
    self,
    sio: socketio.AsyncClient,
):
    """
    Initialize the FirmwareTelemetryManager.

    Args:
        sio: External Socket.IO client on which to register the /telemetry namespace.
             The host client is responsible for connection management.
    """
    self.sio = sio
    self.data = TelemetryDict()

    self.sio_events = SioEvents(self)
    self.sio.register_namespace(self.sio_events)

disable() async

Disable telemetry on the firmware. Sends telemetry_disable event to copilot via the server.

Source code in cogip/tools/firmware_telemetry/firmware_telemetry_manager.py
async def disable(self) -> None:
    """
    Disable telemetry on the firmware.
    Sends telemetry_disable event to copilot via the server.
    """
    await self.sio.emit("telemetry_disable", namespace="/telemetry")

enable() async

Enable telemetry on the firmware. Sends telemetry_enable event to copilot via the server.

Source code in cogip/tools/firmware_telemetry/firmware_telemetry_manager.py
async def enable(self) -> None:
    """
    Enable telemetry on the firmware.
    Sends telemetry_enable event to copilot via the server.
    """
    await self.sio.emit("telemetry_enable", namespace="/telemetry")
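
Because enable() and disable() are separate calls, a host tool can pair them so that telemetry is switched off even when the work in between fails. The sketch below shows one way to do this with an async context manager. `telemetry_session` and `StubManager` are hypothetical and only illustrate the pattern.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def telemetry_session(manager):
    """Enable telemetry on entry and always disable it on exit."""
    await manager.enable()
    try:
        yield manager
    finally:
        await manager.disable()


class StubManager:
    """Hypothetical stand-in that records calls instead of emitting events."""

    def __init__(self):
        self.calls = []

    async def enable(self):
        self.calls.append("enable")

    async def disable(self):
        self.calls.append("disable")


async def demo():
    manager = StubManager()
    try:
        async with telemetry_session(manager):
            raise RuntimeError("simulated failure while telemetry is on")
    except RuntimeError:
        pass  # the failure is expected; disable() has already run
    return manager.calls


calls = asyncio.run(demo())
```

Given the note on concurrent usage, disabling on exit may not be desirable if another client still relies on the telemetry stream, so this pattern fits best when a single tool consumes telemetry.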

get_model(key)

Get the full telemetry data model for a key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | The telemetry key name. | required |

Returns:

| Type | Description |
| --- | --- |
| TelemetryData | TelemetryData containing key_hash, timestamp_ms, and value. |

Raises:

| Type | Description |
| --- | --- |
| KeyError | If the key is not found in the store. |

Source code in cogip/tools/firmware_telemetry/firmware_telemetry_manager.py
def get_model(self, key: str) -> TelemetryData:
    """
    Get the full telemetry data model for a key.

    Args:
        key: The telemetry key name.

    Returns:
        TelemetryData containing key_hash, timestamp_ms, and value.

    Raises:
        KeyError: If the key is not found in the store.
    """
    return self.data.get_model(key)
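
get_model() raises KeyError for an unknown key, and the returned model carries timestamp_ms, so a caller can both guard the lookup and check how fresh a sample is. The following is a sketch under assumptions: `Sample` is a hypothetical stand-in for TelemetryData with the three documented fields, `StubManager` and `fresh_value` are illustrative, and `now_ms` is assumed to come from the same clock as timestamp_ms.

```python
from dataclasses import dataclass


@dataclass
class Sample:
    """Hypothetical stand-in for TelemetryData (same three documented fields)."""

    key_hash: int
    timestamp_ms: int
    value: float


class StubManager:
    """Hypothetical stand-in whose get_model raises KeyError for unknown keys."""

    def __init__(self, models):
        self._models = models

    def get_model(self, key):
        return self._models[key]  # dict lookup raises KeyError when missing


def fresh_value(manager, key, now_ms, max_age_ms):
    """Return the value if the key exists and is recent enough, else None."""
    try:
        model = manager.get_model(key)
    except KeyError:
        return None
    if now_ms - model.timestamp_ms > max_age_ms:
        return None
    return model.value


# "speed" and the numbers below are made-up illustration data.
manager = StubManager({"speed": Sample(key_hash=1, timestamp_ms=1000, value=0.8)})
recent = fresh_value(manager, "speed", now_ms=1050, max_age_ms=100)
stale = fresh_value(manager, "speed", now_ms=5000, max_age_ms=100)
missing = fresh_value(manager, "unknown", now_ms=1050, max_age_ms=100)
```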

get_value(key, default=0)

Get the latest telemetry value for a key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | The telemetry key name. | required |
| default | float \| int | Default value if key not found. | 0 |

Returns:

| Type | Description |
| --- | --- |
| float \| int | The telemetry value, or default if not found. |

Source code in cogip/tools/firmware_telemetry/firmware_telemetry_manager.py
def get_value(self, key: str, default: float | int = 0) -> float | int:
    """
    Get the latest telemetry value for a key.

    Args:
        key: The telemetry key name.
        default: Default value if key not found.

    Returns:
        The telemetry value, or default if not found.
    """
    if key in self.data:
        return self.data[key]
    return default
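
One consequence of the default of 0 is that a missing key cannot be told apart from a measured zero. The sketch below illustrates this and one possible workaround, passing `float("nan")` as the default. `StubManager` is a hypothetical stand-in that reproduces the get_value logic shown above over a plain dict, and the key names are made up.

```python
import math


class StubManager:
    """Hypothetical stand-in reproducing the get_value logic over a plain dict."""

    def __init__(self, data):
        self.data = data

    def get_value(self, key, default=0):
        if key in self.data:
            return self.data[key]
        return default


manager = StubManager({"battery_voltage": 0})

# Both calls return 0, although only the first key was actually received.
measured_zero = manager.get_value("battery_voltage")
missing_default = manager.get_value("motor_current")

# A NaN default makes the missing key detectable.
missing_nan = manager.get_value("motor_current", default=float("nan"))
is_missing = math.isnan(missing_nan)
```

When the distinction matters and a sentinel is awkward, get_model() is the alternative, since it raises KeyError for unknown keys.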