-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathrwkv_tokenizer.py
103 lines (90 loc) · 3.13 KB
/
rwkv_tokenizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
########################################################################################################
# The RWKV Language Model - https://github.com/BlinkDL/RWKV-LM
########################################################################################################
class TRIE:
__slots__ = tuple("ch,to,values,front".split(","))
to:list
values:set
def __init__(self, front=None, ch=None):
self.ch = ch
self.to = [None for ch in range(256)]
self.values = set()
self.front = front
def __repr__(self):
fr = self
ret = []
while(fr!=None):
if(fr.ch!=None):
ret.append(fr.ch)
fr = fr.front
return "<TRIE %s %s>"%(ret[::-1], self.values)
def add(self, key:bytes, idx:int=0, val=None):
if(idx == len(key)):
if(val is None):
val = key
self.values.add(val)
return self
ch = key[idx]
if(self.to[ch] is None):
self.to[ch] = TRIE(front=self, ch=ch)
return self.to[ch].add(key, idx=idx+1, val=val)
def find_longest(self, key:bytes, idx:int=0):
u:TRIE = self
ch:int = key[idx]
while(u.to[ch] is not None):
u = u.to[ch]
idx += 1
if(u.values):
ret = idx, u, u.values
if(idx==len(key)):
break
ch = key[idx]
return ret
class TRIE_TOKENIZER():
def __init__(self, file_name):
self.idx2token = {}
sorted = [] # must be already sorted
with open(file_name, "r", encoding="utf-8") as f:
lines = f.readlines()
for l in lines:
idx = int(l[:l.index(' ')])
x = eval(l[l.index(' '):l.rindex(' ')])
x = x.encode("utf-8") if isinstance(x, str) else x
assert isinstance(x, bytes)
assert len(x) == int(l[l.rindex(' '):])
sorted += [x]
self.idx2token[idx] = x
self.token2idx = {}
for k,v in self.idx2token.items():
self.token2idx[v] = int(k)
self.root = TRIE()
for t, i in self.token2idx.items():
_ = self.root.add(t, val=(t, i))
def encodeBytes(self, src:bytes):
idx:int = 0
tokens = []
while (idx < len(src)):
_idx:int = idx
idx, _, values = self.root.find_longest(src, idx)
assert(idx != _idx)
_, token = next(iter(values))
tokens.append(token)
return tokens
def decodeBytes(self, tokens):
return b''.join(map(lambda i: self.idx2token[i], tokens))
def encode(self, src):
return self.encodeBytes(src.encode("utf-8"))
def decode(self, tokens):
try:
return self.decodeBytes(tokens).decode('utf-8')
except:
return '\ufffd' # bad utf-8
def printTokens(self, tokens):
for i in tokens:
s = self.idx2token[i]
try:
s = s.decode('utf-8')
except:
pass
print(f'{repr(s)}{i}', end=' ')
print()