Skip to content

firmware_calibration

FirmwareCalibrationNamespace #

Bases: AsyncNamespace

Handle all SocketIO events related to firmware calibration clients. Only one client connection is allowed at a time.

Source code in cogip/tools/server/namespaces/firmware_calibration.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class FirmwareCalibrationNamespace(socketio.AsyncNamespace):
    """
    Handle all SocketIO events related to firmware calibration clients.
    Only one client connection is allowed at a time.
    """

    def __init__(self, cogip_server: "server.Server"):
        super().__init__("/calibration")
        self.cogip_server = cogip_server
        self.context = Context()

    async def on_connect(self, sid, environ):
        if self.context.calibration_sid:
            message = "A calibration client is already connected"
            logger.error(f"Calibration connection refused: {message}")
            raise ConnectionRefusedError(message)

    async def on_connected(self, sid):
        logger.info("Calibration client connected.")
        self.context.calibration_sid = sid

    async def on_disconnect(self, sid):
        self.context.calibration_sid = None
        logger.info("Calibration client disconnected.")

    async def on_pose_start(self, sid, pose: dict[str, Any]):
        """
        Callback on pose start (reset robot position).
        Forward to copilot.
        """
        logger.info(f"[calibration => copilot] Pose start: {pose}")
        await self.emit("pose_start", pose, namespace="/copilot")

    async def on_pose_order(self, sid, pose: dict[str, Any]):
        """
        Callback on pose order.
        Forward to pose to copilot and dashboards.
        """
        logger.info(f"[calibration => copilot] Pose order: {pose}")
        await self.emit("pose_order", pose, namespace="/copilot")
        await self.emit("pose_order", (self.context.robot_id, pose), namespace="/dashboard")

on_pose_order(sid, pose) async #

Callback on pose order. Forward to pose to copilot and dashboards.

Source code in cogip/tools/server/namespaces/firmware_calibration.py
42
43
44
45
46
47
48
49
async def on_pose_order(self, sid, pose: dict[str, Any]):
    """
    Callback on pose order.
    Forward to pose to copilot and dashboards.
    """
    logger.info(f"[calibration => copilot] Pose order: {pose}")
    await self.emit("pose_order", pose, namespace="/copilot")
    await self.emit("pose_order", (self.context.robot_id, pose), namespace="/dashboard")

on_pose_start(sid, pose) async #

Callback on pose start (reset robot position). Forward to copilot.

Source code in cogip/tools/server/namespaces/firmware_calibration.py
34
35
36
37
38
39
40
async def on_pose_start(self, sid, pose: dict[str, Any]):
    """
    Callback on pose start (reset robot position).
    Forward to copilot.
    """
    logger.info(f"[calibration => copilot] Pose start: {pose}")
    await self.emit("pose_start", pose, namespace="/copilot")