Skip to content

__main__

main() #

Run firmware telemetry utility.

During installation of cogip-tools, setuptools is configured to create the cogip-firmware-telemetry script using this function as entrypoint.

Source code in cogip/tools/firmware_telemetry/__main__.py
88
89
90
91
92
93
94
95
def main():
    """
    Run firmware telemetry utility.

    During installation of cogip-tools, `setuptools` is configured
    to create the `cogip-firmware-telemetry` script using this function as entrypoint.
    """
    typer.run(main_opt)

main_async(server_url, duration) async #

CLI utility to test firmware telemetry reception. Creates its own Socket.IO client to host the FirmwareTelemetryManager.

Source code in cogip/tools/firmware_telemetry/__main__.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
async def main_async(server_url: str, duration: float):
    """
    CLI utility to test firmware telemetry reception.
    Creates its own Socket.IO client to host the FirmwareTelemetryManager.
    """
    sio = socketio.AsyncClient(logger=False)
    telemetry = FirmwareTelemetryManager(sio=sio)

    print(f"Connecting to {server_url}...")
    await sio.connect(server_url, namespaces=[telemetry.namespace])

    print("Enabling telemetry...")
    await telemetry.enable()

    print(f"Receiving telemetry data for {duration} seconds...")
    print("-" * 60)

    start_time = time.monotonic()
    last_print_time = 0.0

    try:
        while (time.monotonic() - start_time) < duration:
            await asyncio.sleep(0.1)

            # Print telemetry values every second
            current_time = time.monotonic() - start_time
            if current_time - last_print_time >= 1.0:
                last_print_time = current_time
                print(f"\n[{current_time:.1f}s] Telemetry store ({len(telemetry.store)} entries):")
                for key_hash, data in telemetry.store.items():
                    print(f"  hash=0x{key_hash:08x} ts={data.timestamp_ms}ms value={data.value}")

    except KeyboardInterrupt:
        print("\nInterrupted by user.")

    print("-" * 60)
    print("Disabling telemetry...")
    await telemetry.disable()

    await sio.disconnect()
    print("Disconnected.")