Skip to content

robot

Robot #

Source code in cogip/tools/monitor/robots/robot.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class Robot:
    update_pose_current_interval: int = 100

    def __init__(
        self,
        root: QObject,
        view: QObject | None,
        robot_id: int,
        virtual_planner: bool,
        virtual_detector: bool,
    ):
        self.root = root
        self.scene = self.root.parent() if self.root else None
        self.view = view if view is not None else (self.scene.parent() if self.scene else None)
        self.robot_id = robot_id
        self.virtual_planner = virtual_planner
        self.virtual_detector = virtual_detector
        self.shm = SharedMemoryManager()
        self.lidar_ray_nodes: dict[int, QObject] = {}
        self.lidar_distances_changed: bool = False

        self.node = self.root.findChild(QObject, "Scene")
        self.models = [m for m in self.node.children() if m.metaObject().className() == "QQuick3DModel"]
        for model in self.models:
            model.setObjectName(f"robot_{model.objectName()}")

        self.update_pose_current_timer = QTimer(self.root)
        self.update_pose_current_timer.setInterval(Robot.update_pose_current_interval)
        self.update_pose_current_timer.timeout.connect(self.update_pose_current_from_shm)

        if self.virtual_planner:
            self.update_pose_current_timer.start()

        if self.virtual_detector:
            for i in range(360):
                self.shm.shared_lidar_data[i][0] = i
                self.shm.shared_lidar_data[i][2] = 255
            if self.robot_id > 1:
                for i in range(90, 270):
                    self.shm.shared_lidar_data[i][1] = 65535
            self.shm.shared_lidar_data[360][0] = -1

            # Find Lidar ray nodes
            if self.view is None:
                logger.warning("Unable to locate View3D instance for lidar ray binding")
            else:
                lidar_ray_nodes_prop: QJSValue = self.view.property("lidarRayNodes")
                it = QJSValueIterator(lidar_ray_nodes_prop)
                while it.hasNext():
                    it.next()
                    try:
                        index = int(it.name())
                    except ValueError:
                        continue
                    q_object = it.value().toQObject()
                    if not q_object:
                        continue
                    self.lidar_ray_nodes[index] = q_object

                    try:
                        signal_distance_changed: QtSignalInstance = q_object.distanceChanged
                        if isinstance(signal_distance_changed, QtSignalInstance):
                            signal_distance_changed.connect(partial(self.handle_distance_changed, index))
                    except AttributeError:
                        continue

                try:
                    signal_post_lidar_update: QtSignalInstance = self.view.postLidarUpdate
                    if isinstance(signal_post_lidar_update, QtSignalInstance):
                        signal_post_lidar_update.connect(self.post_lidar_update)
                except AttributeError:
                    logger.error("No postLidarUpdate signal found on view")

    def update_pose_current_from_shm(self) -> None:
        """
        Update pose current from shared memory.
        """
        if self.shm.shared_pose_current_buffer is None:
            return

        pose_current = self.shm.shared_pose_current_buffer.last
        if pose_current is None or self.root is None:
            return

        self.root.setProperty("x", pose_current.x)
        self.root.setProperty("y", pose_current.y)
        self.root.setProperty("eulerRotation", QVector3D(0, 0, pose_current.angle))

    def update_pose_current_from_model(self, pose_current: models.Pose) -> None:
        """
        Update pose current from model.
        """
        self.root.setProperty("x", pose_current.x)
        self.root.setProperty("y", pose_current.y)
        self.root.setProperty("eulerRotation", QVector3D(0, 0, pose_current.O))

    def handle_distance_changed(self, index: int) -> None:
        q_object = self.lidar_ray_nodes.get(index)
        if not q_object:
            return
        distance = q_object.property("distance")
        if abs(distance - self.shm.shared_lidar_data[index][1]) < 1.0:
            return
        self.lidar_distances_changed = True
        self.shm.shared_lidar_data[index][1] = distance

    def post_lidar_update(self) -> None:
        """
        Post lidar update to shared memory if distances have changed.
        """
        if self.lidar_distances_changed:
            self.lidar_distances_changed = False
            self.shm.shared_lidar_data_lock.post_update()

post_lidar_update() #

Post lidar update to shared memory if distances have changed.

Source code in cogip/tools/monitor/robots/robot.py
119
120
121
122
123
124
125
def post_lidar_update(self) -> None:
    """
    Post lidar update to shared memory if distances have changed.
    """
    if self.lidar_distances_changed:
        self.lidar_distances_changed = False
        self.shm.shared_lidar_data_lock.post_update()

update_pose_current_from_model(pose_current) #

Update pose current from model.

Source code in cogip/tools/monitor/robots/robot.py
101
102
103
104
105
106
107
def update_pose_current_from_model(self, pose_current: models.Pose) -> None:
    """
    Update pose current from model.
    """
    self.root.setProperty("x", pose_current.x)
    self.root.setProperty("y", pose_current.y)
    self.root.setProperty("eulerRotation", QVector3D(0, 0, pose_current.O))

update_pose_current_from_shm() #

Update pose current from shared memory.

Source code in cogip/tools/monitor/robots/robot.py
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def update_pose_current_from_shm(self) -> None:
    """
    Update pose current from shared memory.
    """
    if self.shm.shared_pose_current_buffer is None:
        return

    pose_current = self.shm.shared_pose_current_buffer.last
    if pose_current is None or self.root is None:
        return

    self.root.setProperty("x", pose_current.x)
    self.root.setProperty("y", pose_current.y)
    self.root.setProperty("eulerRotation", QVector3D(0, 0, pose_current.angle))