Skip to content

pbcom

PBCom #

Source code in cogip/tools/copilot_pami/pbcom.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class PBCom:
    """
    Asynchronous Protobuf communication with the robot over a serial port.

    Three workers, started together by `run()`, cooperate through two queues:
    `serial_receiver` reads frames from the serial port, `serial_decoder`
    dispatches them to the registered handlers, and `serial_sender` writes
    the messages queued with `send_serial_message`.
    """

    # Annotations are strings so that `| None` is never evaluated at runtime.
    _loop: "asyncio.AbstractEventLoop | None" = None  # Event loop to use for all async objects
    _serial_port: "AioSerial | None" = None  # Async serial port
    _serial_messages_received: "asyncio.Queue | None" = None  # Queue for messages received from serial port
    _serial_messages_to_send: "asyncio.Queue | None" = None  # Queue for messages waiting to be sent on serial port

    def __init__(self, serial_port: Path, serial_baud: int, message_handlers: dict[int, Callable]):
        """
        Configure and open the serial port, and create the message queues.

        Arguments:
            serial_port: Serial port used to exchange messages
            serial_baud: Baud rate of the serial port
            message_handlers: Coroutine functions indexed by message uuid.
                A handler is awaited without argument when the message has
                no payload, otherwise with the decoded payload bytes.
        """
        self._serial_port = AioSerial()
        self._serial_port.port = str(serial_port)
        self._serial_port.baudrate = serial_baud
        self._message_handlers = message_handlers

        # Create asyncio queues
        self._serial_messages_received = asyncio.Queue()
        self._serial_messages_to_send = asyncio.Queue()

        self._serial_port.open()

    async def run(self):
        """
        Start PBCom.

        Records the running event loop, then runs the decoder, receiver and
        sender workers until they end. A cancellation is absorbed so that
        `run()` returns normally when it is cancelled.
        """
        self._loop = asyncio.get_running_loop()

        try:
            await asyncio.gather(
                self.serial_decoder(),
                self.serial_receiver(),
                self.serial_sender(),
            )
        except asyncio.CancelledError:
            pass

    async def serial_decoder(self):
        """
        Async worker dispatching messages received from the robot.

        Each queued `(uuid, payload)` pair is passed to the handler registered
        for `uuid`. Messages without a registered handler are reported and
        dropped.
        """
        uuid: int
        encoded_message: bytes

        while True:
            uuid, encoded_message = await self._serial_messages_received.get()
            request_handler = self._message_handlers.get(uuid)
            if not request_handler:
                print(f"No handler found for message uuid '{uuid}'")
            elif not encoded_message:
                # No payload: the handler takes no argument
                await request_handler()
            else:
                await request_handler(encoded_message)

            self._serial_messages_received.task_done()

    async def send_serial_message(self, *args) -> None:
        """
        Queue a message to be written by `serial_sender`.

        Arguments:
            args: The message uuid followed by the Protobuf message
                (or `None` when the message has no payload), since
                `serial_sender` unpacks each queued item as
                `(uuid, pb_message)`.
        """
        await self._serial_messages_to_send.put(args)

    async def serial_receiver(self):
        """
        Async worker reading messages from the robot on the serial port.

        Each message is terminated by `\\n`. The first 4 bytes are the message
        uuid (little-endian); the following bytes, if any, are the
        base64-encoded Protobuf message.

        Lines too short to hold a uuid and payloads that are not valid base64
        are reported and dropped.
        """
        while True:
            # Read next message
            message = await self._serial_port.readline_async()
            message = message.rstrip(b"\n")

            # A frame shorter than the 4-byte uuid header cannot be valid
            if len(message) < 4:
                print(f"Dropping message too short to contain a uuid: {message!r}")
                continue

            # Get message uuid on first bytes
            uuid = int.from_bytes(message[:4], "little")

            if len(message) == 4:
                await self._serial_messages_received.put((uuid, None))
                continue

            # Base64 decoding
            try:
                pb_message = base64.decodebytes(message[4:])
            except binascii.Error:
                print("Failed to decode base64 message.")
                continue

            # Send Protobuf message for decoding
            await self._serial_messages_received.put((uuid, pb_message))

    async def serial_sender(self):
        """
        Async worker encoding and sending Protobuf messages to the robot on the serial port.

        Each message is written as the 4-byte little-endian uuid, followed by
        the base64-encoded serialized Protobuf message (if any), and is
        terminated by `\\0`.
        """
        while True:
            uuid, pb_message = await self._serial_messages_to_send.get()
            logger.info(f"Send PB message: {uuid} {pb_message}")
            await self._serial_port.write_async(uuid.to_bytes(4, "little"))
            if pb_message:
                # Serialization and encoding are run in the default executor
                response_serialized = await self._loop.run_in_executor(None, pb_message.SerializeToString)
                response_base64 = await self._loop.run_in_executor(None, base64.encodebytes, response_serialized)
                await self._serial_port.write_async(response_base64)
            await self._serial_port.write_async(b"\0")
            self._serial_messages_to_send.task_done()

run() async #

Start PBCom.

Source code in cogip/tools/copilot_pami/pbcom.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
async def run(self):
    """
    Start PBCom.

    The running event loop is stored on the instance, then the decoder,
    receiver and sender workers are awaited together. A cancellation is
    absorbed, so the coroutine returns normally in that case.
    """
    self._loop = asyncio.get_running_loop()

    workers = (
        self.serial_decoder(),
        self.serial_receiver(),
        self.serial_sender(),
    )
    try:
        await asyncio.gather(*workers)
    except asyncio.CancelledError:
        return

serial_decoder() async #

Async worker decoding messages received from the robot.

Source code in cogip/tools/copilot_pami/pbcom.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def serial_decoder(self):
    """
    Async worker dispatching messages received from the robot.

    Takes `(uuid, payload)` pairs from the received-messages queue and awaits
    the handler registered for `uuid`: without argument when the payload is
    empty, with the payload otherwise. Messages without a registered handler
    are reported and dropped.
    """
    while True:
        uuid, encoded_message = await self._serial_messages_received.get()
        handler = self._message_handlers.get(uuid)

        if handler:
            # An empty payload means the handler is called without argument
            handler_args = (encoded_message,) if encoded_message else ()
            await handler(*handler_args)
        else:
            print(f"No handler found for message uuid '{uuid}'")

        self._serial_messages_received.task_done()

serial_receiver() async #

Async worker reading messages from the robot on serial ports.

Each message is terminated by \n. The first four bytes are the message uuid (little-endian); the following bytes, if any, are the base64-encoded Protobuf message.

Source code in cogip/tools/copilot_pami/pbcom.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
async def serial_receiver(self):
    """
    Async worker reading messages from the robot on the serial port.

    Each message is terminated by `\\n`. The first 4 bytes are the message
    uuid (little-endian); the following bytes, if any, are the
    base64-encoded Protobuf message.

    Lines too short to hold a uuid and payloads that are not valid base64
    are reported and dropped.
    """
    while True:
        # Read next message
        message = await self._serial_port.readline_async()
        message = message.rstrip(b"\n")

        # A frame shorter than the 4-byte uuid header cannot be valid
        if len(message) < 4:
            print(f"Dropping message too short to contain a uuid: {message!r}")
            continue

        # Get message uuid on first bytes
        uuid = int.from_bytes(message[:4], "little")

        if len(message) == 4:
            await self._serial_messages_received.put((uuid, None))
            continue

        # Base64 decoding
        try:
            pb_message = base64.decodebytes(message[4:])
        except binascii.Error:
            print("Failed to decode base64 message.")
            continue

        # Send Protobuf message for decoding
        await self._serial_messages_received.put((uuid, pb_message))

serial_sender() async #

Async worker encoding and sending Protobuf messages to the robot on serial ports.

See serial_receiver for message encoding. Outgoing messages are terminated by \0 rather than \n.

Source code in cogip/tools/copilot_pami/pbcom.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def serial_sender(self):
    """
    Async worker encoding and sending Protobuf messages to the robot on the serial port.

    For each queued `(uuid, pb_message)` pair, writes the uuid as 4
    little-endian bytes, then the base64-encoded serialized message when
    there is one, and finally a `\\0` terminator.
    """
    while True:
        uuid, pb_message = await self._serial_messages_to_send.get()
        logger.info(f"Send PB message: {uuid} {pb_message}")

        header = uuid.to_bytes(4, "little")
        await self._serial_port.write_async(header)

        if pb_message:
            # Serialization and encoding are run in the default executor
            serialized = await self._loop.run_in_executor(None, pb_message.SerializeToString)
            payload = await self._loop.run_in_executor(None, base64.encodebytes, serialized)
            await self._serial_port.write_async(payload)

        await self._serial_port.write_async(b"\0")
        self._serial_messages_to_send.task_done()