AutoTimes_Opt_1b.py
import torch
import torch.nn as nn
from transformers import OPTForCausalLM
from layers.mlp import MLP


class Model(nn.Module):
def __init__(self, configs):
super(Model, self).__init__()
self.token_len = configs.token_len
if configs.use_multi_gpu:
self.device = f"cuda:{configs.local_rank}"
else:
self.device = f"cuda:{configs.gpu}"
print(self.device)
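        # Load the OPT-1.3B backbone in half precision and bypass its decoder's
        # project_in / project_out: time-series token embeddings are fed
        # directly in the 2048-dim hidden space, so no vocabulary-side
        # projection is needed.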
self.opt = OPTForCausalLM.from_pretrained(configs.llm_ckp_dir, torch_dtype=torch.float16)
self.opt.model.decoder.project_in = None
self.opt.model.decoder.project_out = None
self.hidden_dim_of_opt1b = 2048
self.mix = configs.mix_embeds
if self.mix:
self.add_scale = nn.Parameter(torch.ones([]))
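        # Freeze the entire OPT backbone; only the encoder/decoder defined
        # below (and add_scale, when mix_embeds is enabled) remain trainable.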
for name, param in self.opt.named_parameters():
param.requires_grad = False
if configs.mlp_hidden_layers == 0:
if not configs.use_multi_gpu or (configs.use_multi_gpu and configs.local_rank == 0):
print("use linear as tokenizer and detokenizer")
self.encoder = nn.Linear(self.token_len, self.hidden_dim_of_opt1b)
self.decoder = nn.Linear(self.hidden_dim_of_opt1b, self.token_len)
else:
if not configs.use_multi_gpu or (configs.use_multi_gpu and configs.local_rank == 0):
print("use mlp as tokenizer and detokenizer")
self.encoder = MLP(self.token_len, self.hidden_dim_of_opt1b,
configs.mlp_hidden_dim, configs.mlp_hidden_layers,
configs.dropout, configs.mlp_activation)
self.decoder = MLP(self.hidden_dim_of_opt1b, self.token_len,
configs.mlp_hidden_dim, configs.mlp_hidden_layers,
                               configs.dropout, configs.mlp_activation)

    def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
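        # Instance normalization: standardize each input series with its own
        # mean and std; the statistics are restored on the forecast at the end.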
means = x_enc.mean(1, keepdim=True).detach()
x_enc = x_enc - means
stdev = torch.sqrt(
torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
x_enc /= stdev
bs, _, n_vars = x_enc.shape
# x_enc: [bs x nvars x seq_len]
x_enc = x_enc.permute(0, 2, 1)
# x_enc: [bs * nvars x seq_len]
x_enc = x_enc.reshape(x_enc.shape[0] * x_enc.shape[1], -1)
# fold_out: [bs * n_vars x token_num x token_len]
fold_out = x_enc.unfold(dimension=-1, size=self.token_len, step=self.token_len)
token_num = fold_out.shape[1]
# times_embeds: [bs * n_vars x token_num x hidden_dim_of_opt1b]
times_embeds = self.encoder(fold_out)
if self.mix:
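            # L2-normalize both the series-token embeddings and the timestamp
            # embeddings passed in x_mark_enc, then mix them with a learnable scale.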
times_embeds = times_embeds / times_embeds.norm(dim=2, keepdim=True)
x_mark_enc = x_mark_enc / x_mark_enc.norm(dim=2, keepdim=True)
times_embeds = times_embeds + self.add_scale * x_mark_enc
# outputs: [bs * n_vars x token_num x hidden_dim_of_opt1b]
outputs = self.opt.model(
inputs_embeds=times_embeds).last_hidden_state
# dec_out: [bs * n_vars x token_num x token_len]
dec_out = self.decoder(outputs)
dec_out = dec_out.reshape(bs, n_vars, -1)
# dec_out: [bs x token_num * token_len x n_vars]
dec_out = dec_out.permute(0, 2, 1)
dec_out = dec_out * \
(stdev[:, 0, :].unsqueeze(1).repeat(1, token_num * self.token_len, 1))
dec_out = dec_out + \
(means[:, 0, :].unsqueeze(1).repeat(1, token_num * self.token_len, 1))
        return dec_out

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
return self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)
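

if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative, not part of the original
    # AutoTimes file). It assumes a single CUDA device and that llm_ckp_dir
    # points to an OPT-1.3B checkpoint (e.g. "facebook/opt-1.3b", which
    # from_pretrained can also fetch from the Hugging Face hub); the numeric
    # values below are placeholders, only the field names come from this file.
    from types import SimpleNamespace

    configs = SimpleNamespace(
        token_len=96,                     # length of one non-overlapping series token
        use_multi_gpu=False,
        gpu=0,
        local_rank=0,
        llm_ckp_dir="facebook/opt-1.3b",  # assumed checkpoint path / hub id
        mix_embeds=False,                 # keep False so x_mark_enc can be omitted
        mlp_hidden_layers=0,              # 0 -> linear tokenizer/detokenizer
        mlp_hidden_dim=256,
        dropout=0.1,
        mlp_activation="relu",
    )

    model = Model(configs).to("cuda:0").eval()

    bs, n_vars, token_num = 2, 1, 4
    x_enc = torch.randn(bs, token_num * configs.token_len, n_vars, device="cuda:0")

    # Autocast keeps the fp32 tokenizer/detokenizer compatible with the fp16 OPT backbone.
    with torch.no_grad(), torch.autocast("cuda", dtype=torch.float16):
        out = model(x_enc, None, None, None)
    print(out.shape)  # expected: [bs, token_num * token_len, n_vars]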