Skip to content

robots

RobotManager #

Bases: QObject

Source code in cogip/tools/monitor/robots.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class RobotManager(QtCore.QObject):
    def __init__(self, win: MainWindow):
        """
        Class constructor.

        Parameters:
            game_view: parent of the robots
        """
        super().__init__()
        self._win = win
        self._game_view = win.game_view
        self._rect_obstacles_pool: list[DynRectObstacleEntity] = []
        self._round_obstacles_pool: list[DynCircleObstacleEntity] = []
        self._update_obstacles = QtCore.QTimer()
        self._update_obstacles.timeout.connect(self.update_obstacles)
        self._update_obstacles_interval = 100
        self.virtual_planner = False
        self.virtual_detector = False

    def add_robot(self, robot_id: int, virtual_planner: bool, virtual_detector: bool) -> None:
        """
        Add the robot.

        Parameters:
            robot_id: ID of the robot
            virtual_planner: whether the planner is virtual or not,
                if planner is virtual, use shared memory to get the robot current, pose order and obstacles.
            virtual_detector: whether the detector is virtual or not,
                if detector is virtual, detect virtual obstacles and write them in shared memory.
        """
        self.virtual_planner = virtual_planner
        self.virtual_detector = virtual_detector
        self.robot = RobotEntity(
            robot_id,
            self._win,
            self._game_view.scene_entity,
            virtual_planner=virtual_planner,
            virtual_detector=virtual_detector,
        )
        self._game_view.add_asset(self.robot)

        if virtual_planner:
            self._update_obstacles.start(self._update_obstacles_interval)

    def del_robot(self, robot_id: int = 0) -> None:
        """
        Remove a robot.

        Parameters:
            robot_id: ID of the robot to remove
        """
        if self.virtual_planner:
            self._update_obstacles.stop()
        self.robot.setParent(None)
        self.robot.delete_shared_memory()
        self.robot = None

    def new_robot_pose_order(self, new_pose: Pose) -> None:
        """
        Set the robot's new pose order.

        Arguments:
            new_pose: new robot pose
        """
        if self.robot:
            self.robot.new_robot_pose_order(new_pose)

    def update_obstacles(self) -> None:
        """
        Qt Slot

        Display the dynamic obstacles detected by the robot.

        Reuse already created dynamic obstacles to optimize performance
        and memory consumption.
        """
        # Store new and already existing dyn obstacles
        current_rect_obstacles = []
        current_round_obstacles = []

        if self.robot:
            if self.robot.shared_circle_obstacles is not None:
                self.robot.shared_obstacles_lock.start_reading()
                for circle_obstacle in self.robot.shared_circle_obstacles:
                    if len(self._round_obstacles_pool):
                        obstacle = self._round_obstacles_pool.pop(0)
                        obstacle.setEnabled(True)
                    else:
                        obstacle = DynCircleObstacleEntity(self._game_view.scene_entity)

                    obstacle.set_position(
                        x=circle_obstacle.center.x,
                        y=circle_obstacle.center.y,
                        radius=circle_obstacle.radius,
                    )
                    obstacle.set_bounding_box(circle_obstacle.bounding_box)

                    current_round_obstacles.append(obstacle)
                self.robot.shared_obstacles_lock.finish_reading()

            if self.robot.shared_rectangle_obstacles is not None:
                for rectangle_obstacle in self.robot.shared_rectangle_obstacles:
                    if len(self._rect_obstacles_pool):
                        obstacle = self._rect_obstacles_pool.pop(0)
                        obstacle.setEnabled(True)
                    else:
                        obstacle = DynRectObstacleEntity(self._game_view.scene_entity)

                    obstacle.set_position(
                        x=rectangle_obstacle.center.x,
                        y=rectangle_obstacle.center.y,
                        rotation=rectangle_obstacle.center.angle,
                    )
                    obstacle.set_size(length=rectangle_obstacle.length_y, width=rectangle_obstacle.length_x)
                    obstacle.set_bounding_box(rectangle_obstacle.bounding_box)

                    current_rect_obstacles.append(obstacle)

        # Disable remaining dyn obstacles
        while len(self._rect_obstacles_pool):
            dyn_obstacle = self._rect_obstacles_pool.pop(0)
            dyn_obstacle.setEnabled(False)
            current_rect_obstacles.append(dyn_obstacle)

        while len(self._round_obstacles_pool):
            dyn_obstacle = self._round_obstacles_pool.pop(0)
            dyn_obstacle.setEnabled(False)
            current_round_obstacles.append(dyn_obstacle)

        self._rect_obstacles_pool = current_rect_obstacles
        self._round_obstacles_pool = current_round_obstacles

    def update_shared_obstacles(self, obstacles: list[Vertex]):
        if self.robot:
            self.robot.shared_monitor_obstacles_lock.start_writing()
            self.robot.shared_monitor_obstacles.clear()
            for obstacle in obstacles:
                self.robot.shared_monitor_obstacles.append(obstacle.x, obstacle.y)
            self.robot.shared_monitor_obstacles_lock.finish_writing()

__init__(win) #

Class constructor.

Parameters:

Name Type Description Default
game_view

parent of the robots

required
Source code in cogip/tools/monitor/robots.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(self, win: MainWindow):
    """
    Class constructor.

    Parameters:
        game_view: parent of the robots
    """
    super().__init__()
    self._win = win
    self._game_view = win.game_view
    self._rect_obstacles_pool: list[DynRectObstacleEntity] = []
    self._round_obstacles_pool: list[DynCircleObstacleEntity] = []
    self._update_obstacles = QtCore.QTimer()
    self._update_obstacles.timeout.connect(self.update_obstacles)
    self._update_obstacles_interval = 100
    self.virtual_planner = False
    self.virtual_detector = False

add_robot(robot_id, virtual_planner, virtual_detector) #

Add the robot.

Parameters:

Name Type Description Default
robot_id int

ID of the robot

required
virtual_planner bool

whether the planner is virtual or not, if planner is virtual, use shared memory to get the robot current, pose order and obstacles.

required
virtual_detector bool

whether the detector is virtual or not, if detector is virtual, detect virtual obstacles and write them in shared memory.

required
Source code in cogip/tools/monitor/robots.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def add_robot(self, robot_id: int, virtual_planner: bool, virtual_detector: bool) -> None:
    """
    Add the robot.

    Parameters:
        robot_id: ID of the robot
        virtual_planner: whether the planner is virtual or not,
            if planner is virtual, use shared memory to get the robot current, pose order and obstacles.
        virtual_detector: whether the detector is virtual or not,
            if detector is virtual, detect virtual obstacles and write them in shared memory.
    """
    self.virtual_planner = virtual_planner
    self.virtual_detector = virtual_detector
    self.robot = RobotEntity(
        robot_id,
        self._win,
        self._game_view.scene_entity,
        virtual_planner=virtual_planner,
        virtual_detector=virtual_detector,
    )
    self._game_view.add_asset(self.robot)

    if virtual_planner:
        self._update_obstacles.start(self._update_obstacles_interval)

del_robot(robot_id=0) #

Remove a robot.

Parameters:

Name Type Description Default
robot_id int

ID of the robot to remove

0
Source code in cogip/tools/monitor/robots.py
53
54
55
56
57
58
59
60
61
62
63
64
def del_robot(self, robot_id: int = 0) -> None:
    """
    Remove a robot.

    Parameters:
        robot_id: ID of the robot to remove
    """
    if self.virtual_planner:
        self._update_obstacles.stop()
    self.robot.setParent(None)
    self.robot.delete_shared_memory()
    self.robot = None

new_robot_pose_order(new_pose) #

Set the robot's new pose order.

Parameters:

Name Type Description Default
new_pose Pose

new robot pose

required
Source code in cogip/tools/monitor/robots.py
66
67
68
69
70
71
72
73
74
def new_robot_pose_order(self, new_pose: Pose) -> None:
    """
    Set the robot's new pose order.

    Arguments:
        new_pose: new robot pose
    """
    if self.robot:
        self.robot.new_robot_pose_order(new_pose)

update_obstacles() #

Qt Slot

Display the dynamic obstacles detected by the robot.

Reuse already created dynamic obstacles to optimize performance and memory consumption.

Source code in cogip/tools/monitor/robots.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def update_obstacles(self) -> None:
    """
    Qt Slot

    Display the dynamic obstacles detected by the robot.

    Reuse already created dynamic obstacles to optimize performance
    and memory consumption.
    """
    # Store new and already existing dyn obstacles
    current_rect_obstacles = []
    current_round_obstacles = []

    if self.robot:
        if self.robot.shared_circle_obstacles is not None:
            self.robot.shared_obstacles_lock.start_reading()
            for circle_obstacle in self.robot.shared_circle_obstacles:
                if len(self._round_obstacles_pool):
                    obstacle = self._round_obstacles_pool.pop(0)
                    obstacle.setEnabled(True)
                else:
                    obstacle = DynCircleObstacleEntity(self._game_view.scene_entity)

                obstacle.set_position(
                    x=circle_obstacle.center.x,
                    y=circle_obstacle.center.y,
                    radius=circle_obstacle.radius,
                )
                obstacle.set_bounding_box(circle_obstacle.bounding_box)

                current_round_obstacles.append(obstacle)
            self.robot.shared_obstacles_lock.finish_reading()

        if self.robot.shared_rectangle_obstacles is not None:
            for rectangle_obstacle in self.robot.shared_rectangle_obstacles:
                if len(self._rect_obstacles_pool):
                    obstacle = self._rect_obstacles_pool.pop(0)
                    obstacle.setEnabled(True)
                else:
                    obstacle = DynRectObstacleEntity(self._game_view.scene_entity)

                obstacle.set_position(
                    x=rectangle_obstacle.center.x,
                    y=rectangle_obstacle.center.y,
                    rotation=rectangle_obstacle.center.angle,
                )
                obstacle.set_size(length=rectangle_obstacle.length_y, width=rectangle_obstacle.length_x)
                obstacle.set_bounding_box(rectangle_obstacle.bounding_box)

                current_rect_obstacles.append(obstacle)

    # Disable remaining dyn obstacles
    while len(self._rect_obstacles_pool):
        dyn_obstacle = self._rect_obstacles_pool.pop(0)
        dyn_obstacle.setEnabled(False)
        current_rect_obstacles.append(dyn_obstacle)

    while len(self._round_obstacles_pool):
        dyn_obstacle = self._round_obstacles_pool.pop(0)
        dyn_obstacle.setEnabled(False)
        current_round_obstacles.append(dyn_obstacle)

    self._rect_obstacles_pool = current_rect_obstacles
    self._round_obstacles_pool = current_round_obstacles