# transformer.py (forked from tinygrad/tinygrad)
import numpy as np
from tinygrad.tensor import Tensor

class TransformerBlock:
  def __init__(self, embed_dim, num_heads, ff_dim, prenorm=False, act=lambda x: x.relu(), dropout=0.1):
    assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"
    self.num_heads = num_heads
    self.head_size = embed_dim // num_heads
    self.prenorm, self.act = prenorm, act
    self.dropout = dropout

    # every layer is stored as a (weight, bias) tuple and applied with x.linear(weight, bias)
    self.query = (Tensor.scaled_uniform(embed_dim, embed_dim), Tensor.zeros(embed_dim))
    self.key = (Tensor.scaled_uniform(embed_dim, embed_dim), Tensor.zeros(embed_dim))
    self.value = (Tensor.scaled_uniform(embed_dim, embed_dim), Tensor.zeros(embed_dim))
    self.out = (Tensor.scaled_uniform(embed_dim, embed_dim), Tensor.zeros(embed_dim))
    # position-wise feed-forward: embed_dim -> ff_dim -> embed_dim
    self.ff1 = (Tensor.scaled_uniform(embed_dim, ff_dim), Tensor.zeros(ff_dim))
    self.ff2 = (Tensor.scaled_uniform(ff_dim, embed_dim), Tensor.zeros(embed_dim))
    # layernorm affine parameters (scale, shift)
    self.ln1 = (Tensor.ones(embed_dim), Tensor.zeros(embed_dim))
    self.ln2 = (Tensor.ones(embed_dim), Tensor.zeros(embed_dim))

  def attn(self, x):
    # x: (bs, time, embed_dim) -> (bs, time, embed_dim)
    query, key, value = [x.linear(*y)
      .reshape(shape=(x.shape[0], -1, self.num_heads, self.head_size))
      for y in [self.query, self.key, self.value]]
    query = query.permute(order=(0,2,1,3))  # (bs, num_heads, time, head_size)
    key = key.permute(order=(0,2,3,1))      # (bs, num_heads, head_size, time)
    value = value.permute(order=(0,2,1,3))  # (bs, num_heads, time, head_size)
    score = query.dot(key) * (1 / np.sqrt(self.head_size))
    weights = score.softmax()  # (bs, num_heads, time, time)
    attention = weights.dot(value).permute(order=(0,2,1,3))  # (bs, time, num_heads, head_size)
    return attention.reshape(shape=(x.shape[0], -1, self.num_heads * self.head_size)).linear(*self.out)
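
  # Example shape walk-through for attn (illustrative only; the sizes bs=2, time=6, embed_dim=128,
  # num_heads=4, head_size=32 are assumptions, not values from the original file):
  #   x.linear(*self.query) -> (2, 6, 128), reshaped to (2, 6, 4, 32)
  #   query.permute -> (2, 4, 6, 32), key.permute -> (2, 4, 32, 6), value.permute -> (2, 4, 6, 32)
  #   score = query.dot(key) * 1/sqrt(32) -> (2, 4, 6, 6), softmaxed over the last axis
  #   weights.dot(value) -> (2, 4, 6, 32), permuted and reshaped back to (2, 6, 128), then projected by self.out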

  def __call__(self, x):
    if self.prenorm:
      # pre-norm: layernorm before each sublayer, the residual adds the unnormalized input
      x = x + self.attn(x.layernorm().linear(*self.ln1)).dropout(self.dropout)
      x = x + self.act(x.layernorm().linear(*self.ln2).linear(*self.ff1)).linear(*self.ff2).dropout(self.dropout)
    else:
      # post-norm: residual add first, layernorm after each sublayer
      x = x + self.attn(x).dropout(self.dropout)
      x = x.layernorm().linear(*self.ln1)
      x = x + self.act(x.linear(*self.ff1)).linear(*self.ff2).dropout(self.dropout)
      x = x.layernorm().linear(*self.ln2)
    return x

class Transformer:
  def __init__(self, syms, maxlen, layers, embed_dim, num_heads, ff_dim):
    self.maxlen, self.syms = maxlen, syms
    self.embed = Tensor.scaled_uniform(maxlen+syms, embed_dim, requires_grad=False)
    self.tbs = []
    for i in range(layers):
      self.tbs.append(TransformerBlock(embed_dim, num_heads, ff_dim))
    self.final = Tensor.scaled_uniform(embed_dim, syms)
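
  # How forward() embeds its input: for each position i it builds a multi-hot vector of length
  # maxlen+syms with a 1 at index i (the position) and a 1 at index maxlen + token_id (the symbol),
  # so the dot product with self.embed sums the positional row and the token row of the table.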

  def forward(self, x):
    # x: (bs, time) integer token ids -> (bs, time, syms) log-probabilities
    bs = x.shape[0]
    xnp = x.cpu().numpy().astype(np.int32)
    # build the combined position/token multi-hot encoding on the host
    onehot = np.zeros((bs, x.shape[1], self.maxlen+self.syms), dtype=np.float32)
    for i in range(x.shape[1]):
      onehot[range(bs), i, i] = 1
      onehot[range(bs), i, self.maxlen + xnp[:, i]] = 1
    onehot = onehot.reshape(bs*x.shape[1], self.maxlen+self.syms)

    x = Tensor(onehot, device=x.device).dot(self.embed).reshape(shape=(bs, x.shape[1], -1))
    x = x.sequential(self.tbs)  # run every transformer block in order
    x = x.reshape(shape=(-1, x.shape[-1])).dot(self.final).log_softmax()
    return x.reshape(shape=(bs, -1, x.shape[-1]))
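
# Minimal smoke-test sketch (not part of the original file): the hyperparameters and random
# tokens below are arbitrary assumptions, chosen only to show the expected input/output shapes.
if __name__ == "__main__":
  Tensor.training = False  # keep dropout inactive for this inference-only example
  model = Transformer(syms=10, maxlen=6, layers=2, embed_dim=128, num_heads=4, ff_dim=32)
  tokens = Tensor(np.random.randint(0, 10, size=(4, 6)).astype(np.float32))  # (bs=4, time=6)
  out = model.forward(tokens)
  print(out.shape)  # expected: (4, 6, 10) log-probabilities over the 10 symbols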