Skip to content

firmware_adapter

Firmware Adapter for Odometry Calibration

Unified facade for all firmware interactions via SocketIO: - Parameter load/save operations - Encoder telemetry - Motion control via pose_order/pose_reached - Calibration-specific movement sequences

FirmwareAdapter #

Unified adapter for firmware operations via SocketIO.

Handles odometry parameter load/save, encoder telemetry, motion control via pose_order/pose_reached events, and calibration movement sequences.

Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
class FirmwareAdapter:
    """Unified adapter for firmware operations via SocketIO.

    Handles odometry parameter load/save, encoder telemetry, motion control
    via pose_order/pose_reached events, and calibration movement sequences.
    """

    # Parameter name constants
    PARAM_WHEELS_DISTANCE = "encoder_wheels_distance_mm"
    PARAM_RIGHT_WHEEL_DIAMETER = "right_wheel_diameter_mm"
    PARAM_LEFT_WHEEL_DIAMETER = "left_wheel_diameter_mm"
    PARAM_LEFT_POLARITY = "qdec_left_polarity"
    PARAM_RIGHT_POLARITY = "qdec_right_polarity"
    PARAM_ENCODER_RESOLUTION = "encoder_wheels_resolution_pulses"

    # Telemetry keys for encoder values
    TELEMETRY_LEFT_ENCODER = "encoder_left"
    TELEMETRY_RIGHT_ENCODER = "encoder_right"

    def __init__(
        self,
        sio: socketio.AsyncClient,
        param_manager: FirmwareParameterManager,
        telemetry_manager: FirmwareTelemetryManager,
        pose_reached_event: asyncio.Event,
        console: ConsoleUI | None = None,
    ):
        """
        Initialize the firmware adapter.

        Args:
            sio: SocketIO client for communication
            param_manager: Firmware parameter manager for read/write operations
            telemetry_manager: Firmware telemetry manager for encoder tick counts
            pose_reached_event: Event signaled when robot reaches target position
            console: Optional ConsoleUI for progress display
        """
        self.sio = sio
        self._param_manager = param_manager
        self._telemetry = telemetry_manager
        self.pose_reached_event = pose_reached_event
        self._console = console or ConsoleUI()

    # === Parameters ===

    async def load_parameters(self) -> OdometryParameters:
        """
        Load current odometry parameters from firmware.

        Returns:
            OdometryParameters populated with values from firmware

        Raises:
            TimeoutError: If firmware communication times out
        """
        logger.info("Loading parameters from firmware...")
        (
            wheels_distance,
            right_diameter,
            left_diameter,
            left_polarity,
            right_polarity,
            encoder_ticks,
        ) = await asyncio.gather(
            self._param_manager.get_parameter_value(self.PARAM_WHEELS_DISTANCE),
            self._param_manager.get_parameter_value(self.PARAM_RIGHT_WHEEL_DIAMETER),
            self._param_manager.get_parameter_value(self.PARAM_LEFT_WHEEL_DIAMETER),
            self._param_manager.get_parameter_value(self.PARAM_LEFT_POLARITY),
            self._param_manager.get_parameter_value(self.PARAM_RIGHT_POLARITY),
            self._param_manager.get_parameter_value(self.PARAM_ENCODER_RESOLUTION),
        )

        params = OdometryParameters(
            wheels_distance=wheels_distance,
            right_wheel_radius=right_diameter / 2.0,
            left_wheel_radius=left_diameter / 2.0,
            left_polarity=left_polarity,
            right_polarity=right_polarity,
            encoder_ticks=encoder_ticks,
        )

        logger.info(f"Loaded parameters: {params}")

        return params

    async def save_parameters(self, params: OdometryParameters) -> None:
        """
        Save odometry parameters to firmware.

        Only saves the calibrated values (wheels_distance, wheel radii).
        Polarity and encoder ticks are not modified.

        Args:
            params: OdometryParameters with values to save

        Raises:
            TimeoutError: If firmware communication times out
        """
        logger.info(f"Saving parameters to firmware: {params}")

        await asyncio.gather(
            self._param_manager.set_parameter_value(self.PARAM_WHEELS_DISTANCE, params.wheels_distance),
            self._param_manager.set_parameter_value(self.PARAM_RIGHT_WHEEL_DIAMETER, params.right_wheel_radius * 2.0),
            self._param_manager.set_parameter_value(self.PARAM_LEFT_WHEEL_DIAMETER, params.left_wheel_radius * 2.0),
        )

        logger.info("Parameters saved successfully")

    # === Telemetry ===

    async def get_encoder_ticks(self) -> tuple[int, int]:
        """
        Get current encoder tick counts from firmware telemetry.

        Waits briefly for telemetry to stabilize before reading values.

        Returns:
            Tuple of (left_ticks, right_ticks)
        """
        await asyncio.sleep(0.1)

        left = self._telemetry.get_value(self.TELEMETRY_LEFT_ENCODER)
        right = self._telemetry.get_value(self.TELEMETRY_RIGHT_ENCODER)

        return int(left), int(right)

    # === Motion Control ===

    async def set_start_position(self, x: float, y: float, orientation: float) -> None:
        """
        Set the robot's starting reference position.

        Args:
            x: X coordinate in mm
            y: Y coordinate in mm
            orientation: Orientation in degrees
        """
        pose = Pose(x=x, y=y, O=orientation)

        logger.debug(f"Setting start position: {pose}")

        await self.sio.emit("pose_start", pose.model_dump(), namespace="/calibration")

    async def _send_pose_order(self, x: float, y: float, orientation: float) -> None:
        """
        Send a pose order to the robot.

        Args:
            x: Target X coordinate in mm
            y: Target Y coordinate in mm
            orientation: Target orientation in degrees
        """
        pose = Pose(x=x, y=y, O=orientation)
        self.pose_reached_event.clear()

        logger.debug(f"Sending pose order: {pose}")

        await self.sio.emit("pose_order", pose.model_dump(), namespace="/calibration")

    async def _wait_pose_reached(self, timeout: float = 60.0) -> bool:
        """
        Wait for the robot to reach its target position.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if pose was reached, False if timeout
        """
        try:
            await asyncio.wait_for(self.pose_reached_event.wait(), timeout=timeout)
            return True
        except TimeoutError:
            logger.warning(f"Timeout waiting for pose reached (>{timeout}s)")
            return False

    async def goto(self, x: float, y: float, orientation: float, timeout: float = 60.0) -> bool:
        """
        Move robot to target position and wait for completion.

        Args:
            x: Target X coordinate in mm
            y: Target Y coordinate in mm
            orientation: Target orientation in degrees
            timeout: Maximum time to wait in seconds

        Returns:
            True if movement completed, False if timeout
        """
        await self._send_pose_order(x, y, orientation)
        return await self._wait_pose_reached(timeout)

    # === Calibration Movement Sequences ===

    async def execute_rotations(self, num_rotations: int) -> bool:
        """
        Execute N full rotations in place (Phase 1: WHEEL_DISTANCE).

        Args:
            num_rotations: Number of full 360-degree rotations

        Returns:
            True if successful, False otherwise.
        """
        target_orientation = num_rotations * 360.0
        logger.info(f"Executing {num_rotations} rotations ({target_orientation}°)...")

        # Reset position to origin
        await self.set_start_position(0, 0, 0)
        await asyncio.sleep(0.2)

        quarter_turns = [90, 180, -90, 0]

        with Progress(
            SpinnerColumn(),
            TextColumn("[cyan]Rotation {task.fields[rotation]}/{task.fields[total_count]}[/]"),
            TimeElapsedColumn(),
            console=self._console,
        ) as progress:
            task = progress.add_task("", rotation=1, total_count=num_rotations)

            for rotation_num in range(num_rotations):
                progress.update(task, rotation=rotation_num + 1)
                logger.debug(f"Rotation {rotation_num + 1}/{num_rotations}")
                for target_angle in quarter_turns:
                    success = await self.goto(0, 0, target_angle, timeout=30.0)
                    if not success:
                        self._console.show_error(
                            f"Rotation {rotation_num + 1}/{num_rotations} failed at {target_angle}° (timeout)"
                        )
                        return False

            progress.update(task, rotation=num_rotations)

        logger.info("Rotations completed")
        return True

    async def execute_squares(self, num_squares: int) -> bool:
        """
        Execute N square paths clockwise (Phase 2: RIGHT_WHEEL_RADIUS).

        Uses the predefined 500mm square path from __init__.py.

        Args:
            num_squares: Number of complete squares to execute

        Returns:
            True if successful, False otherwise.
        """
        logger.info(f"Executing {num_squares} squares...")

        # Reset position to origin
        await self.set_start_position(0, 0, 0)
        await asyncio.sleep(0.2)

        with Progress(
            SpinnerColumn(),
            TextColumn("[cyan]Square {task.fields[square]}/{task.fields[total_count]}[/]"),
            TimeElapsedColumn(),
            console=self._console,
        ) as progress:
            task = progress.add_task("", square=1, total_count=num_squares)

            for square_num in range(num_squares):
                progress.update(task, square=square_num + 1)
                for waypoint_idx, waypoint in enumerate(SQUARE_PATH_CCW):
                    orientation_degrees = math.degrees(waypoint.O)
                    logger.debug(
                        f"Square {square_num + 1}/{num_squares}, waypoint {waypoint_idx + 1}/4: "
                        f"({waypoint.x}, {waypoint.y}, {orientation_degrees}°)"
                    )

                    success = await self.goto(waypoint.x, waypoint.y, orientation_degrees, timeout=30.0)
                    if not success:
                        self._console.show_error(
                            f"Square path failed at waypoint {waypoint_idx + 1}/4 of square {square_num + 1}"
                        )
                        return False

            progress.update(task, square=num_squares)

        logger.info("Squares completed")
        return True

    async def execute_straight_line(self, distance_mm: int) -> bool:
        """
        Execute a straight line movement (Phase 3: LEFT_WHEEL_RADIUS).

        Moves forward from origin by the specified distance.

        Args:
            distance_mm: Distance to travel in mm

        Returns:
            True if successful, False otherwise.
        """
        logger.info(f"Executing straight line movement ({distance_mm}mm)...")

        # Reset position to origin facing forward (0°)
        await self.set_start_position(0, 0, 0)
        await asyncio.sleep(0.2)

        with Progress(
            SpinnerColumn(),
            TextColumn("[cyan]Straight line ({task.fields[distance]} mm)[/]"),
            TimeElapsedColumn(),
            console=self._console,
        ) as progress:
            progress.add_task("", distance=distance_mm)

            success = await self.goto(distance_mm, 0, 0, timeout=30.0)
            if not success:
                self._console.show_error("Straight line movement failed (timeout)")
                return False

        logger.info("Straight line completed")
        return True

__init__(sio, param_manager, telemetry_manager, pose_reached_event, console=None) #

Initialize the firmware adapter.

Parameters:

Name Type Description Default
sio AsyncClient

SocketIO client for communication

required
param_manager FirmwareParameterManager

Firmware parameter manager for read/write operations

required
telemetry_manager FirmwareTelemetryManager

Firmware telemetry manager for encoder tick counts

required
pose_reached_event Event

Event signaled when robot reaches target position

required
console ConsoleUI | None

Optional ConsoleUI for progress display

None
Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def __init__(
    self,
    sio: socketio.AsyncClient,
    param_manager: FirmwareParameterManager,
    telemetry_manager: FirmwareTelemetryManager,
    pose_reached_event: asyncio.Event,
    console: ConsoleUI | None = None,
):
    """
    Initialize the firmware adapter.

    Args:
        sio: SocketIO client for communication
        param_manager: Firmware parameter manager for read/write operations
        telemetry_manager: Firmware telemetry manager for encoder tick counts
        pose_reached_event: Event signaled when robot reaches target position
        console: Optional ConsoleUI for progress display
    """
    self.sio = sio
    self._param_manager = param_manager
    self._telemetry = telemetry_manager
    self.pose_reached_event = pose_reached_event
    self._console = console or ConsoleUI()

execute_rotations(num_rotations) async #

Execute N full rotations in place (Phase 1: WHEEL_DISTANCE).

Parameters:

Name Type Description Default
num_rotations int

Number of full 360-degree rotations

required

Returns:

Type Description
bool

True if successful, False otherwise.

Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
async def execute_rotations(self, num_rotations: int) -> bool:
    """
    Execute N full rotations in place (Phase 1: WHEEL_DISTANCE).

    Args:
        num_rotations: Number of full 360-degree rotations

    Returns:
        True if successful, False otherwise.
    """
    target_orientation = num_rotations * 360.0
    logger.info(f"Executing {num_rotations} rotations ({target_orientation}°)...")

    # Reset position to origin
    await self.set_start_position(0, 0, 0)
    await asyncio.sleep(0.2)

    quarter_turns = [90, 180, -90, 0]

    with Progress(
        SpinnerColumn(),
        TextColumn("[cyan]Rotation {task.fields[rotation]}/{task.fields[total_count]}[/]"),
        TimeElapsedColumn(),
        console=self._console,
    ) as progress:
        task = progress.add_task("", rotation=1, total_count=num_rotations)

        for rotation_num in range(num_rotations):
            progress.update(task, rotation=rotation_num + 1)
            logger.debug(f"Rotation {rotation_num + 1}/{num_rotations}")
            for target_angle in quarter_turns:
                success = await self.goto(0, 0, target_angle, timeout=30.0)
                if not success:
                    self._console.show_error(
                        f"Rotation {rotation_num + 1}/{num_rotations} failed at {target_angle}° (timeout)"
                    )
                    return False

        progress.update(task, rotation=num_rotations)

    logger.info("Rotations completed")
    return True

execute_squares(num_squares) async #

Execute N square paths clockwise (Phase 2: RIGHT_WHEEL_RADIUS).

Uses the predefined 500mm square path from init.py.

Parameters:

Name Type Description Default
num_squares int

Number of complete squares to execute

required

Returns:

Type Description
bool

True if successful, False otherwise.

Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
async def execute_squares(self, num_squares: int) -> bool:
    """
    Execute N square paths clockwise (Phase 2: RIGHT_WHEEL_RADIUS).

    Uses the predefined 500mm square path from __init__.py.

    Args:
        num_squares: Number of complete squares to execute

    Returns:
        True if successful, False otherwise.
    """
    logger.info(f"Executing {num_squares} squares...")

    # Reset position to origin
    await self.set_start_position(0, 0, 0)
    await asyncio.sleep(0.2)

    with Progress(
        SpinnerColumn(),
        TextColumn("[cyan]Square {task.fields[square]}/{task.fields[total_count]}[/]"),
        TimeElapsedColumn(),
        console=self._console,
    ) as progress:
        task = progress.add_task("", square=1, total_count=num_squares)

        for square_num in range(num_squares):
            progress.update(task, square=square_num + 1)
            for waypoint_idx, waypoint in enumerate(SQUARE_PATH_CCW):
                orientation_degrees = math.degrees(waypoint.O)
                logger.debug(
                    f"Square {square_num + 1}/{num_squares}, waypoint {waypoint_idx + 1}/4: "
                    f"({waypoint.x}, {waypoint.y}, {orientation_degrees}°)"
                )

                success = await self.goto(waypoint.x, waypoint.y, orientation_degrees, timeout=30.0)
                if not success:
                    self._console.show_error(
                        f"Square path failed at waypoint {waypoint_idx + 1}/4 of square {square_num + 1}"
                    )
                    return False

        progress.update(task, square=num_squares)

    logger.info("Squares completed")
    return True

execute_straight_line(distance_mm) async #

Execute a straight line movement (Phase 3: LEFT_WHEEL_RADIUS).

Moves forward from origin by the specified distance.

Parameters:

Name Type Description Default
distance_mm int

Distance to travel in mm

required

Returns:

Type Description
bool

True if successful, False otherwise.

Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
async def execute_straight_line(self, distance_mm: int) -> bool:
    """
    Execute a straight line movement (Phase 3: LEFT_WHEEL_RADIUS).

    Moves forward from origin by the specified distance.

    Args:
        distance_mm: Distance to travel in mm

    Returns:
        True if successful, False otherwise.
    """
    logger.info(f"Executing straight line movement ({distance_mm}mm)...")

    # Reset position to origin facing forward (0°)
    await self.set_start_position(0, 0, 0)
    await asyncio.sleep(0.2)

    with Progress(
        SpinnerColumn(),
        TextColumn("[cyan]Straight line ({task.fields[distance]} mm)[/]"),
        TimeElapsedColumn(),
        console=self._console,
    ) as progress:
        progress.add_task("", distance=distance_mm)

        success = await self.goto(distance_mm, 0, 0, timeout=30.0)
        if not success:
            self._console.show_error("Straight line movement failed (timeout)")
            return False

    logger.info("Straight line completed")
    return True

get_encoder_ticks() async #

Get current encoder tick counts from firmware telemetry.

Waits briefly for telemetry to stabilize before reading values.

Returns:

Type Description
tuple[int, int]

Tuple of (left_ticks, right_ticks)

Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
async def get_encoder_ticks(self) -> tuple[int, int]:
    """
    Get current encoder tick counts from firmware telemetry.

    Waits briefly for telemetry to stabilize before reading values.

    Returns:
        Tuple of (left_ticks, right_ticks)
    """
    await asyncio.sleep(0.1)

    left = self._telemetry.get_value(self.TELEMETRY_LEFT_ENCODER)
    right = self._telemetry.get_value(self.TELEMETRY_RIGHT_ENCODER)

    return int(left), int(right)

goto(x, y, orientation, timeout=60.0) async #

Move robot to target position and wait for completion.

Parameters:

Name Type Description Default
x float

Target X coordinate in mm

required
y float

Target Y coordinate in mm

required
orientation float

Target orientation in degrees

required
timeout float

Maximum time to wait in seconds

60.0

Returns:

Type Description
bool

True if movement completed, False if timeout

Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
async def goto(self, x: float, y: float, orientation: float, timeout: float = 60.0) -> bool:
    """
    Move robot to target position and wait for completion.

    Args:
        x: Target X coordinate in mm
        y: Target Y coordinate in mm
        orientation: Target orientation in degrees
        timeout: Maximum time to wait in seconds

    Returns:
        True if movement completed, False if timeout
    """
    await self._send_pose_order(x, y, orientation)
    return await self._wait_pose_reached(timeout)

load_parameters() async #

Load current odometry parameters from firmware.

Returns:

Type Description
OdometryParameters

OdometryParameters populated with values from firmware

Raises:

Type Description
TimeoutError

If firmware communication times out

Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
async def load_parameters(self) -> OdometryParameters:
    """
    Load current odometry parameters from firmware.

    Returns:
        OdometryParameters populated with values from firmware

    Raises:
        TimeoutError: If firmware communication times out
    """
    logger.info("Loading parameters from firmware...")
    (
        wheels_distance,
        right_diameter,
        left_diameter,
        left_polarity,
        right_polarity,
        encoder_ticks,
    ) = await asyncio.gather(
        self._param_manager.get_parameter_value(self.PARAM_WHEELS_DISTANCE),
        self._param_manager.get_parameter_value(self.PARAM_RIGHT_WHEEL_DIAMETER),
        self._param_manager.get_parameter_value(self.PARAM_LEFT_WHEEL_DIAMETER),
        self._param_manager.get_parameter_value(self.PARAM_LEFT_POLARITY),
        self._param_manager.get_parameter_value(self.PARAM_RIGHT_POLARITY),
        self._param_manager.get_parameter_value(self.PARAM_ENCODER_RESOLUTION),
    )

    params = OdometryParameters(
        wheels_distance=wheels_distance,
        right_wheel_radius=right_diameter / 2.0,
        left_wheel_radius=left_diameter / 2.0,
        left_polarity=left_polarity,
        right_polarity=right_polarity,
        encoder_ticks=encoder_ticks,
    )

    logger.info(f"Loaded parameters: {params}")

    return params

save_parameters(params) async #

Save odometry parameters to firmware.

Only saves the calibrated values (wheels_distance, wheel radii). Polarity and encoder ticks are not modified.

Parameters:

Name Type Description Default
params OdometryParameters

OdometryParameters with values to save

required

Raises:

Type Description
TimeoutError

If firmware communication times out

Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
async def save_parameters(self, params: OdometryParameters) -> None:
    """
    Save odometry parameters to firmware.

    Only saves the calibrated values (wheels_distance, wheel radii).
    Polarity and encoder ticks are not modified.

    Args:
        params: OdometryParameters with values to save

    Raises:
        TimeoutError: If firmware communication times out
    """
    logger.info(f"Saving parameters to firmware: {params}")

    await asyncio.gather(
        self._param_manager.set_parameter_value(self.PARAM_WHEELS_DISTANCE, params.wheels_distance),
        self._param_manager.set_parameter_value(self.PARAM_RIGHT_WHEEL_DIAMETER, params.right_wheel_radius * 2.0),
        self._param_manager.set_parameter_value(self.PARAM_LEFT_WHEEL_DIAMETER, params.left_wheel_radius * 2.0),
    )

    logger.info("Parameters saved successfully")

set_start_position(x, y, orientation) async #

Set the robot's starting reference position.

Parameters:

Name Type Description Default
x float

X coordinate in mm

required
y float

Y coordinate in mm

required
orientation float

Orientation in degrees

required
Source code in cogip/tools/firmware_odometry_calibration/firmware_adapter.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
async def set_start_position(self, x: float, y: float, orientation: float) -> None:
    """
    Set the robot's starting reference position.

    Args:
        x: X coordinate in mm
        y: Y coordinate in mm
        orientation: Orientation in degrees
    """
    pose = Pose(x=x, y=y, O=orientation)

    logger.debug(f"Setting start position: {pose}")

    await self.sio.emit("pose_start", pose.model_dump(), namespace="/calibration")