Skip to content

telemetry_view

Real-time telemetry visualization widget using PyQtGraph.

Configurable layout via YAML configuration file.

TelemetryView #

Bases: QWidget

Real-time telemetry graph widget using PyQtGraph.

Features: - Configurable grid layout (row/col) - Automatic color assignment - Automatic Y-axis scaling - Time-based pruning (retention_seconds) - Memory-limited data storage (max_points)

Source code in cogip/tools/firmware_telemetry/graph/telemetry_view.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class TelemetryView(QWidget):
    """
    Real-time telemetry graph widget using PyQtGraph.

    Features:
    - Configurable grid layout (row/col)
    - Automatic color assignment
    - Automatic Y-axis scaling
    - Time-based pruning (retention_seconds)
    - Memory-limited data storage (max_points)
    """

    DEFAULT_COLORS: list[Color] = [
        (31, 119, 180),  # Blue
        (255, 127, 14),  # Orange
        (44, 160, 44),  # Green
        (214, 39, 40),  # Red
        (148, 103, 189),  # Purple
        (140, 86, 75),  # Brown
        (227, 119, 194),  # Pink
        (127, 127, 127),  # Gray
        (188, 189, 34),  # Olive
        (23, 190, 207),  # Cyan
    ]

    def __init__(self, config: TelemetryGraphConfig, parent: QWidget | None = None):
        super().__init__(parent)
        self._config = config
        self._recording = False
        self._start_time_ms: int | None = None
        self._update_counter = 0

        # Data storage: key -> (times deque, values deque)
        self._data: DataStorage = {}

        # Curves: key -> PlotDataItem
        self._curves: dict[str, pg.PlotDataItem] = {}

        # Plots: title -> PlotItem
        self._plots: dict[str, pg.PlotItem] = {}

        # Key hash mapping: hash -> key name
        self._key_hashes: dict[int, str] = {}

        # Color assignment: key -> color tuple
        self._colors: dict[str, Color] = {}

        self._setup_ui()

    def _setup_ui(self) -> None:
        """Configure the Qt layout with PyQtGraph plots."""
        layout = QVBoxLayout(self)
        layout.setContentsMargins(5, 5, 5, 5)
        layout.setSpacing(0)

        # Configure pyqtgraph dark theme
        pg.setConfigOption("background", "#1E1E1E")
        pg.setConfigOption("foreground", "#CCCCCC")

        # Create graphics layout widget
        self._graph_layout = pg.GraphicsLayoutWidget()
        layout.addWidget(self._graph_layout)

        self._build_plots()

        # Set window properties
        self.setWindowTitle("Telemetry View")
        self.setMinimumSize(900, 700)

    def _build_plots(self) -> None:
        """Build plots from current config."""
        # Assign colors to all keys
        self._assign_colors()

        # Create plots from config
        for plot_config in self._config.plots:
            plot = self._graph_layout.addPlot(
                row=plot_config.row,
                col=plot_config.col,
                rowspan=plot_config.rowspan,
                colspan=plot_config.colspan,
                title=plot_config.title,
            )
            y_label = f"Value ({plot_config.y_unit})" if plot_config.y_unit else "Value"
            plot.setLabel("left", y_label)
            plot.setLabel("bottom", "Time (ms)")
            plot.addLegend(offset=(10, 10))
            plot.showGrid(x=True, y=True, alpha=0.3)

            self._plots[plot_config.title] = plot

            # Create curves for each key in this plot
            for key in plot_config.keys:
                color = self._colors[key]
                curve = plot.plot(
                    pen=pg.mkPen(color=color, width=2),
                    symbol="o",
                    symbolSize=5,
                    symbolBrush=color,
                    name=key,
                )
                self._curves[key] = curve

                # Initialize data storage
                self._data[key] = (
                    deque(maxlen=self._config.max_points),
                    deque(maxlen=self._config.max_points),
                )

                # Map key hash to key name
                self._key_hashes[fnv1a_hash(key)] = key

    def load_config(self, config: TelemetryGraphConfig) -> None:
        """Load a new configuration, rebuilding all plots."""
        self._config = config
        self._recording = False
        self._start_time_ms = None
        self._update_counter = 0

        # Clear existing state
        self._data.clear()
        self._curves.clear()
        self._plots.clear()
        self._key_hashes.clear()
        self._colors.clear()

        # Clear the graphics layout and rebuild
        self._graph_layout.clear()
        self._build_plots()

    def _assign_colors(self) -> None:
        """Assign colors to all telemetry keys."""
        color_idx = 0
        for plot_config in self._config.plots:
            for key in plot_config.keys:
                if key not in self._colors:
                    self._colors[key] = self.DEFAULT_COLORS[color_idx % len(self.DEFAULT_COLORS)]
                    color_idx += 1

    def start_recording(self) -> None:
        """Start recording telemetry data."""
        self._recording = True
        self._start_time_ms = None

    def stop_recording(self) -> None:
        """Stop recording telemetry data."""
        self._recording = False

    def clear(self) -> None:
        """Clear all curves and reset state."""
        self._start_time_ms = None
        self._recording = False
        self._update_counter = 0

        for times, values in self._data.values():
            times.clear()
            values.clear()

        for curve in self._curves.values():
            curve.setData([], [])

    def update_telemetry(self, data: TelemetryData) -> None:
        """
        Process incoming telemetry data and update curves.

        Args:
            data: TelemetryData from firmware
        """
        if not self._recording:
            return

        # Check if this key is tracked
        key = self._key_hashes.get(data.key_hash)
        if key is None:
            return

        # Initialize start time on first data point
        if self._start_time_ms is None:
            self._start_time_ms = data.timestamp_ms

        # Calculate relative time
        relative_time = data.timestamp_ms - self._start_time_ms

        # Append data
        times, values = self._data[key]
        times.append(relative_time)
        values.append(data.value)

        # Update curve
        self._curves[key].setData(list(times), list(values))

        # Periodic pruning
        self._update_counter += 1
        if self._update_counter % self._config.prune_interval == 0:
            self._prune_old_data(relative_time)

    def _prune_old_data(self, current_time_ms: float) -> None:
        """Remove data older than retention_seconds."""
        cutoff_ms = current_time_ms - (self._config.retention_seconds * 1000)

        for key, (times, values) in self._data.items():
            # Remove old data points
            while times and times[0] < cutoff_ms:
                times.popleft()
                values.popleft()

            # Update curve
            curve = self._curves.get(key)
            if curve:
                curve.setData(list(times), list(values))

clear() #

Clear all curves and reset state.

Source code in cogip/tools/firmware_telemetry/graph/telemetry_view.py
168
169
170
171
172
173
174
175
176
177
178
179
def clear(self) -> None:
    """Clear all curves and reset state."""
    self._start_time_ms = None
    self._recording = False
    self._update_counter = 0

    for times, values in self._data.values():
        times.clear()
        values.clear()

    for curve in self._curves.values():
        curve.setData([], [])

load_config(config) #

Load a new configuration, rebuilding all plots.

Source code in cogip/tools/firmware_telemetry/graph/telemetry_view.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def load_config(self, config: TelemetryGraphConfig) -> None:
    """Load a new configuration, rebuilding all plots."""
    self._config = config
    self._recording = False
    self._start_time_ms = None
    self._update_counter = 0

    # Clear existing state
    self._data.clear()
    self._curves.clear()
    self._plots.clear()
    self._key_hashes.clear()
    self._colors.clear()

    # Clear the graphics layout and rebuild
    self._graph_layout.clear()
    self._build_plots()

start_recording() #

Start recording telemetry data.

Source code in cogip/tools/firmware_telemetry/graph/telemetry_view.py
159
160
161
162
def start_recording(self) -> None:
    """Start recording telemetry data."""
    self._recording = True
    self._start_time_ms = None

stop_recording() #

Stop recording telemetry data.

Source code in cogip/tools/firmware_telemetry/graph/telemetry_view.py
164
165
166
def stop_recording(self) -> None:
    """Stop recording telemetry data."""
    self._recording = False

update_telemetry(data) #

Process incoming telemetry data and update curves.

Parameters:

Name Type Description Default
data TelemetryData

TelemetryData from firmware

required
Source code in cogip/tools/firmware_telemetry/graph/telemetry_view.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def update_telemetry(self, data: TelemetryData) -> None:
    """
    Process incoming telemetry data and update curves.

    Args:
        data: TelemetryData from firmware
    """
    if not self._recording:
        return

    # Check if this key is tracked
    key = self._key_hashes.get(data.key_hash)
    if key is None:
        return

    # Initialize start time on first data point
    if self._start_time_ms is None:
        self._start_time_ms = data.timestamp_ms

    # Calculate relative time
    relative_time = data.timestamp_ms - self._start_time_ms

    # Append data
    times, values = self._data[key]
    times.append(relative_time)
    values.append(data.value)

    # Update curve
    self._curves[key].setData(list(times), list(values))

    # Periodic pruning
    self._update_counter += 1
    if self._update_counter % self._config.prune_interval == 0:
        self._prune_old_data(relative_time)