-
Notifications
You must be signed in to change notification settings - Fork 0
/
simple_dynamic.py
139 lines (128 loc) · 5.47 KB
/
simple_dynamic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import numpy as np
from multiagent.core import World, Agent, Landmark
from multiagent.scenario import BaseScenario
class Scenario(BaseScenario):
def make_world(self):
world = World()
# set any world properties first
world.dim_c = 2
num_agents = 2
num_adversaries = 1
num_landmarks = 2
# add agents
world.agents = [Agent() for i in range(num_agents)]
for i, agent in enumerate(world.agents):
agent.name = 'agent %d' % i
agent.collide = True
agent.silent = True
if i < num_adversaries:
agent.adversary = True
else:
agent.adversary = False
# add landmarks
world.landmarks = [Landmark() for i in range(num_landmarks)]
for i, landmark in enumerate(world.landmarks):
landmark.name = 'landmark %d' % i
landmark.collide = False
landmark.movable = False
# make initial conditions
self.reset_world(world)
return world
def is_collision(self, agent1, agent2, world):
good_agents = self.good_agents(world)
adv_agents = self.adversaries(world)
diff_side = False
if agent1 in good_agents and agent2 in adv_agents:
diff_side = True
elif agent2 in good_agents and agent1 in adv_agents:
diff_side = True
delta_pos = agent1.state.p_pos - agent2.state.p_pos
dist = np.sqrt(np.sum(np.square(delta_pos)))
dist_min = agent1.size + agent2.size
return True if (dist < dist_min and diff_side) else False
def reset_world(self, world):
# random properties for landmarks
for i, landmark in enumerate(world.landmarks):
landmark.color = np.array([0.25, 0.25, 0.25])
# set goal landmark
# goal = np.random.choice(world.landmarks)
for i, agent in enumerate(world.agents):
# agent.goal_a = goal
agent.color = np.array([0.25, 0.25, 0.25])
if agent.adversary:
agent.color = np.array([0.75, 0.25, 0.25])
# set random initial states
for agent in world.agents:
agent.state.p_pos = np.random.uniform(-1, +1, world.dim_p)
agent.state.p_vel = np.zeros(world.dim_p)
agent.state.c = np.zeros(world.dim_c)
for i, landmark in enumerate(world.landmarks):
landmark.state.p_pos = np.random.uniform(-1, +1, world.dim_p)
landmark.state.p_vel = np.zeros(world.dim_p)
def reward(self, agent, world):
# Agents are rewarded based on minimum agent distance to each landmark
return self.adversary_reward(agent, world) if agent.adversary else self.agent_reward(agent, world)
# return all agents that are not adversaries
def good_agents(self, world):
return [agent for agent in world.agents if not agent.adversary]
# return all adversarial agents
def adversaries(self, world):
return [agent for agent in world.agents if agent.adversary]
def agent_reward(self, agent, world):
# the distance to the goal
good_agents = self.good_agents(world)
adv_agents = self.adversaries(world)
pos_rew = 0
for l in world.landmarks:
dists = [np.sqrt(np.sum(np.square(a.state.p_pos - l.state.p_pos))) for a in good_agents]
pos_rew -= min(dists)
if agent.collide:
for a in good_agents:
if self.is_collision(a, agent, world):
pos_rew -= 1
adv_rew = 0
for l in world.landmarks:
dists = [np.sqrt(np.sum(np.square(a.state.p_pos - l.state.p_pos))) for a in adv_agents]
adv_rew -= min(dists)
if agent.collide:
for a in adv_agents:
if self.is_collision(a, agent, world):
adv_rew -= 1
return pos_rew - adv_rew
def adversary_reward(self, agent, world):
good_agents = self.good_agents(world)
adv_agents = self.adversaries(world)
pos_rew = 0
for l in world.landmarks:
dists = [np.sqrt(np.sum(np.square(a.state.p_pos - l.state.p_pos))) for a in good_agents]
pos_rew -= min(dists)
if agent.collide:
for a in good_agents:
if self.is_collision(a, agent, world):
pos_rew -= 1
adv_rew = 0
for l in world.landmarks:
dists = [np.sqrt(np.sum(np.square(a.state.p_pos - l.state.p_pos))) for a in adv_agents]
adv_rew -= min(dists)
if agent.collide:
for a in adv_agents:
if self.is_collision(a, agent, world):
adv_rew -= 1
return adv_rew - pos_rew
def observation(self, agent, world):
# get positions of all entities in this agent's reference frame
entity_pos = []
for entity in world.landmarks: # world.entities:
entity_pos.append(entity.state.p_pos - agent.state.p_pos)
# entity colors
entity_color = []
for entity in world.landmarks: # world.entities:
entity_color.append(entity.color)
# communication of all other agents
comm = []
other_pos = []
for other in world.agents:
if other is agent: continue
comm.append(other.state.c)
other_pos.append(other.state.p_pos - agent.state.p_pos)
return np.concatenate([agent.state.p_vel] + [agent.state.p_pos] + entity_pos + other_pos + comm)