Skip to content

sio_client

SocketioClient #

Bases: QObject

This class controls the socket.io port used to communicate with the server. Its main purpose is to get the shell and tool menus to update the interface, get the robot position to update its position, and send the commands to the robot and to the tools.

Attributes:

Name Type Description
signal_connected Signal

Qt signal emitted on server connection state changes

signal_exit Signal

Qt signal emitted to exit Monitor

signal_add_robot Signal

Qt signal emitted to add a new robot

signal_del_robot Signal

Qt signal emitted to remove a robot

signal_robot_path Signal

Qt signal emitted on robot path update

signal_tool_menu Signal

Qt signal emitted to load a new tool menu

signal_config_request Signal

Qt signal emitted on configuration requests

signal_wizard_request Signal

Qt signal emitted to forward wizard requests

signal_starter_changed Signal

Qt signal emitted when the starter state has changed

Source code in cogip/tools/monitor/sio_client.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class SocketioClient(QtCore.QObject):
    """
    This class controls the socket.io port used to communicate with the server.
    Its main purpose is to get the shell and tool menus to update the interface,
    get the robot position to update its position, and send the commands
    to the robot and to the tools.

    Attributes:
        signal_connected:
            Qt signal emitted on server connection state changes
        signal_exit:
            Qt signal emitted to exit Monitor
        signal_add_robot:
            Qt signal emitted to add a new robot
        signal_del_robot:
            Qt signal emitted to remove a robot
        signal_robot_path:
            Qt signal emitted on robot path update
        signal_tool_menu:
            Qt signal emitted to load a new tool menu
        signal_config_request:
            Qt signal emitted on configuration requests
        signal_wizard_request:
            Qt signal emitted to forward wizard requests
        signal_starter_changed:
            Qt signal emitted the starter state has changed
    """

    signal_connected: QtSignal = QtSignal(bool)
    signal_exit: QtSignal = QtSignal()
    signal_add_robot: QtSignal = QtSignal(int, bool, bool)
    signal_del_robot: QtSignal = QtSignal(int)
    signal_robot_path: QtSignal = QtSignal(list)
    signal_tool_menu: QtSignal = QtSignal(models.ShellMenu)
    signal_config_request: QtSignal = QtSignal(dict)
    signal_wizard_request: QtSignal = QtSignal(dict)
    signal_close_wizard: QtSignal = QtSignal()
    signal_starter_changed: QtSignal = QtSignal(bool)

    def __init__(self, url: str):
        """
        Class constructor.

        Arguments:
            url: URL to socket.io server
        """
        super().__init__()

        self.url = url
        self.sio = socketio.Client()
        self.register_events()

    def start(self):
        """
        Connect to socket.io server.
        """
        # Poll in background to wait for the first connection.
        # Disconnections/re-connections are handled directly by the client.
        self.retry_connection = True
        Thread(target=self.try_connect).start()

    def try_connect(self):
        while self.retry_connection:
            try:
                self.sio.connect(self.url, namespaces=["/monitor", "/dashboard"])
            except ConnectionError as ex:
                logger.error(str(ex))
                time.sleep(2)
                continue
            break

    def stop(self):
        """
        Disconnect from socket.io server.
        """
        self.retry_connection = False
        if self.sio.connected:
            self.sio.disconnect()

    @QtSlot(str)
    def tool_command(self, command: str):
        """
        Send a command to the robot.

        Arguments:
            command: Command to send
        """
        self.sio.emit("tool_cmd", command, namespace="/dashboard")

    @QtSlot(str, str, str, "QVariant", bool)
    def send_config_update(self, namespace: str, sio_event: str, name: str, value: object, is_integer: bool = False):
        """
        Forward a configuration update to the server.

        Arguments:
            namespace: Socket.IO namespace to target
            sio_event: Event name to emit (defaults to 'config_updated' when empty)
            name: Property name
            value: New property value
            is_integer: Whether the value should be converted to an integer
                (integers and floats are sent as floats from QML)
        """
        value = int(value) if is_integer else value
        payload = {
            "namespace": namespace,
            "sio_event": sio_event or "config_updated",
            "name": name,
            "value": value,
        }
        self.sio.emit("config_updated", payload, namespace="/dashboard")

    @QtSlot(dict)
    def wizard_response(self, response: dict[str, Any]):
        if not response.get("namespace"):
            return
        match response["type"]:
            case "choice_integer":
                response["value"] = int(response["value"])
            case "choice_str_group":
                response["type"] = "choice_str"
        self.sio.emit("wizard", response, namespace="/dashboard")

    @QtSlot(bool)
    def starter_changed(self, pushed: bool):
        self.sio.emit("starter_changed", pushed, namespace="/dashboard")

    def register_events(self):
        """
        Define socket.io message handlers.
        """

        @self.sio.on("connect", namespace="/dashboard")
        def dashboard_connect():
            """
            Callback on server connection.
            """
            polling2.poll(lambda: self.sio.connected is True, step=0.2, poll_forever=True)
            logger.info("Dashboard connected to cogip-server")
            self.sio.emit("connected", namespace="/dashboard")

        @self.sio.on("connect", namespace="/monitor")
        def monitor_connect():
            """
            Callback on server connection.
            """
            polling2.poll(lambda: self.sio.connected is True, step=0.2, poll_forever=True)
            logger.info("Monitor connected to cogip-server")
            self.sio.emit("connected", namespace="/monitor")
            self.signal_connected.emit(True)

        @self.sio.event(namespace="/monitor")
        def connect_error(data):
            """
            Callback on server connection error.
            """
            if (
                data
                and isinstance(data, dict)
                and (message := data.get("message"))
                and message == "A monitor is already connected"
            ):
                logger.error(f"Error: {message}.")
                self.retry_connection = False
                self.signal_exit.emit()
                return
            logger.error(f"Monitor connection error: {data}")
            self.signal_connected.emit(False)

        @self.sio.event(namespace="/dashboard")
        def dashboard_disconnect():
            """
            Callback on server disconnection.
            """
            logger.info("Dashboard disconnected from cogip-server")

        @self.sio.event(namespace="/monitor")
        def monitor_disconnect():
            """
            Callback on server disconnection.
            """
            self.signal_connected.emit(False)
            logger.info("Monitor disconnected from cogip-server")

        @self.sio.on("add_robot", namespace="/monitor")
        def on_add_robot(robot_id: int, virtual_planner: bool, virtual_detector: bool) -> None:
            """
            Add a new robot.
            """
            self.signal_add_robot.emit(int(robot_id), virtual_planner, virtual_detector)

        @self.sio.on("del_robot", namespace="/monitor")
        def on_del_robot(robot_id: int) -> None:
            """
            Remove a robot.
            """
            self.signal_del_robot.emit(robot_id)

        @self.sio.on("path", namespace="/dashboard")
        def on_path(robot_id: int, data: list[dict[str, float]]) -> None:
            """
            Callback on robot path message.
            """
            try:
                path = TypeAdapter(list[models.Pose]).validate_python(data)
                self.signal_robot_path.emit(path)
            except ValidationError as exc:
                logger.warning("Failed to decode robot path: %s", exc)

        @self.sio.on("tool_menu", namespace="/dashboard")
        def on_tool_menu(data):
            """
            Callback on tool menu message.
            """
            try:
                menu = models.ShellMenu.model_validate(data)
            except ValidationError as exc:
                logger.warning("Failed to decode tool menu: %s", exc)
                return

            logger.info("Tool menu '%s' received with %d entries", menu.name, len(menu.entries))
            self.signal_tool_menu.emit(menu)

        @self.sio.on("config", namespace="/dashboard")
        def on_config(config):
            """
            Callback on config request.
            """
            logger.debug("Config received: %s", config)
            self.signal_config_request.emit(config)

        @self.sio.on("wizard", namespace="/dashboard")
        def on_wizard_request(data: dict[str, Any]) -> None:
            """
            Wizard request.
            """
            if (
                (choices := data.get("choices"))
                and isinstance(choices, list)
                and choices
                and isinstance(choices[0], list)
            ):
                data["type"] = "choice_str_group"
            self.signal_wizard_request.emit(data)

        @self.sio.on("close_wizard", namespace="/dashboard")
        def on_close_wizard() -> None:
            """
            Close wizard.
            """
            self.signal_close_wizard.emit()

        @self.sio.on("score", namespace="/dashboard")
        def on_score(data: int) -> None:
            """
            Score.
            """
            self.signal_wizard_request.emit(
                {
                    "name": "Score",
                    "type": "message",
                    "value": str(data),
                    "namespace": "",
                }
            )

        @self.sio.on("starter_changed", namespace="/dashboard")
        def on_starter_changed(pushed: bool) -> None:
            """
            Change the state of a starter.
            """
            self.signal_starter_changed.emit(pushed)

__init__(url) #

Class constructor.

Parameters:

Name Type Description Default
url str

URL to socket.io server

required
Source code in cogip/tools/monitor/sio_client.py
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(self, url: str):
    """
    Class constructor.

    Arguments:
        url: URL to socket.io server
    """
    super().__init__()

    self.url = url
    # Defined up front so the retry flag exists even if start() has not
    # been called yet (stop() and the connection loop both rely on it).
    self.retry_connection: bool = False
    self.sio = socketio.Client()
    self.register_events()

register_events() #

Define socket.io message handlers.

Source code in cogip/tools/monitor/sio_client.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def register_events(self):
    """
    Define socket.io message handlers.
    """

    @self.sio.on("connect", namespace="/dashboard")
    def dashboard_connect():
        """
        Callback on server connection.
        """
        polling2.poll(lambda: self.sio.connected is True, step=0.2, poll_forever=True)
        logger.info("Dashboard connected to cogip-server")
        self.sio.emit("connected", namespace="/dashboard")

    @self.sio.on("connect", namespace="/monitor")
    def monitor_connect():
        """
        Callback on server connection.
        """
        polling2.poll(lambda: self.sio.connected is True, step=0.2, poll_forever=True)
        logger.info("Monitor connected to cogip-server")
        self.sio.emit("connected", namespace="/monitor")
        self.signal_connected.emit(True)

    @self.sio.event(namespace="/monitor")
    def connect_error(data):
        """
        Callback on server connection error.
        """
        already_connected = "A monitor is already connected"
        if isinstance(data, dict) and data.get("message") == already_connected:
            # Another monitor owns the connection: stop retrying and exit.
            logger.error("Error: %s.", already_connected)
            self.retry_connection = False
            self.signal_exit.emit()
            return
        logger.error("Monitor connection error: %s", data)
        self.signal_connected.emit(False)

    # NOTE: the disconnect handlers must be registered with on("disconnect").
    # The event() decorator uses the function name as event name, which
    # would register them for "dashboard_disconnect"/"monitor_disconnect".
    @self.sio.on("disconnect", namespace="/dashboard")
    def dashboard_disconnect(reason=None):
        """
        Callback on server disconnection.

        `reason` is optional so the handler works whether or not the
        socket.io client passes a disconnection reason.
        """
        logger.info("Dashboard disconnected from cogip-server")

    @self.sio.on("disconnect", namespace="/monitor")
    def monitor_disconnect(reason=None):
        """
        Callback on server disconnection.

        `reason` is optional so the handler works whether or not the
        socket.io client passes a disconnection reason.
        """
        self.signal_connected.emit(False)
        logger.info("Monitor disconnected from cogip-server")

    @self.sio.on("add_robot", namespace="/monitor")
    def on_add_robot(robot_id: int, virtual_planner: bool, virtual_detector: bool) -> None:
        """
        Add a new robot.
        """
        self.signal_add_robot.emit(int(robot_id), virtual_planner, virtual_detector)

    @self.sio.on("del_robot", namespace="/monitor")
    def on_del_robot(robot_id: int) -> None:
        """
        Remove a robot.
        """
        # Same int() coercion as on_add_robot, for consistency.
        self.signal_del_robot.emit(int(robot_id))

    @self.sio.on("path", namespace="/dashboard")
    def on_path(robot_id: int, data: list[dict[str, float]]) -> None:
        """
        Callback on robot path message.
        """
        try:
            path = TypeAdapter(list[models.Pose]).validate_python(data)
        except ValidationError as exc:
            logger.warning("Failed to decode robot path: %s", exc)
            return
        self.signal_robot_path.emit(path)

    @self.sio.on("tool_menu", namespace="/dashboard")
    def on_tool_menu(data):
        """
        Callback on tool menu message.
        """
        try:
            menu = models.ShellMenu.model_validate(data)
        except ValidationError as exc:
            logger.warning("Failed to decode tool menu: %s", exc)
            return

        logger.info("Tool menu '%s' received with %d entries", menu.name, len(menu.entries))
        self.signal_tool_menu.emit(menu)

    @self.sio.on("config", namespace="/dashboard")
    def on_config(config):
        """
        Callback on config request.
        """
        logger.debug("Config received: %s", config)
        self.signal_config_request.emit(config)

    @self.sio.on("wizard", namespace="/dashboard")
    def on_wizard_request(data: dict[str, Any]) -> None:
        """
        Wizard request.
        """
        choices = data.get("choices")
        # A non-empty list whose first item is itself a list is tagged
        # with the dedicated "choice_str_group" type.
        if isinstance(choices, list) and choices and isinstance(choices[0], list):
            data["type"] = "choice_str_group"
        self.signal_wizard_request.emit(data)

    @self.sio.on("close_wizard", namespace="/dashboard")
    def on_close_wizard() -> None:
        """
        Close wizard.
        """
        self.signal_close_wizard.emit()

    @self.sio.on("score", namespace="/dashboard")
    def on_score(data: int) -> None:
        """
        Score.
        """
        # The score is forwarded as a wizard "message" request.
        self.signal_wizard_request.emit(
            {
                "name": "Score",
                "type": "message",
                "value": str(data),
                "namespace": "",
            }
        )

    @self.sio.on("starter_changed", namespace="/dashboard")
    def on_starter_changed(pushed: bool) -> None:
        """
        Change the state of a starter.
        """
        self.signal_starter_changed.emit(pushed)

send_config_update(namespace, sio_event, name, value, is_integer=False) #

Forward a configuration update to the server.

Parameters:

Name Type Description Default
namespace str

Socket.IO namespace to target

required
sio_event str

Event name to emit (defaults to 'config_updated' when empty)

required
name str

Property name

required
value object

New property value

required
is_integer bool

Whether the value should be converted to an integer (integers and floats are sent as floats from QML)

False
Source code in cogip/tools/monitor/sio_client.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@QtSlot(str, str, str, "QVariant", bool)
def send_config_update(self, namespace: str, sio_event: str, name: str, value: object, is_integer: bool = False):
    """
    Send a configuration change to the server.

    The change is emitted as a "config_updated" event on the "/dashboard"
    namespace, with the target namespace and event name carried in the payload.

    Arguments:
        namespace: Socket.IO namespace to target (placed in the payload)
        sio_event: Event name placed in the payload
            ('config_updated' is used when empty)
        name: Property name
        value: New property value
        is_integer: Whether the value should be converted to an integer
            (integers and floats are sent as floats from QML)
    """
    if is_integer:
        value = int(value)
    self.sio.emit(
        "config_updated",
        {
            "namespace": namespace,
            "sio_event": sio_event if sio_event else "config_updated",
            "name": name,
            "value": value,
        },
        namespace="/dashboard",
    )

start() #

Connect to socket.io server.

Source code in cogip/tools/monitor/sio_client.py
69
70
71
72
73
74
75
76
def start(self):
    """
    Start connecting to the socket.io server from a background thread.
    """
    # Only the first connection needs this background retry loop;
    # later disconnections/re-connections are handled directly by the client.
    self.retry_connection = True
    connector = Thread(target=self.try_connect)
    connector.start()

stop() #

Disconnect from socket.io server.

Source code in cogip/tools/monitor/sio_client.py
88
89
90
91
92
93
94
def stop(self):
    """
    Stop retrying and close the socket.io connection if it is open.
    """
    # Clear the retry flag first so the connection loop stops iterating.
    self.retry_connection = False
    client = self.sio
    if not client.connected:
        return
    client.disconnect()

tool_command(command) #

Send a command to the robot.

Parameters:

Name Type Description Default
command str

Command to send

required
Source code in cogip/tools/monitor/sio_client.py
 96
 97
 98
 99
100
101
102
103
104
@QtSlot(str)
def tool_command(self, command: str):
    """
    Emit a "tool_cmd" event carrying the command on the "/dashboard" namespace.

    Arguments:
        command: Command to send
    """
    client = self.sio
    client.emit("tool_cmd", command, namespace="/dashboard")