Skip to content

sio_events

SioEvents #

Bases: AsyncClientNamespace

Handle all SocketIO events received by the odometry calibration tool.

Source code in cogip/tools/firmware_odometry_calibration/sio_events.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class SioEvents(socketio.AsyncClientNamespace):
    """
    Handle all SocketIO events received by the odometry calibration tool.
    """

    def __init__(self, calibration: "OdometryCalibration"):
        super().__init__("/calibration")
        self.calibration = calibration
        self.connected = False

    async def on_connect(self):
        """
        On connection to cogip-server.
        """
        await asyncio.to_thread(
            polling2.poll,
            lambda: self.client.connected is True,
            step=0.2,
            poll_forever=True,
        )
        logger.info("Connected to cogip-server on /calibration")
        await self.emit("connected")
        self.connected = True

    async def on_disconnect(self) -> None:
        """
        On disconnection from cogip-server.
        """
        logger.info("Disconnected from cogip-server")
        self.connected = False

    async def on_connect_error(self, data: dict[str, Any]) -> None:
        """
        On connection error.
        """
        if isinstance(data, dict) and "message" in data:
            message = data["message"]
        else:
            message = data
        logger.error(f"Connection to cogip-server failed: {message}")

    async def on_pose_reached(self) -> None:
        """
        Handle pose_reached event from copilot (via server).
        Signal that the robot has reached its target position.
        """
        logger.debug("Pose reached")
        self.calibration.pose_reached_event.set()

on_connect() async #

On connection to cogip-server.

Source code in cogip/tools/firmware_odometry_calibration/sio_events.py
23
24
25
26
27
28
29
30
31
32
33
34
35
async def on_connect(self):
    """
    On connection to cogip-server.
    """
    await asyncio.to_thread(
        polling2.poll,
        lambda: self.client.connected is True,
        step=0.2,
        poll_forever=True,
    )
    logger.info("Connected to cogip-server on /calibration")
    await self.emit("connected")
    self.connected = True

on_connect_error(data) async #

On connection error.

Source code in cogip/tools/firmware_odometry_calibration/sio_events.py
44
45
46
47
48
49
50
51
52
async def on_connect_error(self, data: dict[str, Any]) -> None:
    """
    On connection error.
    """
    if isinstance(data, dict) and "message" in data:
        message = data["message"]
    else:
        message = data
    logger.error(f"Connection to cogip-server failed: {message}")

on_disconnect() async #

On disconnection from cogip-server.

Source code in cogip/tools/firmware_odometry_calibration/sio_events.py
37
38
39
40
41
42
async def on_disconnect(self) -> None:
    """
    On disconnection from cogip-server.
    """
    logger.info("Disconnected from cogip-server")
    self.connected = False

on_pose_reached() async #

Handle pose_reached event from copilot (via server). Signal that the robot has reached its target position.

Source code in cogip/tools/firmware_odometry_calibration/sio_events.py
54
55
56
57
58
59
60
async def on_pose_reached(self) -> None:
    """
    Handle pose_reached event from copilot (via server).
    Signal that the robot has reached its target position.
    """
    logger.debug("Pose reached")
    self.calibration.pose_reached_event.set()