Skip to content

protocol

Packet #

Source code in cogip/scservo_async_sdk/protocol.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Packet:
    def __init__(self, id: int, instruction: int, params: list[int] | None = None):
        self.id = id
        self.instruction = instruction
        self.params = params or []

    def to_bytes(self) -> bytes:
        length = len(self.params) + 2  # instruction + checksum
        cks = checksum(self.id, length, self.instruction, self.params)

        data = [0xFF, 0xFF, self.id, length, self.instruction]
        data.extend(self.params)
        data.append(cks)
        return bytes(data)

    @staticmethod
    def parse(data: bytes) -> tuple["Packet", int]:
        """
        Parses a packet from bytes.
        Returns (Packet, consumed_bytes) if successful, or raises ValueError.
        """
        if len(data) < 6:
            raise ValueError("Data too short")

        # Check header
        if data[0] != 0xFF or data[1] != 0xFF:
            raise ValueError("Invalid Header")

        id = data[2]
        length = data[3]

        if len(data) < 4 + length:
            raise ValueError("Incomplete packet")

        instruction = data[4]
        params_len = length - 2
        params = list(data[5 : 5 + params_len])
        cks = data[4 + length - 1]

        calc_cks = checksum(id, length, instruction, params)
        if cks != calc_cks:
            raise ValueError("Invalid Checksum")

        # consumed bytes = 4 + length
        return Packet(id, instruction, params), 4 + length

parse(data) staticmethod #

Parses a packet from bytes. Returns (Packet, consumed_bytes) if successful, or raises ValueError.

Source code in cogip/scservo_async_sdk/protocol.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@staticmethod
def parse(data: bytes) -> tuple["Packet", int]:
    """Parse one packet from the start of ``data``.

    Expected layout: ``0xFF 0xFF | id | length | instruction | params... |
    checksum``, where ``length`` counts the parameters plus the instruction
    and checksum bytes.

    Args:
        data: Buffer expected to begin with the two-byte 0xFF header.
            Bytes after the first complete packet are ignored.

    Returns:
        ``(packet, consumed)`` where ``consumed`` is the number of bytes
        the packet occupies in ``data`` (``4 + length``).

    Raises:
        ValueError: If the buffer is shorter than the smallest possible
            packet, the header is wrong, the length byte is below 2, the
            buffer ends before the announced length, or the checksum does
            not match.
    """
    # Smallest frame: header (2) + id + length + instruction + checksum.
    if len(data) < 6:
        raise ValueError("Data too short")

    # Check header
    if data[0] != 0xFF or data[1] != 0xFF:
        raise ValueError("Invalid Header")

    packet_id = data[2]
    length = data[3]

    # The length always covers the instruction and checksum bytes. A smaller
    # value would make the parameter count negative and place the "checksum"
    # on the length/instruction byte itself.
    if length < 2:
        raise ValueError("Invalid length")

    total = 4 + length
    if len(data) < total:
        raise ValueError("Incomplete packet")

    instruction = data[4]
    # Parameters sit between the instruction and the trailing checksum.
    params = list(data[5 : total - 1])
    cks = data[total - 1]

    if cks != checksum(packet_id, length, instruction, params):
        raise ValueError("Invalid Checksum")

    return Packet(packet_id, instruction, params), total