Skip to content

firmware_adapter

Firmware Adapter for PID Calibration

Unified facade for all firmware interactions via SocketIO: PID parameter load/save operations, and motion control via pose_order/pose_reached events.

FirmwareAdapter #

Unified adapter for firmware operations via SocketIO.

Handles PID parameter load/save and motion control via pose_order/pose_reached events.

Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class FirmwareAdapter:
    """Unified adapter for firmware operations via SocketIO.

    Handles PID parameter load/save and motion control via pose_order/pose_reached
    events. Every SocketIO event is emitted on the ``/calibration`` namespace.
    """

    # Timeout handed to every parameter read/write issued through the parameter
    # manager (presumably seconds -- confirm against FirmwareParameterManager).
    PARAM_TIMEOUT: float = 5.0

    def __init__(
        self,
        sio: socketio.AsyncClient,
        param_manager: FirmwareParameterManager,
        pose_reached_event: asyncio.Event,
        console: ConsoleUI | None = None,
    ):
        """
        Initialize the firmware adapter.

        Args:
            sio: SocketIO client for communication
            param_manager: Firmware parameter manager for read/write operations
            pose_reached_event: Event signaled when robot reaches target position
            console: Optional ConsoleUI for progress display; a fresh ConsoleUI
                is created when none (or a falsy value) is given
        """
        self.sio = sio
        self._param_manager = param_manager
        self.pose_reached_event = pose_reached_event
        self._console = console or ConsoleUI()

    # === Parameters ===

    async def load_pid_gains(self, pid_type: PidType) -> PidGains:
        """
        Load PID gains for a specific controller type.

        The three gains are requested concurrently, each with PARAM_TIMEOUT.

        Args:
            pid_type: Type of PID controller to load

        Returns:
            PidGains with values from firmware

        Raises:
            TimeoutError: If firmware communication times out
        """
        kp_name, ki_name, kd_name = pid_type.param_names
        logger.info(f"Loading {pid_type.name} PID gains from firmware...")

        # gather() returns results in argument order, so the unpacking below
        # lines up with (kp, ki, kd) regardless of which reply arrives first.
        kp, ki, kd = await asyncio.gather(
            self._param_manager.get_parameter_value(kp_name, timeout=self.PARAM_TIMEOUT),
            self._param_manager.get_parameter_value(ki_name, timeout=self.PARAM_TIMEOUT),
            self._param_manager.get_parameter_value(kd_name, timeout=self.PARAM_TIMEOUT),
        )

        gains = PidGains(kp=kp, ki=ki, kd=kd)
        logger.info(f"Loaded {pid_type.name}: {gains}")

        return gains

    async def save_pid_gains(self, pid_type: PidType, gains: PidGains) -> None:
        """
        Save PID gains for a specific controller type.

        The three gains are written concurrently, each with PARAM_TIMEOUT.

        Args:
            pid_type: Type of PID controller to save
            gains: PidGains with values to save

        Raises:
            TimeoutError: If firmware communication times out
        """
        kp_name, ki_name, kd_name = pid_type.param_names
        logger.info(f"Saving {pid_type.name} PID gains: {gains}")

        await asyncio.gather(
            self._param_manager.set_parameter_value(kp_name, gains.kp, timeout=self.PARAM_TIMEOUT),
            self._param_manager.set_parameter_value(ki_name, gains.ki, timeout=self.PARAM_TIMEOUT),
            self._param_manager.set_parameter_value(kd_name, gains.kd, timeout=self.PARAM_TIMEOUT),
        )

        logger.info(f"{pid_type.name} PID gains saved successfully")

    # === Motion Control ===

    async def set_start_position(self, x: float, y: float, orientation: float) -> None:
        """
        Set the robot's starting reference position.

        Emits a ``pose_start`` event carrying the dumped Pose.

        Args:
            x: X coordinate in mm
            y: Y coordinate in mm
            orientation: Orientation in degrees
        """
        pose = Pose(x=x, y=y, O=orientation)

        logger.debug(f"Setting start position: {pose}")

        await self.sio.emit("pose_start", pose.model_dump(), namespace="/calibration")

    async def _send_pose_order(self, x: float, y: float, orientation: float) -> None:
        """
        Send a pose order to the robot.

        Args:
            x: Target X coordinate in mm
            y: Target Y coordinate in mm
            orientation: Target orientation in degrees
        """
        pose = Pose(x=x, y=y, O=orientation)
        # Clear before emitting so a signal left over from a previous order
        # cannot satisfy the wait for this one.
        self.pose_reached_event.clear()

        logger.debug(f"Sending pose order: {pose}")

        await self.sio.emit("pose_order", pose.model_dump(), namespace="/calibration")

    async def _wait_pose_reached(self, timeout: float = 60.0) -> bool:
        """
        Wait for the robot to reach its target position.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if pose was reached, False if timeout
        """
        try:
            await asyncio.wait_for(self.pose_reached_event.wait(), timeout=timeout)
        except (TimeoutError, asyncio.TimeoutError):
            # Before Python 3.11, asyncio.wait_for raised asyncio.TimeoutError,
            # a class distinct from the builtin TimeoutError; since 3.11 the two
            # names are aliases. Catching both keeps the documented
            # "False on timeout" contract on every interpreter version.
            logger.warning(f"Timeout waiting for pose reached (>{timeout}s)")
            return False
        return True

    async def goto(self, x: float, y: float, orientation: float, timeout: float = 10.0) -> bool:
        """
        Move robot to target position and wait for completion.

        The timeout given here is always forwarded to the wait step, so the
        60 s default of ``_wait_pose_reached`` never applies on this path.

        Args:
            x: Target X coordinate in mm
            y: Target Y coordinate in mm
            orientation: Target orientation in degrees
            timeout: Maximum time to wait in seconds

        Returns:
            True if motion completed, False if timeout
        """
        await self._send_pose_order(x, y, orientation)
        return await self._wait_pose_reached(timeout)

    async def send_speed_order(self, linear_speed_mm_s: int, angular_speed_deg_s: int, duration_ms: int) -> None:
        """
        Send a speed order to the robot.

        Emits a ``speed_order`` event carrying the dumped SpeedOrder; this
        method does not wait for the command to finish.

        Args:
            linear_speed_mm_s: Linear speed in mm/s (positive = forward, negative = backward)
            angular_speed_deg_s: Angular speed in deg/s (positive = counter-clockwise)
            duration_ms: Duration of the speed command in milliseconds
        """
        speed_order = SpeedOrder(
            linear_speed_mm_s=linear_speed_mm_s,
            angular_speed_deg_s=angular_speed_deg_s,
            duration_ms=duration_ms,
        )

        logger.debug(f"Sending speed order: {speed_order}")

        await self.sio.emit("speed_order", speed_order.model_dump(), namespace="/calibration")

    # === Reset ===

    async def reset(self) -> None:
        """Send a game reset to copilot via the server (emits ``game_end``)."""
        logger.info("Sending game end")
        await self.sio.emit("game_end", namespace="/calibration")

    # === Controller ===

    async def set_controller(self, controller: PB_ControllerEnum) -> None:
        """
        Set the robot controller.

        Emits a ``set_controller`` event with the enum value as payload.

        Args:
            controller: Controller type to set
        """
        logger.debug(f"Setting controller: {PB_ControllerEnum.Name(controller)}")

        await self.sio.emit("set_controller", controller, namespace="/calibration")

__init__(sio, param_manager, pose_reached_event, console=None) #

Initialize the firmware adapter.

Parameters:

Name Type Description Default
sio AsyncClient

SocketIO client for communication

required
param_manager FirmwareParameterManager

Firmware parameter manager for read/write operations

required
pose_reached_event Event

Event signaled when robot reaches target position

required
console ConsoleUI | None

Optional ConsoleUI for progress display

None
Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(
    self,
    sio: socketio.AsyncClient,
    param_manager: FirmwareParameterManager,
    pose_reached_event: asyncio.Event,
    console: ConsoleUI | None = None,
):
    """
    Wire the adapter to its collaborators.

    Args:
        sio: SocketIO client used for all event emission
        param_manager: Firmware parameter manager used for parameter reads/writes
        pose_reached_event: Event that is set once the robot reaches its target
        console: ConsoleUI for progress display; when omitted (or falsy) a new
            ConsoleUI is created
    """
    self.sio, self._param_manager, self.pose_reached_event = (
        sio,
        param_manager,
        pose_reached_event,
    )
    # Fall back to a default console only after the other collaborators are stored.
    self._console = console if console else ConsoleUI()

goto(x, y, orientation, timeout=10.0) async #

Move robot to target position and wait for completion.

Parameters:

Name Type Description Default
x float

Target X coordinate in mm

required
y float

Target Y coordinate in mm

required
orientation float

Target orientation in degrees

required
timeout float

Maximum time to wait in seconds

10.0

Returns:

Type Description
bool

True if motion completed, False if timeout

Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
async def goto(self, x: float, y: float, orientation: float, timeout: float = 10.0) -> bool:
    """
    Issue a pose order, then block until arrival or timeout.

    Args:
        x: Target X coordinate in mm
        y: Target Y coordinate in mm
        orientation: Target orientation in degrees
        timeout: Upper bound on the wait, in seconds

    Returns:
        Whatever the wait step reports: True when the pose was reached,
        False when the timeout elapsed first
    """
    # Step 1: dispatch the order.
    await self._send_pose_order(x, y, orientation)
    # Step 2: wait for the matching completion signal.
    reached = await self._wait_pose_reached(timeout)
    return reached

load_pid_gains(pid_type) async #

Load PID gains for a specific controller type.

Parameters:

Name Type Description Default
pid_type PidType

Type of PID controller to load

required

Returns:

Type Description
PidGains

PidGains with values from firmware

Raises:

Type Description
TimeoutError

If firmware communication times out

Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def load_pid_gains(self, pid_type: PidType) -> PidGains:
    """
    Fetch the Kp/Ki/Kd gains of one PID controller from the firmware.

    The three parameters are requested concurrently, each bounded by
    PARAM_TIMEOUT.

    Args:
        pid_type: Which PID controller to read

    Returns:
        PidGains built from the three values read back

    Raises:
        TimeoutError: If firmware communication times out
    """
    kp_name, ki_name, kd_name = pid_type.param_names
    logger.info(f"Loading {pid_type.name} PID gains from firmware...")

    read_param = self._param_manager.get_parameter_value
    pending_reads = [
        read_param(param_name, timeout=self.PARAM_TIMEOUT)
        for param_name in (kp_name, ki_name, kd_name)
    ]
    # gather() preserves argument order, so results map onto kp, ki, kd.
    kp, ki, kd = await asyncio.gather(*pending_reads)

    gains = PidGains(kp=kp, ki=ki, kd=kd)
    logger.info(f"Loaded {pid_type.name}: {gains}")
    return gains

reset() async #

Send a game reset to copilot via the server.

Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
190
191
192
193
async def reset(self) -> None:
    """
    Send a game reset to copilot via the server.

    Logs at info level, then emits the ``game_end`` event without payload on
    the ``/calibration`` namespace.
    """
    logger.info("Sending game end")
    emit = self.sio.emit
    await emit("game_end", namespace="/calibration")

save_pid_gains(pid_type, gains) async #

Save PID gains for a specific controller type.

Parameters:

Name Type Description Default
pid_type PidType

Type of PID controller to save

required
gains PidGains

PidGains with values to save

required

Raises:

Type Description
TimeoutError

If firmware communication times out

Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def save_pid_gains(self, pid_type: PidType, gains: PidGains) -> None:
    """
    Write the Kp/Ki/Kd gains of one PID controller to the firmware.

    The three parameters are written concurrently, each bounded by
    PARAM_TIMEOUT.

    Args:
        pid_type: Which PID controller to update
        gains: PidGains holding the values to write

    Raises:
        TimeoutError: If firmware communication times out
    """
    kp_name, ki_name, kd_name = pid_type.param_names
    logger.info(f"Saving {pid_type.name} PID gains: {gains}")

    write_param = self._param_manager.set_parameter_value
    pending_writes = [
        write_param(param_name, value, timeout=self.PARAM_TIMEOUT)
        for param_name, value in (
            (kp_name, gains.kp),
            (ki_name, gains.ki),
            (kd_name, gains.kd),
        )
    ]
    await asyncio.gather(*pending_writes)

    logger.info(f"{pid_type.name} PID gains saved successfully")

send_speed_order(linear_speed_mm_s, angular_speed_deg_s, duration_ms) async #

Send a speed order to the robot.

Parameters:

Name Type Description Default
linear_speed_mm_s int

Linear speed in mm/s (positive = forward, negative = backward)

required
angular_speed_deg_s int

Angular speed in deg/s (positive = counter-clockwise)

required
duration_ms int

Duration of the speed command in milliseconds

required
Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def send_speed_order(self, linear_speed_mm_s: int, angular_speed_deg_s: int, duration_ms: int) -> None:
    """
    Build a SpeedOrder and emit it as a ``speed_order`` event.

    Args:
        linear_speed_mm_s: Linear speed in mm/s (positive = forward, negative = backward)
        angular_speed_deg_s: Angular speed in deg/s (positive = counter-clockwise)
        duration_ms: How long the speed command lasts, in milliseconds
    """
    order_fields = {
        "linear_speed_mm_s": linear_speed_mm_s,
        "angular_speed_deg_s": angular_speed_deg_s,
        "duration_ms": duration_ms,
    }
    speed_order = SpeedOrder(**order_fields)
    logger.debug(f"Sending speed order: {speed_order}")

    payload = speed_order.model_dump()
    await self.sio.emit("speed_order", payload, namespace="/calibration")

set_controller(controller) async #

Set the robot controller.

Parameters:

Name Type Description Default
controller PB_ControllerEnum

Controller type to set

required
Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
197
198
199
200
201
202
203
204
205
206
async def set_controller(self, controller: PB_ControllerEnum) -> None:
    """
    Select the robot controller.

    Logs the controller's enum name at debug level, then emits a
    ``set_controller`` event with the enum value as payload.

    Args:
        controller: Controller type to activate
    """
    logger.debug(f"Setting controller: {PB_ControllerEnum.Name(controller)}")
    emit = self.sio.emit
    await emit("set_controller", controller, namespace="/calibration")

set_start_position(x, y, orientation) async #

Set the robot's starting reference position.

Parameters:

Name Type Description Default
x float

X coordinate in mm

required
y float

Y coordinate in mm

required
orientation float

Orientation in degrees

required
Source code in cogip/tools/firmware_pid_calibration/firmware_adapter.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
async def set_start_position(self, x: float, y: float, orientation: float) -> None:
    """
    Declare the robot's starting reference position.

    Builds a Pose from the arguments and emits it as a ``pose_start`` event.

    Args:
        x: X coordinate in mm
        y: Y coordinate in mm
        orientation: Orientation in degrees
    """
    pose = Pose(x=x, y=y, O=orientation)
    logger.debug(f"Setting start position: {pose}")

    payload = pose.model_dump()
    await self.sio.emit("pose_start", payload, namespace="/calibration")