Skip to content

server

Server #

Source code in cogip/tools/server_beacon/server.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class Server:
    original_uvicorn_exit_handler = UvicornServer.handle_exit
    exiting: bool = False
    robots: dict[int, Robot] = {}
    robot_tasks: set[asyncio.Task] = set()

    def __init__(self):
        """
        Class constructor.

        Create SocketIO server and robot servers connections
        """
        UvicornServer.handle_exit = Server.handle_exit

        self.sio = socketio.AsyncServer(
            always_connect=False,
            async_mode="asgi",
            cors_allowed_origins="*",
            logger=False,
            engineio_logger=False,
        )
        self.app = socketio.ASGIApp(self.sio)
        self.sio.register_namespace(namespaces.DashboardNamespace(self))

        self.camp = Camp.Colors.yellow
        self.table = TableEnum.Game

        for i in range(1, int(os.environ["SERVER_BEACON_MAX_ROBOTS"]) + 1):
            robot = Robot(self, i)
            task = asyncio.create_task(robot.run())
            Server.robots[i] = robot
            Server.robot_tasks.add(task)
            task.add_done_callback(Server.robot_tasks.discard)

        @self.sio.event
        def connect(sid, environ, auth):
            logger.warning(f"A client tried to connect to namespace / (sid={sid})")
            raise ConnectionRefusedError("Connection refused to namespace /")

        @self.sio.on("*")
        def catch_all(event, sid, data):
            logger.warning(f"A client tried to send data to namespace / (sid={sid}, event={event})")

    @staticmethod
    def handle_exit(*args, **kwargs):
        """Overload function for Uvicorn handle_exit"""
        Server.exiting = True
        for _, robot in Server.robots.items():
            robot.exiting = True
            robot.sio.reconnection_attempts = -1
        for task in Server.robot_tasks:
            task.cancel()

        Server.original_uvicorn_exit_handler(*args, **kwargs)

    async def choose_camp(self):
        """
        Choose camp command from the menu.
        Send camp wizard message.
        """
        await self.sio.emit(
            "wizard",
            {
                "name": "Choose Camp",
                "type": "camp",
                "value": self.camp.name,
            },
            namespace="/dashboard",
        )

    async def choose_table(self):
        """
        Choose table command from the menu.
        Send table wizard message.
        """
        await self.sio.emit(
            "wizard",
            {
                "name": "Choose Table",
                "type": "choice_str",
                "choices": [e.name for e in TableEnum],
                "value": self.table.name,
            },
            namespace="/dashboard",
        )

    async def reset_robots(self):
        for robot_id, robot in self.robots.items():
            if robot.sio.connected:
                position: StartPosition | None = None
                match robot_id:
                    case 1:
                        position = StartPosition.Bottom
                    case 2:
                        position = StartPosition.PAMI2
                    case 3:
                        position = StartPosition.PAMI3
                    case 4:
                        position = StartPosition.PAMI4
                if position:
                    await robot.sio.emit(
                        "wizard",
                        {
                            "name": "Choose Start Position",
                            "value": position.name,
                        },
                        namespace="/beacon",
                    )

__init__() #

Class constructor.

Create SocketIO server and robot servers connections

Source code in cogip/tools/server_beacon/server.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def __init__(self):
    """
    Class constructor.

    Create SocketIO server and robot servers connections
    """
    UvicornServer.handle_exit = Server.handle_exit

    self.sio = socketio.AsyncServer(
        always_connect=False,
        async_mode="asgi",
        cors_allowed_origins="*",
        logger=False,
        engineio_logger=False,
    )
    self.app = socketio.ASGIApp(self.sio)
    self.sio.register_namespace(namespaces.DashboardNamespace(self))

    self.camp = Camp.Colors.yellow
    self.table = TableEnum.Game

    for i in range(1, int(os.environ["SERVER_BEACON_MAX_ROBOTS"]) + 1):
        robot = Robot(self, i)
        task = asyncio.create_task(robot.run())
        Server.robots[i] = robot
        Server.robot_tasks.add(task)
        task.add_done_callback(Server.robot_tasks.discard)

    @self.sio.event
    def connect(sid, environ, auth):
        logger.warning(f"A client tried to connect to namespace / (sid={sid})")
        raise ConnectionRefusedError("Connection refused to namespace /")

    @self.sio.on("*")
    def catch_all(event, sid, data):
        logger.warning(f"A client tried to send data to namespace / (sid={sid}, event={event})")

choose_camp() async #

Choose camp command from the menu. Send camp wizard message.

Source code in cogip/tools/server_beacon/server.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
async def choose_camp(self):
    """
    Choose camp command from the menu.
    Send camp wizard message.
    """
    await self.sio.emit(
        "wizard",
        {
            "name": "Choose Camp",
            "type": "camp",
            "value": self.camp.name,
        },
        namespace="/dashboard",
    )

choose_table() async #

Choose table command from the menu. Send table wizard message.

Source code in cogip/tools/server_beacon/server.py
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
async def choose_table(self):
    """
    Choose table command from the menu.
    Send table wizard message.
    """
    await self.sio.emit(
        "wizard",
        {
            "name": "Choose Table",
            "type": "choice_str",
            "choices": [e.name for e in TableEnum],
            "value": self.table.name,
        },
        namespace="/dashboard",
    )

handle_exit(*args, **kwargs) staticmethod #

Overload function for Uvicorn handle_exit

Source code in cogip/tools/server_beacon/server.py
58
59
60
61
62
63
64
65
66
67
68
@staticmethod
def handle_exit(*args, **kwargs):
    """Overload function for Uvicorn handle_exit"""
    Server.exiting = True
    for _, robot in Server.robots.items():
        robot.exiting = True
        robot.sio.reconnection_attempts = -1
    for task in Server.robot_tasks:
        task.cancel()

    Server.original_uvicorn_exit_handler(*args, **kwargs)