Bases: AsyncNamespace
Handle all SocketIO events related to firmware calibration clients.
Only one client connection is allowed at a time.
Source code in cogip/tools/server/namespaces/firmware_calibration.py
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class FirmwareCalibrationNamespace(socketio.AsyncNamespace):
    """
    Handle all SocketIO events related to firmware calibration clients.
    Only one client connection is allowed at a time.
    """

    def __init__(self, cogip_server: "server.Server"):
        """
        Register this namespace on the "/calibration" path.

        Args:
            cogip_server: Server object this namespace is attached to.
        """
        super().__init__("/calibration")
        self.cogip_server = cogip_server
        # Context object; this class reads/writes its `calibration_sid`
        # attribute and reads its `robot_id` attribute.
        self.context = Context()

    async def on_connect(self, sid, environ):
        """
        Refuse the connection if a calibration client is already registered.

        Args:
            sid: Session id of the connecting client.
            environ: Connection environment (unused here).

        Raises:
            ConnectionRefusedError: If `context.calibration_sid` is already set.
        """
        if self.context.calibration_sid:
            message = "A calibration client is already connected"
            logger.error(f"Calibration connection refused: {message}")
            raise ConnectionRefusedError(message)

    async def on_connected(self, sid):
        """
        Callback on "connected" event: record `sid` as the calibration client.
        """
        logger.info("Calibration client connected.")
        self.context.calibration_sid = sid

    async def on_disconnect(self, sid):
        """
        Release the calibration slot when the registered client disconnects.

        The slot is only cleared when it is held by the disconnecting `sid`:
        the sid is recorded in `on_connected`, not in `on_connect`, so a client
        that disconnects before emitting "connected" must not free a slot that
        was registered by another client.
        """
        if self.context.calibration_sid == sid:
            self.context.calibration_sid = None
        logger.info("Calibration client disconnected.")

    async def on_pose_start(self, sid, pose: dict[str, Any]):
        """
        Callback on pose start (reset robot position).
        Forward to copilot.
        """
        logger.info(f"[calibration => copilot] Pose start: {pose}")
        await self.emit("pose_start", pose, namespace="/copilot")

    async def on_pose_order(self, sid, pose: dict[str, Any]):
        """
        Callback on pose order.
        Forward the pose to copilot and dashboards.
        """
        logger.info(f"[calibration => copilot] Pose order: {pose}")
        await self.emit("pose_order", pose, namespace="/copilot")
        # Dashboards receive the robot id alongside the pose.
        await self.emit("pose_order", (self.context.robot_id, pose), namespace="/dashboard")
|
Callback on pose order.
Forward the pose to copilot and dashboards.
Source code in cogip/tools/server/namespaces/firmware_calibration.py
async def on_pose_order(self, sid, pose: dict[str, Any]):
    """
    Handle a "pose_order" event coming from the calibration client.

    The pose is logged, emitted unchanged on the "/copilot" namespace, then
    emitted on the "/dashboard" namespace paired with the robot id.

    Args:
        sid: Session id of the emitting client (unused here).
        pose: Pose payload, forwarded as received.
    """
    logger.info(f"[calibration => copilot] Pose order: {pose}")
    await self.emit("pose_order", pose, namespace="/copilot")

    # Built after the first emit, so robot_id is read at the same point as before.
    dashboard_payload = (self.context.robot_id, pose)
    await self.emit("pose_order", dashboard_payload, namespace="/dashboard")
|
Callback on pose start (reset robot position).
Forward to copilot.
Source code in cogip/tools/server/namespaces/firmware_calibration.py
async def on_pose_start(self, sid, pose: dict[str, Any]):
    """
    Handle a "pose_start" event (reset robot position) from the calibration client.

    The pose is logged and emitted unchanged on the "/copilot" namespace.

    Args:
        sid: Session id of the emitting client (unused here).
        pose: Pose payload, forwarded as received.
    """
    logger.info(f"[calibration => copilot] Pose start: {pose}")
    await self.emit(
        "pose_start",
        pose,
        namespace="/copilot",
    )
|