16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class Strategy(list[Action]):
    """
    List of candidate actions, plus the logic used to pick the next one.

    `get_next_action()` supports two selection modes:

    - standard: pick the non-recycled action with the highest positive weight;
    - GOAP: recursively simulate sequences of actions on mock planners
      (see `evaluate()`) and pick the first action of the best sequence.
    """

    # Strategies produced by the last GOAP evaluation.
    # This list is class-level on purpose: every copy created while recursing
    # in `evaluate()` appends itself to this single shared list, and
    # `start_evaluation()` clears it before each run.
    evaluated_strategies: list["Strategy"] = []

    def __init__(self, planner: "Planner"):
        super().__init__()
        self.planner = planner
        self.evaluated_actions: list[Action] = []  # Actions done in the evaluation
        self.goap_allowed: bool = False  # Allow GOAP evaluation
        self.can_wait: bool = False  # Use a WaitAction if no other action is possible

    @staticmethod
    def _ranked_strategies() -> list["Strategy"]:
        """
        Return the strategies of the last GOAP evaluation, best first.

        Strategies are ordered by decreasing score of their planner's game
        context, then by decreasing weight of their first evaluated action.
        Strategies without any evaluated action are skipped: they have no
        first action to rank on, and nothing to return to the caller.
        """
        return sorted(
            (s for s in Strategy.evaluated_strategies if s.evaluated_actions),
            key=lambda s: (-s.planner.game_context.score, -s.evaluated_actions[0].weight()),
        )

    async def get_next_action(self) -> Action | None:
        """
        Select the next action, remove it from the strategy and return it.

        GOAP evaluation is used when `goap_allowed` is set and the planner's
        `goap_depth` is positive; otherwise the remaining action with the
        highest weight is selected.

        Returns:
            The selected action, or a new `WaitAction` (never stored in the
            strategy) if nothing could be selected and `can_wait` is set,
            or `None` otherwise.
        """
        next_action: Action | None = None

        if self.goap_allowed and self.planner.shared_properties.goap_depth > 0:
            logger.info(f"Start GOAP evaluation with depth {self.planner.shared_properties.goap_depth}")
            with Timer(verbose=False) as timer:
                await self.start_evaluation()
            logger.info(f" => Evaluation time: {timer.results[-1].elapsed():0.3f}s")
            logger.info(f" => Evaluated strategies: {len(Strategy.evaluated_strategies)}")

            # `evaluated_actions` holds the original actions (not their
            # simulated deep copies), so the selected one can be removed
            # from this strategy below.
            ranked_strategies = self._ranked_strategies()
            if ranked_strategies:
                next_action = ranked_strategies[0].evaluated_actions[0]
            logger.info(f" => Next action: {next_action.name if next_action else 'None'}")
            self.print_evaluations()
        else:
            logger.info("Start standard evaluation")
            with Timer(verbose=False) as timer:
                # Ascending stable sort: the best action is the last one.
                sorted_actions = sorted(
                    [action for action in self if not action.recycled and action.weight() > 0],
                    key=lambda action: action.weight(),
                )
            logger.info(f" => Evaluation time: {timer.results[-1].elapsed():0.3f}s")
            if sorted_actions:
                next_action = sorted_actions[-1]
            logger.info(f" => Next action: {next_action.name if next_action else 'None'}")

        if next_action is None and self.can_wait:
            return WaitAction(self.planner, self)

        if next_action is not None:
            self.remove(next_action)

        return next_action

    def copy(self) -> "Strategy":
        """
        Return a shallow copy of this strategy.

        The copy shares the planner and the action objects with the original,
        gets its own `evaluated_actions` list, keeps `goap_allowed`, and has
        `can_wait` reset to False.
        """
        new_strategy = Strategy(self.planner)
        new_strategy.goap_allowed = self.goap_allowed
        new_strategy.can_wait = False
        new_strategy.extend(self)
        new_strategy.evaluated_actions = self.evaluated_actions.copy()
        return new_strategy

    async def evaluate(self):
        """
        Recursively simulate each remaining action of this strategy.

        For every action, a copy of the strategy without that action is bound
        to a new `MockPlanner`, a deep copy of the action is evaluated against
        it, and the recursion continues from the copied strategy. Each
        simulated strategy is recorded in `Strategy.evaluated_strategies`.

        Recursion stops once `goap_depth` actions have been evaluated or when
        the game context countdown is not positive anymore.
        """
        if len(self.evaluated_actions) >= self.planner.shared_properties.goap_depth:
            return
        if self.planner.game_context.countdown <= 0:
            return

        # The index is needed to pop the matching action from each copy.
        for index in range(len(self)):
            mock_planner = mock.MockPlanner(self.planner)
            strategy_copy = self.copy()
            strategy_copy.planner = mock_planner
            action = strategy_copy.pop(index)

            # Detach the action from its planner and strategy while it is
            # deep-copied (presumably so that deepcopy does not follow these
            # references), and always restore them, even if the copy fails.
            action_planner_backup = action.planner
            action_strategy_backup = action.strategy
            action.planner = None
            action.strategy = None
            try:
                action_copy = copy.deepcopy(action)
            finally:
                action.planner = action_planner_backup
                action.strategy = action_strategy_backup

            # The simulated action works on the mock planner and strategy copy.
            action_copy.planner = mock_planner
            action_copy.strategy = strategy_copy

            if action_copy.weight() == 0:
                continue

            # Skip if action was already recycled and no action was done yet
            if action_copy.recycled and len(strategy_copy.evaluated_actions) == 0:
                continue

            Strategy.evaluated_strategies.append(strategy_copy)
            await action_copy.evaluate()
            # Record the original action, not the copy: it is the object that
            # `get_next_action()` returns and removes from the real strategy.
            strategy_copy.evaluated_actions.append(action)
            await strategy_copy.evaluate()

    async def start_evaluation(self):
        """
        Run a GOAP evaluation starting from this strategy.

        The motor command, the action logger and `asyncio.sleep` are patched
        with mock objects for the duration of the evaluation, and the shared
        list of evaluated strategies is cleared before it starts.
        """
        with (
            patch("cogip.tools.planner.actuators.positional_motor_command", mock.async_no_op),
            patch("cogip.tools.planner.actions.action.Action.logger", mock.MockDummyClass()),
            patch("asyncio.sleep", mock.MockAsyncioSleep()),
        ):
            Strategy.evaluated_strategies.clear()
            await self.evaluate()

    def print_evaluations(self, max: int = 10):
        """
        Print the best strategies of the last GOAP evaluation.

        Arguments:
            max: Maximum number of strategies to print. The name shadows the
                builtin but is kept for callers passing it by keyword.
        """
        ranked_strategies = self._ranked_strategies()

        print("Evaluated strategies: ", len(Strategy.evaluated_strategies))
        print("Best strategies:")
        for rank, strategy in enumerate(ranked_strategies[:max], start=1):
            print(
                f"{rank:>2}. "
                f"Score: {strategy.planner.game_context.score:> 3} - "
                f"Actions: {', '.join([action.name for action in strategy.evaluated_actions])}"
            )

    def __str__(self) -> str:
        return f"Strategy({self.__class__.__name__}, [{', '.join([action.name for action in self])}])"

    def __repr__(self) -> str:
        return str(self)
|