forked from Lux-AI-Challenge/Lux-Design-S3
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
d3bb931
commit 08db03f
Showing
10 changed files
with
116 additions
and
321 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,96 +1,83 @@ | ||
from lux.kit import obs_to_game_state, GameState | ||
from lux.config import EnvConfig | ||
from lux.utils import direction_to, my_turn_to_place_factory | ||
import numpy as np | ||
# from lux.kit import obs_to_game_state, GameState | ||
# from lux.config import EnvConfig | ||
from lux.utils import direction_to | ||
import sys | ||
import numpy as np | ||
class Agent(): | ||
def __init__(self, player: str, env_cfg: EnvConfig) -> None: | ||
def __init__(self, player: str, env_cfg) -> None: | ||
self.player = player | ||
self.opp_player = "player_1" if self.player == "player_0" else "player_0" | ||
self.team_id = 0 if self.player == "player_0" else 1 | ||
self.opp_team_id = 1 if self.team_id == 0 else 0 | ||
np.random.seed(0) | ||
self.env_cfg: EnvConfig = env_cfg | ||
|
||
def early_setup(self, step: int, obs, remainingOverageTime: int = 60): | ||
if step == 0: | ||
# bid 0 to not waste resources bidding and declare as the default faction | ||
return dict(faction="AlphaStrike", bid=0) | ||
else: | ||
game_state = obs_to_game_state(step, self.env_cfg, obs) | ||
# factory placement period | ||
|
||
# how much water and metal you have in your starting pool to give to new factories | ||
water_left = game_state.teams[self.player].water | ||
metal_left = game_state.teams[self.player].metal | ||
|
||
# how many factories you have left to place | ||
factories_to_place = game_state.teams[self.player].factories_to_place | ||
# whether it is your turn to place a factory | ||
my_turn_to_place = my_turn_to_place_factory(game_state.teams[self.player].place_first, step) | ||
if factories_to_place > 0 and my_turn_to_place: | ||
# we will spawn our factory in a random location with 150 metal and water if it is our turn to place | ||
potential_spawns = np.array(list(zip(*np.where(obs["board"]["valid_spawns_mask"] == 1)))) | ||
spawn_loc = potential_spawns[np.random.randint(0, len(potential_spawns))] | ||
return dict(spawn=spawn_loc, metal=150, water=150) | ||
return dict() | ||
self.env_cfg = env_cfg | ||
|
||
self.relic_node_positions = [] | ||
self.discovered_relic_nodes_ids = set() | ||
|
||
def act(self, step: int, obs, remainingOverageTime: int = 60): | ||
actions = dict() | ||
"""implement this function to decide what actions to send to each available unit. | ||
step is the current timestep number of the game starting from 0 going up to max_steps_in_match * match_count_per_episode - 1. | ||
""" | ||
optionally do forward simulation to simulate positions of units, lichen, etc. in the future | ||
from lux.forward_sim import forward_sim | ||
forward_obs = forward_sim(obs, self.env_cfg, n=2) | ||
forward_game_states = [obs_to_game_state(step + i, self.env_cfg, f_obs) for i, f_obs in enumerate(forward_obs)] | ||
""" | ||
|
||
game_state = obs_to_game_state(step, self.env_cfg, obs) | ||
factories = game_state.factories[self.player] | ||
game_state.teams[self.player].place_first | ||
factory_tiles, factory_units = [], [] | ||
for unit_id, factory in factories.items(): | ||
if factory.power >= self.env_cfg.ROBOTS["HEAVY"].POWER_COST and \ | ||
factory.cargo.metal >= self.env_cfg.ROBOTS["HEAVY"].METAL_COST: | ||
actions[unit_id] = factory.build_heavy() | ||
if factory.water_cost(game_state) <= factory.cargo.water / 5 - 200: | ||
actions[unit_id] = factory.water() | ||
factory_tiles += [factory.pos] | ||
factory_units += [factory] | ||
factory_tiles = np.array(factory_tiles) | ||
|
||
units = game_state.units[self.player] | ||
ice_map = game_state.board.ice | ||
ice_tile_locations = np.argwhere(ice_map == 1) | ||
for unit_id, unit in units.items(): | ||
|
||
# track the closest factory | ||
closest_factory = None | ||
adjacent_to_factory = False | ||
if len(factory_tiles) > 0: | ||
factory_distances = np.mean((factory_tiles - unit.pos) ** 2, 1) | ||
closest_factory_tile = factory_tiles[np.argmin(factory_distances)] | ||
closest_factory = factory_units[np.argmin(factory_distances)] | ||
adjacent_to_factory = np.mean((closest_factory_tile - unit.pos) ** 2) == 0 | ||
# game_state = obs_to_game_state(step, self.env_cfg, obs) | ||
unit_mask = np.array(obs["units_mask"][self.team_id]) # shape (max_units, ) | ||
unit_positions = np.array(obs["units"]["position"][self.team_id]) # shape (max_units, 2) | ||
unit_energys = np.array(obs["units"]["energy"][self.team_id]) # shape (max_units, 1) | ||
observed_relic_node_positions = np.array(obs["relic_nodes"]) # shape (max_relic_nodes, 2) | ||
observed_relic_nodes_mask = np.array(obs["relic_nodes_mask"]) # shape (max_relic_nodes, ) | ||
|
||
# ids of units you can control at this timestep | ||
available_unit_ids = np.where(unit_mask)[0] | ||
# visible relic nodes | ||
visible_relic_node_ids = set(np.where(observed_relic_nodes_mask)[0]) | ||
|
||
actions = np.zeros((self.env_cfg["max_units"], 3), dtype=int) | ||
|
||
# previous ice mining code | ||
if unit.cargo.ice < 40: | ||
ice_tile_distances = np.mean((ice_tile_locations - unit.pos) ** 2, 1) | ||
closest_ice_tile = ice_tile_locations[np.argmin(ice_tile_distances)] | ||
if np.all(closest_ice_tile == unit.pos): | ||
if unit.power >= unit.dig_cost(game_state) + unit.action_queue_cost(game_state): | ||
actions[unit_id] = [unit.dig(repeat=0, n=1)] | ||
else: | ||
direction = direction_to(unit.pos, closest_ice_tile) | ||
move_cost = unit.move_cost(game_state, direction) | ||
if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state): | ||
actions[unit_id] = [unit.move(direction, repeat=0, n=1)] | ||
# else if we have enough ice, we go back to the factory and dump it. | ||
elif unit.cargo.ice >= 40: | ||
direction = direction_to(unit.pos, closest_factory_tile) | ||
if adjacent_to_factory: | ||
if unit.power >= unit.action_queue_cost(game_state): | ||
actions[unit_id] = [unit.transfer(direction, 0, unit.cargo.ice, repeat=0)] | ||
else: | ||
move_cost = unit.move_cost(game_state, direction) | ||
if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state): | ||
actions[unit_id] = [unit.move(direction, repeat=0, n=1)] | ||
# basic strategy here is simply to have some units randomly explore and some units collecting as much energy as possible | ||
# and once a relic node is found, we send all units to move randomly around the first relic node to gain points | ||
# and information about where relic nodes are found are saved for the next match | ||
|
||
|
||
# save any new relic nodes that we discover for the rest of the game. | ||
for id in visible_relic_node_ids: | ||
if id not in self.discovered_relic_nodes_ids: | ||
self.discovered_relic_nodes_ids.add(id) | ||
self.relic_node_positions.append(observed_relic_node_positions[id]) | ||
|
||
|
||
self.unit_explore_locations = dict() | ||
|
||
# unit ids range from 0 to max_units - 1 | ||
for unit_id in available_unit_ids: | ||
unit_pos = unit_positions[unit_id] | ||
unit_energy = unit_energys[unit_id] | ||
if len(self.relic_node_positions) > 0: | ||
nearest_relic_node_position = self.relic_node_positions[0] | ||
manhattan_distance = abs(unit_pos[0] - nearest_relic_node_position[0]) + abs(unit_pos[1] - nearest_relic_node_position[1]) | ||
if manhattan_distance <= 4: | ||
random_direction = np.random.randint(0, 4) | ||
actions[unit_id] = [random_direction, 0, 0] | ||
else: | ||
actions[unit_id] = [direction_to(unit_pos, nearest_relic_node_position), 0, 0] | ||
else: | ||
if unit_id % 2 == 0: | ||
# randomly explore by picking a random location on the map and moving there for about 20 steps | ||
print(f"step: {step}", file=sys.stderr) | ||
if step % 20 == 0 or unit_id not in self.unit_explore_locations: | ||
rand_loc = (np.random.randint(0, self.env_cfg["map_width"]), np.random.randint(0, self.env_cfg["map_height"])) | ||
self.unit_explore_locations[unit_id] = rand_loc | ||
actions[unit_id] = [direction_to(unit_pos, self.unit_explore_locations[unit_id]), 0, 0] | ||
else: | ||
# follow energy field to its peak | ||
for delta in np.array([[0, 0], [0, 1], [1, 0], [0, -1], [-1, 0]]): | ||
next_pos = unit_pos + delta # (x, y) format | ||
if next_pos[0] < 0 or next_pos[0] >= self.env_cfg["map_width"] or next_pos[1] < 0 or next_pos[1] >= self.env_cfg["map_height"]: | ||
continue | ||
next_pos_energy = obs["map_features"]["energy"][next_pos[0], next_pos[1]] | ||
cur_pos_energy = obs["map_features"]["energy"][unit_pos[0], unit_pos[1]] | ||
if next_pos_energy > cur_pos_energy: | ||
actions[unit_id] = [direction_to(unit_pos, next_pos), 0, 0] | ||
print(f"unit {unit_id} at {unit_pos} moving to {next_pos}, {next_pos_energy}, {cur_pos_energy}", file=sys.stderr) | ||
break | ||
return actions |
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.