micrograd.py (forked from EurekaLabsAI/micrograd)
"""
Defines a simple autograd engine and uses it to classify points in the plane
into 3 classes (red, green, blue) using a simple multilayer perceptron (MLP).
"""
import math
# -----------------------------------------------------------------------------
# class that mimics Python's random interface, but is fully deterministic,
# fully under our control, and straightforward to port to C, etc.
class RNG:
def __init__(self, seed):
self.state = seed
def random_u32(self):
# xorshift rng: https://en.wikipedia.org/wiki/Xorshift#xorshift.2A
# doing & 0xFFFFFFFFFFFFFFFF is the same as cast to uint64 in C
# doing & 0xFFFFFFFF is the same as cast to uint32 in C
self.state ^= (self.state >> 12) & 0xFFFFFFFFFFFFFFFF
self.state ^= (self.state << 25) & 0xFFFFFFFFFFFFFFFF
self.state ^= (self.state >> 27) & 0xFFFFFFFFFFFFFFFF
return ((self.state * 0x2545F4914F6CDD1D) >> 32) & 0xFFFFFFFF
def random(self):
# random float32 in [0, 1)
return (self.random_u32() >> 8) / 16777216.0
def uniform(self, a=0.0, b=1.0):
# random float32 in [a, b)
return a + (b-a) * self.random()
random = RNG(42)
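# a quick sanity check (added, not part of the original file): the RNG above is
# fully deterministic, so two instances with the same seed produce identical
# streams, and random() always lands in [0, 1). the seed 1337 is arbitrary.
_rng_a, _rng_b = RNG(1337), RNG(1337)
assert all(_rng_a.random_u32() == _rng_b.random_u32() for _ in range(10))
assert 0.0 <= RNG(1337).random() < 1.0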
# -----------------------------------------------------------------------------
# Value
class Value:
""" stores a single scalar value and its gradient """
def __init__(self, data, _children=(), _op=''):
self.data = data
self.grad = 0
# internal variables used for autograd graph construction
self._backward = lambda: None
self._prev = set(_children)
self._op = _op # the op that produced this node, for graphviz / debugging / etc
def __add__(self, other):
other = other if isinstance(other, Value) else Value(other)
out = Value(self.data + other.data, (self, other), '+')
def _backward():
self.grad += out.grad
other.grad += out.grad
out._backward = _backward
return out
def __mul__(self, other):
other = other if isinstance(other, Value) else Value(other)
out = Value(self.data * other.data, (self, other), '*')
def _backward():
self.grad += other.data * out.grad
other.grad += self.data * out.grad
out._backward = _backward
return out
def __pow__(self, other):
assert isinstance(other, (int, float)), "only supporting int/float powers for now"
out = Value(self.data**other, (self,), f'**{other}')
def _backward():
self.grad += (other * self.data**(other-1)) * out.grad
out._backward = _backward
return out
def relu(self):
out = Value(0 if self.data < 0 else self.data, (self,), 'ReLU')
def _backward():
self.grad += (out.data > 0) * out.grad
out._backward = _backward
return out
def tanh(self):
out = Value(math.tanh(self.data), (self,), 'tanh')
def _backward():
self.grad += (1 - out.data**2) * out.grad
out._backward = _backward
return out
def exp(self):
out = Value(math.exp(self.data), (self,), 'exp')
def _backward():
self.grad += math.exp(self.data) * out.grad
out._backward = _backward
return out
def log(self):
# (this is the natural log)
out = Value(math.log(self.data), (self,), 'log')
def _backward():
self.grad += (1/self.data) * out.grad
out._backward = _backward
return out
def backward(self):
# topological order all of the children in the graph
topo = []
visited = set()
def build_topo(v):
if v not in visited:
visited.add(v)
for child in v._prev:
build_topo(child)
topo.append(v)
build_topo(self)
# go one variable at a time and apply the chain rule to get its gradient
self.grad = 1
for v in reversed(topo):
v._backward()
def __neg__(self): # -self
return self * -1
def __radd__(self, other): # other + self
return self + other
def __sub__(self, other): # self - other
return self + (-other)
def __rsub__(self, other): # other - self
return other + (-self)
def __rmul__(self, other): # other * self
return self * other
def __truediv__(self, other): # self / other
return self * other**-1
def __rtruediv__(self, other): # other / self
return other * self**-1
def __repr__(self):
return f"Value(data={self.data}, grad={self.grad})"
# -----------------------------------------------------------------------------
# Multi-Layer Perceptron (MLP) network
class Module:
def zero_grad(self):
for p in self.parameters():
p.grad = 0
def parameters(self):
return []
class Neuron(Module):
def __init__(self, nin, nonlin=True):
        # draw each weight independently, scaled by 1/sqrt(nin) to keep activations well-behaved
        self.w = [Value(random.uniform(-1, 1) * nin**-0.5) for _ in range(nin)]
self.b = Value(0)
self.nonlin = nonlin
def __call__(self, x):
act = sum((wi*xi for wi,xi in zip(self.w, x)), self.b)
return act.tanh() if self.nonlin else act
def parameters(self):
return self.w + [self.b]
def __repr__(self):
return f"{'TanH' if self.nonlin else 'Linear'}Neuron({len(self.w)})"
class Layer(Module):
def __init__(self, nin, nout, **kwargs):
self.neurons = [Neuron(nin, **kwargs) for _ in range(nout)]
def __call__(self, x):
out = [n(x) for n in self.neurons]
return out[0] if len(out) == 1 else out
def parameters(self):
return [p for n in self.neurons for p in n.parameters()]
def __repr__(self):
return f"Layer of [{', '.join(str(n) for n in self.neurons)}]"
class MLP(Module):
def __init__(self, nin, nouts):
sz = [nin] + nouts
self.layers = [Layer(sz[i], sz[i+1], nonlin=i!=len(nouts)-1) for i in range(len(nouts))]
def __call__(self, x):
for layer in self.layers:
x = layer(x)
return x
def parameters(self):
return [p for layer in self.layers for p in layer.parameters()]
def __repr__(self):
return f"MLP of [{', '.join(str(layer) for layer in self.layers)}]"
# -----------------------------------------------------------------------------
# loss function: the negative log likelihood (NLL) loss
def nll_loss(logits, target):
# subtract the max for numerical stability (avoids overflow)
max_val = max(val.data for val in logits)
logits = [val - max_val for val in logits]
# 1) evaluate elementwise e^x
ex = [x.exp() for x in logits]
# 2) compute the sum of the above
denom = sum(ex)
# 3) normalize by the sum to get probabilities
probs = [x / denom for x in ex]
# 4) log the probabilities at target
logp = (probs[target]).log()
    # 5) the negative log likelihood loss (negate so we get a loss: lower is better)
nll = -logp
return nll
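# a small check (added, not part of the original file): with equal logits the
# softmax is uniform over the 3 classes, so the NLL of any target is log(3).
_uniform_nll = nll_loss([Value(0.0), Value(0.0), Value(0.0)], target=0)
assert abs(_uniform_nll.data - math.log(3)) < 1e-12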
# -----------------------------------------------------------------------------
# let's train!
# generate a random dataset with 100 2-dimensional datapoints in 3 classes
def gen_data(n=100):
pts = []
for _ in range(n):
x = random.uniform(-2.0, 2.0)
y = random.uniform(-2.0, 2.0)
# concentric circles
# label = 0 if x**2 + y**2 < 1 else 1 if x**2 + y**2 < 2 else 2
# very simple dataset
label = 0 if x < 0 else 1 if y < 0 else 2
pts.append(([x, y], label))
# create train/val/test splits of the data (80%, 10%, 10%)
tr = pts[:int(0.8*n)]
val = pts[int(0.8*n):int(0.9*n)]
te = pts[int(0.9*n):]
return tr, val, te
train_split, val_split, test_split = gen_data()
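# quick check (added, not part of the original file): with the default n=100
# the 80%/10%/10% slicing above yields 80 train, 10 val, and 10 test points.
assert (len(train_split), len(val_split), len(test_split)) == (80, 10, 10)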
# init the model: 2D inputs, 16 neurons, 3 outputs (logits)
model = MLP(2, [16, 3])
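# sanity note (added, not part of the original file): each of the 16 hidden
# neurons has 2 weights + 1 bias, and each of the 3 output neurons has
# 16 weights + 1 bias, so the model holds 16*(2+1) + 3*(16+1) = 99 parameters,
# and a forward pass on a 2D point returns 3 logits.
assert len(model.parameters()) == 16 * (2 + 1) + 3 * (16 + 1)
assert len(model([Value(0.0), Value(0.0)])) == 3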
def eval_split(model, split):
# evaluate the loss of a split
loss = Value(0)
for x, y in split:
logits = model([Value(x[0]), Value(x[1])])
loss += nll_loss(logits, y)
loss = loss * (1.0/len(split)) # normalize the loss
return loss.data
# optimize using Adam
learning_rate = 1e-1
beta1 = 0.9
beta2 = 0.95
weight_decay = 1e-4
for p in model.parameters():
p.m = 0.0
p.v = 0.0
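# for reference (added comment): the update in the loop below is Adam with
# decoupled weight decay (AdamW-style) and, as a simplification, no bias
# correction of m and v:
#   m <- beta1*m + (1-beta1)*g
#   v <- beta2*v + (1-beta2)*g^2
#   p <- p - lr * m / (sqrt(v) + 1e-8) - weight_decay * p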
# train
for step in range(100):
# evaluate the validation split every few steps
if step % 10 == 0:
val_loss = eval_split(model, val_split)
print(f"step {step}, val loss {val_loss}")
# forward the network (get logits of all training datapoints)
loss = Value(0)
for x, y in train_split:
logits = model([Value(x[0]), Value(x[1])])
loss += nll_loss(logits, y)
loss = loss * (1.0/len(train_split)) # normalize the loss
# backward pass (deposit the gradients)
loss.backward()
# update with Adam
for p in model.parameters():
p.m = beta1 * p.m + (1 - beta1) * p.grad
p.v = beta2 * p.v + (1 - beta2) * p.grad**2
p.data -= learning_rate * p.m / (p.v**0.5 + 1e-8)
p.data -= weight_decay * p.data # weight decay
model.zero_grad()
print(f"step {step}, train loss {loss.data}")