Skip to content

pbcom

PBCom #

Source code in cogip/tools/copilot/pbcom.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class PBCom:
    """
    Exchange Protobuf messages with the robot over a SocketCAN bus.

    Three asyncio workers cooperate through two queues:

    - `can_receiver` reads CAN frames and queues `(uuid, payload)` tuples,
    - `payload_decoder` dispatches queued tuples to the registered handlers,
    - `can_sender` serializes and sends the messages queued by
      `send_can_message`.
    """

    can_is_fd: bool = True  # CAN frames are FD frames

    def __init__(
        self,
        can_channel: str,
        can_bitrate: int,
        canfd_data_bitrate: int,
        message_handlers: dict[int, Callable],
    ):
        """
        Open the CAN bus and create the message queues.

        Args:
            can_channel: SocketCAN channel passed to `can.Bus`.
            can_bitrate: Bitrate passed to `can.Bus`.
            canfd_data_bitrate: Data bitrate passed to `can.Bus`.
            message_handlers: Async callables indexed by message UUID.
                The dict is stored as is (not copied), so
                `register_message_handler` also updates the caller's dict.
        """
        self.can_bus = can.Bus(
            interface="socketcan",
            channel=can_channel,
            bitrate=can_bitrate,
            data_bitrate=canfd_data_bitrate,
            fd=self.can_is_fd,
        )
        self.message_handlers = message_handlers

        # Create asyncio queues
        self.messages_received = asyncio.Queue()  # Queue for messages received
        self.messages_to_send = asyncio.Queue()  # Queue for messages waiting to be sent

    def register_message_handler(self, uuid: int, handler: Callable) -> None:
        """
        Register a message handler at runtime.

        Args:
            uuid: The message UUID to handle.
            handler: The async callable to handle the message.

        Raises:
            ValueError: If a handler for this UUID is already registered.
        """
        if uuid in self.message_handlers:
            raise ValueError(f"Handler for UUID 0x{uuid:04x} is already registered")
        self.message_handlers[uuid] = handler

    async def run(self):
        """
        Start PBCom.

        Attach an asyncio reader to the CAN bus and run the three workers
        until one of them fails or this coroutine is cancelled.
        Cancellation ends the coroutine quietly; an exception raised by a
        worker is propagated. In both cases the remaining workers are
        cancelled, the notifier is stopped and the bus is shut down.
        """
        self.loop = asyncio.get_running_loop()
        self.can_reader = can.AsyncBufferedReader()
        self.notifier = can.Notifier(bus=self.can_bus, listeners=[self.can_reader], timeout=None, loop=self.loop)

        workers = [
            asyncio.ensure_future(worker)
            for worker in (
                self.payload_decoder(),
                self.can_receiver(),
                self.can_sender(),
            )
        ]
        try:
            await asyncio.gather(*workers)
        except asyncio.CancelledError:
            # Cancellation is the regular way to stop PBCom: do not propagate it.
            pass
        finally:
            # gather() does not cancel the other workers when one of them fails,
            # so stop them explicitly before releasing the bus they are using.
            for worker in workers:
                worker.cancel()
            try:
                await asyncio.gather(*workers, return_exceptions=True)
            finally:
                # Stop feeding the reader before closing the bus underneath it.
                self.notifier.stop()
                self.can_bus.shutdown()

    async def payload_decoder(self):
        """
        Async worker decoding messages received from the robot.

        Each queued `(uuid, payload)` tuple is dispatched to the handler
        registered for `uuid`. The handler is awaited without argument when
        the payload is empty or `None`, with the payload otherwise.
        An exception raised by a handler is propagated to the caller.
        """
        uuid: int
        encoded_payload: bytes | None

        while True:
            uuid, encoded_payload = await self.messages_received.get()
            try:
                request_handler = self.message_handlers.get(uuid)
                if not request_handler:
                    logger.warning(f"No handler found for message uuid '0x{uuid:04x}'")
                elif not encoded_payload:
                    await request_handler()
                else:
                    await request_handler(encoded_payload)
            finally:
                # Always balance get() so that messages_received.join() cannot
                # hang after a handler failure or a cancellation.
                self.messages_received.task_done()

    async def send_can_message(self, *args) -> None:
        """
        Queue a message to be sent by the `can_sender` worker.

        Args:
            *args: Stored as one tuple in the queue. `can_sender` unpacks it
                as `(uuid, pb_message)`, so exactly two values are expected;
                a falsy `pb_message` sends a frame without data.
        """
        await self.messages_to_send.put(args)

    async def can_receiver(self):
        """
        Async worker reading messages from the robot on CAN bus.

        The message UUID is the CAN arbitration ID of the frame.
        The frame data, if any, is the base64-encoded Protobuf message;
        a frame with a DLC of 0 is queued with a `None` payload.
        Frames whose data cannot be base64-decoded are logged and dropped.
        """
        while True:
            # Read next message
            can_message = await self.can_reader.get_message()

            # The message uuid is the arbitration ID of the frame
            uuid = can_message.arbitration_id

            if can_message.dlc == 0:
                await self.messages_received.put((uuid, None))
                continue

            # Base64 decoding
            try:
                pb_message = base64.decodebytes(can_message.data)
            except binascii.Error:
                logger.error("Failed to decode base64 message.")
                continue

            # Send Protobuf message for decoding
            await self.messages_received.put((uuid, pb_message))

    async def can_sender(self):
        """
        Async worker encoding and sending Protobuf messages to the robot on CAN bus.

        See `can_receiver` for message encoding.
        Errors raised while sending a frame are logged and the worker goes on
        with the next message.
        """
        while True:
            queued = await self.messages_to_send.get()
            try:
                uuid, pb_message = queued
                logger.info(f"Send 0x{uuid:04x}:\n{pb_message}")
                if pb_message:
                    response_serialized = await self.loop.run_in_executor(None, pb_message.SerializeToString)
                    response_base64 = await self.loop.run_in_executor(None, base64.encodebytes, response_serialized)
                else:
                    response_base64 = None
                try:
                    self.can_bus.send(can.Message(arbitration_id=uuid, data=response_base64, is_fd=self.can_is_fd))
                except Exception as e:
                    logger.error(e)
            finally:
                # Always balance get(), even if unpacking or serialization fails.
                self.messages_to_send.task_done()

can_receiver() async #

Async worker reading messages from the robot on CAN bus.

Messages are base64-encoded. The message UUID is the CAN arbitration ID of the frame; the frame data, once decoded, is the Protobuf encoded message (if any).

Source code in cogip/tools/copilot/pbcom.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
async def can_receiver(self):
    """
    Async worker forwarding CAN frames from the robot to the decoder queue.

    For each frame, the arbitration ID is used as message UUID and a
    `(uuid, payload)` tuple is put in `messages_received`. The payload is
    `None` for a frame with a DLC of 0, the base64-decoded frame data
    otherwise. Frames that fail base64 decoding are logged and dropped.
    """
    while True:
        # Wait for the next frame delivered by the reader
        frame = await self.can_reader.get_message()
        frame_uuid = frame.arbitration_id

        if frame.dlc != 0:
            try:
                payload = base64.decodebytes(frame.data)
            except binascii.Error:
                logger.error("Failed to decode base64 message.")
                continue
        else:
            # Frame without data: only the uuid is meaningful
            payload = None

        # Hand the message over to the decoding worker
        await self.messages_received.put((frame_uuid, payload))

can_sender() async #

Async worker encoding and sending Protobuf messages to the robot on CAN bus.

See can_receiver for message encoding.

Source code in cogip/tools/copilot/pbcom.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
async def can_sender(self):
    """
    Async worker encoding and sending Protobuf messages to the robot on CAN bus.

    Each queued item is unpacked as `(uuid, pb_message)`. A truthy
    `pb_message` is serialized then base64-encoded to build the frame data;
    otherwise the frame is sent without data. The uuid is used as the
    arbitration ID of the frame.
    Errors raised while sending a frame are logged and the worker goes on
    with the next message.
    """
    while True:
        queued = await self.messages_to_send.get()
        try:
            uuid, pb_message = queued
            logger.info(f"Send 0x{uuid:04x}:\n{pb_message}")
            if pb_message:
                response_serialized = await self.loop.run_in_executor(None, pb_message.SerializeToString)
                response_base64 = await self.loop.run_in_executor(None, base64.encodebytes, response_serialized)
            else:
                response_base64 = None
            try:
                self.can_bus.send(can.Message(arbitration_id=uuid, data=response_base64, is_fd=self.can_is_fd))
            except Exception as e:
                logger.error(e)
        finally:
            # Always balance get(), even if unpacking or serialization fails,
            # so that messages_to_send.join() cannot hang.
            self.messages_to_send.task_done()

payload_decoder() async #

Async worker decoding messages received from the robot.

Source code in cogip/tools/copilot/pbcom.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def payload_decoder(self):
    """
    Async worker decoding messages received from the robot.

    Each queued `(uuid, payload)` tuple is dispatched to the handler
    registered for `uuid` in `message_handlers`. The handler is awaited
    without argument when the payload is empty or `None`, with the payload
    otherwise. A message without registered handler is logged and skipped.
    An exception raised by a handler is propagated to the caller.
    """
    uuid: int
    encoded_payload: bytes | None

    while True:
        uuid, encoded_payload = await self.messages_received.get()
        try:
            request_handler = self.message_handlers.get(uuid)
            if not request_handler:
                logger.warning(f"No handler found for message uuid '0x{uuid:04x}'")
            elif not encoded_payload:
                await request_handler()
            else:
                await request_handler(encoded_payload)
        finally:
            # Always balance get() so that messages_received.join() cannot
            # hang after a handler failure or a cancellation.
            self.messages_received.task_done()

register_message_handler(uuid, handler) #

Register a message handler at runtime.

Parameters:

Name Type Description Default
uuid int

The message UUID to handle.

required
handler Callable

The async callable to handle the message.

required

Raises:

Type Description
ValueError

If a handler for this UUID is already registered.

Source code in cogip/tools/copilot/pbcom.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def register_message_handler(self, uuid: int, handler: Callable) -> None:
    """
    Add a handler for a message UUID after construction.

    Args:
        uuid: UUID of the message the handler is bound to.
        handler: Async callable awaited when such a message is received.

    Raises:
        ValueError: If this UUID already has a handler; the existing
            handler is left untouched.
    """
    handlers = self.message_handlers
    if uuid not in handlers:
        handlers[uuid] = handler
        return
    raise ValueError(f"Handler for UUID 0x{uuid:04x} is already registered")

run() async #

Start PBCom.

Source code in cogip/tools/copilot/pbcom.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
async def run(self):
    """
    Start PBCom.

    Attach an asyncio reader to the CAN bus and run the `payload_decoder`,
    `can_receiver` and `can_sender` workers until one of them fails or this
    coroutine is cancelled.
    Cancellation ends the coroutine quietly; an exception raised by a worker
    is propagated. In both cases the remaining workers are cancelled, the
    notifier is stopped and the bus is shut down.
    """
    self.loop = asyncio.get_running_loop()
    self.can_reader = can.AsyncBufferedReader()
    self.notifier = can.Notifier(bus=self.can_bus, listeners=[self.can_reader], timeout=None, loop=self.loop)

    workers = [
        asyncio.ensure_future(worker)
        for worker in (
            self.payload_decoder(),
            self.can_receiver(),
            self.can_sender(),
        )
    ]
    try:
        await asyncio.gather(*workers)
    except asyncio.CancelledError:
        # Cancellation is the regular way to stop PBCom: do not propagate it.
        pass
    finally:
        # gather() does not cancel the other workers when one of them fails,
        # so stop them explicitly before releasing the bus they are using.
        for worker in workers:
            worker.cancel()
        try:
            await asyncio.gather(*workers, return_exceptions=True)
        finally:
            # Stop feeding the reader before closing the bus underneath it.
            self.notifier.stop()
            self.can_bus.shutdown()