Skip to content

detector

Detector #

Main detector class.

Read data from the ToF in monitoring mode or fake data provided by Monitor in emulation Mode.

Build obstacles and send the list to the server.

Source code in cogip/tools/detector_pami/detector.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class Detector:
    """
    Main detector class.

    Read data from the ToF in monitoring mode
    or fake data provided by `Monitor` in emulation Mode.

    Build obstacles and send the list to the server.
    """

    def __init__(
        self,
        server_url: str,
        tof_bus: int | None,
        tof_address: int | None,
        min_distance: int,
        max_distance: int,
        refresh_interval: float,
    ):
        """
        Class constructor.

        Arguments:
            server_url: server URL
            tof_bus: ToF i2c bus
            tof_address: ToF i2c address
            min_distance: Minimum distance to detect an obstacle
            max_distance: Maximum distance to detect an obstacle
            refresh_interval: Interval between each update of the obstacle list (in seconds)
        """
        self.server_url = server_url
        self.tof_bus = tof_bus
        self.tof_address = tof_address
        self.properties = Properties(
            min_distance=min_distance,
            max_distance=max_distance,
            refresh_interval=refresh_interval,
        )
        self.sensor: VL53L1X | None = None
        self.sensors_data: list[int] = list()
        self.sensors_data_lock = threading.Lock()
        self._robot_pose = models.Pose()
        self.robot_pose_lock = threading.Lock()

        self.obstacles_updater_loop = ThreadLoop(
            "Obstacles updater loop",
            refresh_interval,
            self.process_sensor_data,
            logger=True,
        )

        self.sensor_reader_loop = ThreadLoop(
            "Sensor reader loop",
            refresh_interval,
            self.read_sensors,
            logger=True,
        )

        self.init_sensor()

        self.sio = socketio.Client(logger=False)
        self.sio.register_namespace(SioEvents(self))

    def connect(self):
        """
        Connect to SocketIO server.
        """
        threading.Thread(target=self.try_connect).start()

    def init_sensor(self):
        if self.tof_bus and self.tof_address:
            self.sensor = VL53L1X(i2c_bus=self.tof_bus, i2c_address=self.tof_address)
            self.sensor.open(reset=True)

    def start(self):
        """
        Start updating obstacles list.
        """
        self.obstacles_updater_loop.start()
        if self.sensor:
            self.sensor_reader_loop.start()
            self.start_sensors()

    def stop(self) -> None:
        """
        Stop updating obstacles list.
        """
        self.obstacles_updater_loop.stop()
        if self.sensor:
            self.sensor_reader_loop.stop()
            self.stop_sensors()

    def try_connect(self):
        """
        Poll to wait for the first cogip-server connection.
        Disconnections/reconnections are handle directly by the client.
        """
        while True:
            try:
                self.sio.connect(
                    self.server_url,
                    namespaces=["/detector"],
                )
                self.sio.wait()
            except socketio.exceptions.ConnectionError:
                time.sleep(2)
                continue
            break

    @property
    def robot_pose(self) -> models.Pose:
        """
        Last position of the robot.
        """
        return self._robot_pose

    @robot_pose.setter
    def robot_pose(self, new_pose: models.Pose) -> None:
        with self.robot_pose_lock:
            self._robot_pose = new_pose

    def update_refresh_interval(self) -> None:
        self.obstacles_updater_loop.interval = self.properties.refresh_interval
        self.sensor_reader_loop.interval = self.properties.refresh_interval

    def update_sensors_data(self, sensors_data: list[int]):
        """
        Receive sensors data.
        """
        with self.sensors_data_lock:
            self.sensors_data[:] = sensors_data[:]

    def filter_distances(self) -> list[int]:
        """
        Nothing to do with ToF sensors of PAMI.
        Just keep the function for API compatibility.
        """
        return self.sensors_data

    def generate_obstacles(self, robot_pose: models.Pose, distances: list[int]) -> list[models.Vertex]:
        """
        Update obstacles list from sensors data.
        """
        if len(distances) == 0:
            return []

        distance = distances[0]
        if distance < self.properties.min_distance or distance >= self.properties.max_distance:
            return []

        # Compute obstacle position
        angle = math.radians(robot_pose.O)
        x = robot_pose.x + distance * math.cos(angle)
        y = robot_pose.y + distance * math.sin(angle)

        return [models.Vertex(x=x, y=y)]

    def process_sensor_data(self):
        """
        Function executed in a thread loop to update and send dynamic obstacles.
        """
        with self.sensors_data_lock:
            filtered_distances = self.filter_distances()
        with self.robot_pose_lock:
            robot_pose = self.robot_pose.model_copy()

        obstacles = self.generate_obstacles(robot_pose, filtered_distances)
        logger.debug(f"Generated obstacles: {obstacles}")
        if self.sio.connected:
            self.sio.emit("obstacles", [o.model_dump(exclude_defaults=True) for o in obstacles], namespace="/detector")

    def start_sensors(self):
        """
        Start sensors.
        """
        if self.sensor:
            self.sensor.start_ranging(mode=VL53L1xDistanceMode.LONG)
            self.sensor.set_timing(150, 200)

    def read_sensors(self):
        """
        Function executed in a thread loop to read sensors data.
        """
        if not self.sensor:
            return

        self.update_sensors_data([self.sensor.get_distance()])

    def stop_sensors(self):
        """
        Stop sensors.
        """
        if self.sensor:
            self.sensor.stop_ranging()
            self.sensor.close()

robot_pose: models.Pose property writable #

Last position of the robot.

__init__(server_url, tof_bus, tof_address, min_distance, max_distance, refresh_interval) #

Class constructor.

Parameters:

Name Type Description Default
server_url str

server URL

required
tof_bus int | None

ToF i2c bus

required
tof_address int | None

ToF i2c address

required
min_distance int

Minimum distance to detect an obstacle

required
max_distance int

Maximum distance to detect an obstacle

required
refresh_interval float

Interval between each update of the obstacle list (in seconds)

required
Source code in cogip/tools/detector_pami/detector.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def __init__(
    self,
    server_url: str,
    tof_bus: int | None,
    tof_address: int | None,
    min_distance: int,
    max_distance: int,
    refresh_interval: float,
):
    """
    Class constructor.

    Arguments:
        server_url: server URL
        tof_bus: ToF i2c bus
        tof_address: ToF i2c address
        min_distance: Minimum distance to detect an obstacle
        max_distance: Maximum distance to detect an obstacle
        refresh_interval: Interval between each update of the obstacle list (in seconds)
    """
    self.server_url = server_url
    self.tof_bus = tof_bus
    self.tof_address = tof_address
    self.properties = Properties(
        min_distance=min_distance,
        max_distance=max_distance,
        refresh_interval=refresh_interval,
    )
    self.sensor: VL53L1X | None = None
    self.sensors_data: list[int] = list()
    self.sensors_data_lock = threading.Lock()
    self._robot_pose = models.Pose()
    self.robot_pose_lock = threading.Lock()

    self.obstacles_updater_loop = ThreadLoop(
        "Obstacles updater loop",
        refresh_interval,
        self.process_sensor_data,
        logger=True,
    )

    self.sensor_reader_loop = ThreadLoop(
        "Sensor reader loop",
        refresh_interval,
        self.read_sensors,
        logger=True,
    )

    self.init_sensor()

    self.sio = socketio.Client(logger=False)
    self.sio.register_namespace(SioEvents(self))

connect() #

Connect to SocketIO server.

Source code in cogip/tools/detector_pami/detector.py
78
79
80
81
82
def connect(self):
    """
    Connect to SocketIO server.
    """
    threading.Thread(target=self.try_connect).start()

filter_distances() #

Nothing to do with ToF sensors of PAMI. Just keep the function for API compatibility.

Source code in cogip/tools/detector_pami/detector.py
147
148
149
150
151
152
def filter_distances(self) -> list[int]:
    """
    Nothing to do with ToF sensors of PAMI.
    Just keep the function for API compatibility.
    """
    return self.sensors_data

generate_obstacles(robot_pose, distances) #

Update obstacles list from sensors data.

Source code in cogip/tools/detector_pami/detector.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def generate_obstacles(self, robot_pose: models.Pose, distances: list[int]) -> list[models.Vertex]:
    """
    Update obstacles list from sensors data.
    """
    if len(distances) == 0:
        return []

    distance = distances[0]
    if distance < self.properties.min_distance or distance >= self.properties.max_distance:
        return []

    # Compute obstacle position
    angle = math.radians(robot_pose.O)
    x = robot_pose.x + distance * math.cos(angle)
    y = robot_pose.y + distance * math.sin(angle)

    return [models.Vertex(x=x, y=y)]

process_sensor_data() #

Function executed in a thread loop to update and send dynamic obstacles.

Source code in cogip/tools/detector_pami/detector.py
172
173
174
175
176
177
178
179
180
181
182
183
184
def process_sensor_data(self):
    """
    Function executed in a thread loop to update and send dynamic obstacles.
    """
    with self.sensors_data_lock:
        filtered_distances = self.filter_distances()
    with self.robot_pose_lock:
        robot_pose = self.robot_pose.model_copy()

    obstacles = self.generate_obstacles(robot_pose, filtered_distances)
    logger.debug(f"Generated obstacles: {obstacles}")
    if self.sio.connected:
        self.sio.emit("obstacles", [o.model_dump(exclude_defaults=True) for o in obstacles], namespace="/detector")

read_sensors() #

Function executed in a thread loop to read sensors data.

Source code in cogip/tools/detector_pami/detector.py
194
195
196
197
198
199
200
201
def read_sensors(self):
    """
    Function executed in a thread loop to read sensors data.
    """
    if not self.sensor:
        return

    self.update_sensors_data([self.sensor.get_distance()])

start() #

Start updating obstacles list.

Source code in cogip/tools/detector_pami/detector.py
89
90
91
92
93
94
95
96
def start(self):
    """
    Start updating obstacles list.
    """
    self.obstacles_updater_loop.start()
    if self.sensor:
        self.sensor_reader_loop.start()
        self.start_sensors()

start_sensors() #

Start sensors.

Source code in cogip/tools/detector_pami/detector.py
186
187
188
189
190
191
192
def start_sensors(self):
    """
    Start sensors.
    """
    if self.sensor:
        self.sensor.start_ranging(mode=VL53L1xDistanceMode.LONG)
        self.sensor.set_timing(150, 200)

stop() #

Stop updating obstacles list.

Source code in cogip/tools/detector_pami/detector.py
 98
 99
100
101
102
103
104
105
def stop(self) -> None:
    """
    Stop updating obstacles list.
    """
    self.obstacles_updater_loop.stop()
    if self.sensor:
        self.sensor_reader_loop.stop()
        self.stop_sensors()

stop_sensors() #

Stop sensors.

Source code in cogip/tools/detector_pami/detector.py
203
204
205
206
207
208
209
def stop_sensors(self):
    """
    Stop sensors.
    """
    if self.sensor:
        self.sensor.stop_ranging()
        self.sensor.close()

try_connect() #

Poll to wait for the first cogip-server connection. Disconnections/reconnections are handle directly by the client.

Source code in cogip/tools/detector_pami/detector.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def try_connect(self):
    """
    Poll to wait for the first cogip-server connection.
    Disconnections/reconnections are handle directly by the client.
    """
    while True:
        try:
            self.sio.connect(
                self.server_url,
                namespaces=["/detector"],
            )
            self.sio.wait()
        except socketio.exceptions.ConnectionError:
            time.sleep(2)
            continue
        break

update_sensors_data(sensors_data) #

Receive sensors data.

Source code in cogip/tools/detector_pami/detector.py
140
141
142
143
144
145
def update_sensors_data(self, sensors_data: list[int]):
    """
    Receive sensors data.
    """
    with self.sensors_data_lock:
        self.sensors_data[:] = sensors_data[:]