Skip to content

sio_events

SioEvents #

Bases: AsyncClientNamespace

Handle all SocketIO events received by FirmwareParameterManager.

Source code in cogip/tools/firmware_parameter_manager/sio_events.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class SioEvents(socketio.AsyncClientNamespace):
    """
    Handle all SocketIO events received by FirmwareParameterManager.

    The namespace is registered on "/parameters". Parameter responses are
    matched against the Futures the manager keeps in `pending_get_requests`
    and `pending_set_requests`, using the response `key_hash` as the key.
    """

    def __init__(self, manager: "FirmwareParameterManager"):
        """
        Arguments:
            manager: Object owning the `pending_get_requests` and
                `pending_set_requests` mappings resolved by this namespace.
        """
        super().__init__("/parameters")
        self.manager = manager
        # True once the "connected" event has been emitted, False after a disconnection.
        self.connected = False

    async def on_connect(self) -> None:
        """
        On connection to cogip-server.

        Wait until the client reports it is connected, emit the "connected"
        event, then set the `connected` flag.
        """
        # The poll call is blocking, so it is run in a worker thread
        # instead of directly on the event loop.
        await asyncio.to_thread(
            polling2.poll,
            lambda: self.client.connected is True,
            step=1,
            poll_forever=True,
        )
        logger.info("Connected to cogip-server")
        await self.emit("connected")

        self.connected = True

    async def on_disconnect(self) -> None:
        """
        On disconnection from cogip-server.
        """
        logger.info("Disconnected from cogip-server")
        self.connected = False

    async def on_connect_error(self, data: dict[str, Any]) -> None:
        """
        On connection error, log the reason of the failure.

        If `data` is a dict with a "message" entry, only that entry is logged;
        any other payload is logged as is. This handler does not exit nor
        retry the connection by itself.
        """
        if isinstance(data, dict) and "message" in data:
            message = data["message"]
        else:
            message = data
        logger.error(f"Connection to cogip-server failed: {message}")

    @staticmethod
    def _resolve_pending(pending: dict[int, Any], response: Any) -> None:
        """
        Resolve the Future registered in `pending` for `response.key_hash`.

        The entry is removed from `pending`. A warning is logged when no
        entry exists for this hash. A Future that is already done is left
        untouched.
        """
        try:
            future = pending.pop(response.key_hash)
        except KeyError:
            logger.warning(f"No pending request found for firmware parameter hash: 0x{response.key_hash:08x}")
            return

        # Calling set_result() on a finished Future raises, hence the guard.
        if not future.done():
            future.set_result(response)

    async def on_get_parameter_response(self, data: dict[str, Any]) -> None:
        """
        Handle get_parameter_response from copilot.
        Resolve the pending Future with the response.
        """
        response = ParseDict(data, PB_ParameterGetResponse())

        logger.info(f"[SIO] Received get_response for firmware parameter hash: 0x{response.key_hash:08x}")

        self._resolve_pending(self.manager.pending_get_requests, response)

    async def on_set_parameter_response(self, data: dict[str, Any]) -> None:
        """
        Handle set_parameter_response from copilot.
        Resolve the pending Future with the response.
        """
        response = ParseDict(data, PB_ParameterSetResponse())

        logger.info(f"[SIO] Received set_response for firmware parameter hash: 0x{response.key_hash:08x}")

        self._resolve_pending(self.manager.pending_set_requests, response)

on_connect() async #

On connection to cogip-server.

Source code in cogip/tools/firmware_parameter_manager/sio_events.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async def on_connect(self):
    """
    Called once the connection to cogip-server is being established.

    Waits until the client reports it is connected, emits the "connected"
    event, then sets the `connected` flag.
    """

    def client_is_connected():
        return self.client.connected is True

    # The poll call is blocking, so it is delegated to a worker thread.
    poll_options = {"step": 1, "poll_forever": True}
    await asyncio.to_thread(polling2.poll, client_is_connected, **poll_options)

    logger.info("Connected to cogip-server")
    await self.emit("connected")
    self.connected = True

on_connect_error(data) async #

On connection error, log the reason reported for the failed connection to cogip-server. The handler shown below only logs the failure; it does not itself exit or retry the connection.

Source code in cogip/tools/firmware_parameter_manager/sio_events.py
50
51
52
53
54
55
56
57
58
59
async def on_connect_error(self, data: dict[str, Any]) -> None:
    """
    Log the reason of a failed connection to cogip-server.

    When `data` is a dict holding a "message" entry, that entry is logged;
    otherwise `data` itself is logged as is.
    """
    has_message = isinstance(data, dict) and "message" in data
    message = data["message"] if has_message else data
    logger.error(f"Connection to cogip-server failed: {message}")

on_disconnect() async #

On disconnection from cogip-server.

Source code in cogip/tools/firmware_parameter_manager/sio_events.py
43
44
45
46
47
48
async def on_disconnect(self) -> None:
    """
    Called when the link to cogip-server is lost: clear the `connected`
    flag and log the disconnection.
    """
    self.connected = False
    logger.info("Disconnected from cogip-server")

on_get_parameter_response(data) async #

Handle get_parameter_response from copilot. Resolve the pending Future with the response.

Source code in cogip/tools/firmware_parameter_manager/sio_events.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def on_get_parameter_response(self, data: dict[str, Any]):
    """
    Process a get_parameter_response event.

    The payload is parsed into a PB_ParameterGetResponse and handed to the
    Future registered in the manager for the same key hash, if there is one.
    """
    response = ParseDict(data, PB_ParameterGetResponse())
    logger.info(f"[SIO] Received get_response for firmware parameter hash: 0x{response.key_hash:08x}")

    # Take the Future out of the pending table; bail out when nobody waits for it.
    try:
        waiting = self.manager.pending_get_requests.pop(response.key_hash)
    except KeyError:
        logger.warning(f"No pending request found for firmware parameter hash: 0x{response.key_hash:08x}")
        return

    # A Future that is already done is left untouched.
    if waiting.done():
        return
    waiting.set_result(response)

on_set_parameter_response(data) async #

Handle set_parameter_response from copilot. Resolve the pending Future with the response.

Source code in cogip/tools/firmware_parameter_manager/sio_events.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def on_set_parameter_response(self, data: dict[str, Any]):
    """
    Process a set_parameter_response event.

    The payload is parsed into a PB_ParameterSetResponse and handed to the
    Future registered in the manager for the same key hash, if there is one.
    """
    response = ParseDict(data, PB_ParameterSetResponse())
    logger.info(f"[SIO] Received set_response for firmware parameter hash: 0x{response.key_hash:08x}")

    # Take the Future out of the pending table; bail out when nobody waits for it.
    try:
        waiting = self.manager.pending_set_requests.pop(response.key_hash)
    except KeyError:
        logger.warning(f"No pending request found for firmware parameter hash: 0x{response.key_hash:08x}")
        return

    # A Future that is already done is left untouched.
    if waiting.done():
        return
    waiting.set_result(response)