Skip to content

firmware_telemetry

Firmware Telemetry Models.

This module provides models for parsing generic telemetry data that the robot's MCU firmware sends as Protobuf messages. Each data point is identified by the FNV-1a hash of its telemetry key rather than by the key name itself.

TelemetryData #

Bases: BaseModel

Generic telemetry data point with key hash, timestamp, and value.

Attributes:

Name Type Description
key_hash int

FNV-1a hash of the telemetry key.

timestamp_ms int

Timestamp in milliseconds when the data was captured.

value TelemetryValue

The telemetry value (float or int depending on type).

Source code in cogip/models/firmware_telemetry.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class TelemetryData(BaseModel):
    """
    One telemetry sample: a hashed key, a capture time, and the measured value.

    Attributes:
        key_hash: FNV-1a hash identifying the telemetry key.
        timestamp_ms: Capture time of the sample, in milliseconds.
        value: The sample's value (float or int depending on type).
    """

    # Validate fields again whenever they are assigned after construction.
    model_config = ConfigDict(validate_assignment=True)

    key_hash: int
    timestamp_ms: int
    value: TelemetryValue

    @classmethod
    def from_protobuf(cls, message: PB_TelemetryData) -> "TelemetryData":
        """
        Build a TelemetryData from a PB_TelemetryData protobuf message.

        Args:
            message: The PB_TelemetryData protobuf message to convert.

        Returns:
            TelemetryData populated from the message. The value is 0 when
            no member of the message's "value" oneof is set.
        """
        # Name of the populated member of the "value" oneof, or None if unset.
        chosen = message.WhichOneof("value")
        payload: TelemetryValue = 0 if chosen is None else getattr(message, chosen)

        return cls(
            key_hash=message.key_hash,
            timestamp_ms=message.timestamp_ms,
            value=payload,
        )

from_protobuf(message) classmethod #

Parse a TelemetryData from a PB_TelemetryData protobuf message.

Parameters:

Name Type Description Default
message PB_TelemetryData

The PB_TelemetryData protobuf message.

required

Returns:

Type Description
TelemetryData

TelemetryData instance with parsed values.

Source code in cogip/models/firmware_telemetry.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@classmethod
def from_protobuf(cls, message: PB_TelemetryData) -> "TelemetryData":
    """
    Build a TelemetryData from a PB_TelemetryData protobuf message.

    Args:
        message: The PB_TelemetryData protobuf message to convert.

    Returns:
        TelemetryData populated from the message. The value is 0 when
        no member of the message's "value" oneof is set.
    """
    # Name of the populated member of the "value" oneof, or None if unset.
    chosen = message.WhichOneof("value")
    payload: TelemetryValue = 0 if chosen is None else getattr(message, chosen)

    return cls(
        key_hash=message.key_hash,
        timestamp_ms=message.timestamp_ms,
        value=payload,
    )

TelemetryDict #

Dict-like store for telemetry data points indexed by key hash.

This class collects telemetry data points, storing each one under its key hash. Lookups take the key name, which is hashed with FNV-1a to find the matching entry.

Source code in cogip/models/firmware_telemetry.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class TelemetryDict:
    """
    Dict-like container of telemetry data points, stored by key hash.

    Data points are added with ``update`` and looked up by key name; the
    name is hashed with ``fnv1a_hash`` to locate the stored entry.
    """

    def __init__(self):
        # Maps a key hash to the most recent data point stored for it.
        self._data: dict[int, TelemetryData] = dict()

    def update(self, data: TelemetryData) -> None:
        """
        Store a telemetry data point, replacing any entry with the same hash.

        Args:
            data: The telemetry data point to store.
        """
        slot = data.key_hash
        self._data[slot] = data

    def get_model(self, key: str) -> TelemetryData:
        """
        Return the full telemetry data model stored for a key name.

        Args:
            key: The telemetry key name to look up.

        Returns:
            TelemetryData for the key.

        Raises:
            KeyError: If the key is not found in the store.
        """
        hashed_key = fnv1a_hash(key)
        return self._data[hashed_key]

    def __getitem__(self, key: str) -> TelemetryValue:
        """
        Return only the value stored for a key name.

        Args:
            key: The telemetry key name to look up.

        Returns:
            The telemetry value for the key.

        Raises:
            KeyError: If the key is not found in the store.
        """
        hashed_key = fnv1a_hash(key)
        entry = self._data[hashed_key]
        return entry.value

    def __contains__(self, key: str) -> bool:
        """
        Report whether an entry is stored for a key name.

        Args:
            key: The telemetry key name to check.

        Returns:
            True if the key exists, False otherwise.
        """
        hashed_key = fnv1a_hash(key)
        return hashed_key in self._data

    def __len__(self) -> int:
        """Count the telemetry entries currently stored."""
        entry_count = len(self._data)
        return entry_count

    def __bool__(self) -> bool:
        """Tell whether at least one telemetry entry is stored."""
        return len(self._data) > 0

    def items(self):
        """Return a view of the (key_hash, TelemetryData) pairs in the store."""
        entries = self._data
        return entries.items()

    def values(self):
        """Return a view of the TelemetryData objects in the store."""
        entries = self._data
        return entries.values()

__bool__() #

Return True if the store contains any data.

Source code in cogip/models/firmware_telemetry.py
122
123
124
def __bool__(self) -> bool:
    """Tell whether at least one telemetry entry is stored."""
    return len(self._data) > 0

__contains__(key) #

Check if a key exists in the store.

Parameters:

Name Type Description Default
key str

The telemetry key name to check.

required

Returns:

Type Description
bool

True if the key exists, False otherwise.

Source code in cogip/models/firmware_telemetry.py
106
107
108
109
110
111
112
113
114
115
116
def __contains__(self, key: str) -> bool:
    """
    Report whether an entry is stored for a key name.

    Args:
        key: The telemetry key name to check.

    Returns:
        True if the key exists, False otherwise.
    """
    # Entries are stored by hash, so the name is hashed before the lookup.
    hashed_key = fnv1a_hash(key)
    return hashed_key in self._data

__getitem__(key) #

Get telemetry value by key name. Raises KeyError if not found.

Parameters:

Name Type Description Default
key str

The telemetry key name to look up.

required

Returns:

Type Description
TelemetryValue

The telemetry value for the key.

Raises:

Type Description
KeyError

If the key is not found in the store.

Source code in cogip/models/firmware_telemetry.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def __getitem__(self, key: str) -> TelemetryValue:
    """
    Return only the value stored for a key name.

    Args:
        key: The telemetry key name to look up.

    Returns:
        The telemetry value for the key.

    Raises:
        KeyError: If the key is not found in the store.
    """
    # Entries are stored by hash, so the name is hashed before the lookup.
    hashed_key = fnv1a_hash(key)
    entry = self._data[hashed_key]
    return entry.value

__len__() #

Return the number of telemetry entries.

Source code in cogip/models/firmware_telemetry.py
118
119
120
def __len__(self) -> int:
    """Count the telemetry entries currently stored."""
    entry_count = len(self._data)
    return entry_count

get_model(key) #

Get telemetry data model by key name.

Parameters:

Name Type Description Default
key str

The telemetry key name to look up.

required

Returns:

Type Description
TelemetryData

TelemetryData for the key.

Raises:

Type Description
KeyError

If the key is not found in the store.

Source code in cogip/models/firmware_telemetry.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get_model(self, key: str) -> TelemetryData:
    """
    Return the full telemetry data model stored for a key name.

    Args:
        key: The telemetry key name to look up.

    Returns:
        TelemetryData for the key.

    Raises:
        KeyError: If the key is not found in the store.
    """
    # Entries are stored by hash, so the name is hashed before the lookup.
    hashed_key = fnv1a_hash(key)
    return self._data[hashed_key]

items() #

Iterate over (key_hash, TelemetryData) tuples.

Source code in cogip/models/firmware_telemetry.py
126
127
128
def items(self):
    """Return a view of the (key_hash, TelemetryData) pairs in the store."""
    entries = self._data
    return entries.items()

update(data) #

Update the store with a new telemetry data point.

Parameters:

Name Type Description Default
data TelemetryData

The telemetry data point to store.

required
Source code in cogip/models/firmware_telemetry.py
67
68
69
70
71
72
73
74
def update(self, data: TelemetryData) -> None:
    """
    Store a telemetry data point, replacing any entry with the same hash.

    Args:
        data: The telemetry data point to store.
    """
    slot = data.key_hash
    self._data[slot] = data

values() #

Iterate over TelemetryData values.

Source code in cogip/models/firmware_telemetry.py
130
131
132
def values(self):
    """Return a view of the TelemetryData objects in the store."""
    entries = self._data
    return entries.values()