forked from FengQuanLi/ResnetGPT
Commit f67f8b3 (0 parents): 28 changed files, 1,565 additions, 0 deletions.
New file (62 lines added):
import numpy as np
import torch
from torch.autograd import Variable
from torchtext import data


def nopeak_mask(size, device):
    # Upper-triangular "no peek" mask: position i may only attend to positions <= i.
    np_mask = np.triu(np.ones((1, size, size)), k=1).astype('uint8')
    np_mask = Variable(torch.from_numpy(np_mask) == 0)
    np_mask = np_mask.cuda(device)
    return np_mask


def create_masks(src, trg, device):
    # Padding positions are encoded as -1 and masked out.
    src_mask = (src != -1).unsqueeze(-2)

    if trg is not None:
        trg_mask = (trg != -1).unsqueeze(-2)
        trg_mask = trg_mask.cuda(device)  # assign the result; .cuda() is not in-place
        size = trg.size(1)  # seq_len for the square subsequent mask
        np_mask = nopeak_mask(size, device)
        trg_mask = trg_mask & np_mask  # combine the padding mask with the causal mask
    else:
        trg_mask = None
    return src_mask, trg_mask


# patch on torchtext's batching process that makes it more efficient
# from http://nlp.seas.harvard.edu/2018/04/03/attention.html#position-wise-feed-forward-networks
class MyIterator(data.Iterator):
    def create_batches(self):
        if self.train:
            def pool(d, random_shuffler):
                # Sort within large chunks so sequences of similar length share a batch,
                # then shuffle the resulting batches.
                for p in data.batch(d, self.batch_size * 100):
                    p_batch = data.batch(
                        sorted(p, key=self.sort_key),
                        self.batch_size, self.batch_size_fn)
                    for b in random_shuffler(list(p_batch)):
                        yield b
            self.batches = pool(self.data(), self.random_shuffler)
        else:
            self.batches = []
            for b in data.batch(self.data(), self.batch_size,
                                self.batch_size_fn):
                self.batches.append(sorted(b, key=self.sort_key))


# Running maxima used by batch_size_fn to size batches by token count.
max_src_in_batch, max_tgt_in_batch = 0, 0


def batch_size_fn(new, count, sofar):
    "Keep augmenting batch and calculate total number of tokens + padding."
    global max_src_in_batch, max_tgt_in_batch
    if count == 1:
        max_src_in_batch = 0
        max_tgt_in_batch = 0
    max_src_in_batch = max(max_src_in_batch, len(new.src))
    max_tgt_in_batch = max(max_tgt_in_batch, len(new.trg) + 2)
    src_elements = count * max_src_in_batch
    tgt_elements = count * max_tgt_in_batch
    return max(src_elements, tgt_elements)
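
For orientation, a minimal sketch of how the mask helpers above are typically driven. The module name Batch and the use of CUDA device index 0 are assumptions (the file name is not shown in this excerpt, and nopeak_mask/create_masks call .cuda(device), so a GPU is required).

# Hedged sketch: "Batch" is an assumed module name for the file above;
# a CUDA device is required because the helpers call .cuda(device).
import torch
from Batch import create_masks  # assumed module name

device = 0  # CUDA device index
trg = torch.tensor([[5, 9, 2, -1, -1]], device='cuda')  # -1 marks padding
src = trg.clone()

src_mask, trg_mask = create_masks(src, trg, device)
print(src_mask.shape)  # torch.Size([1, 1, 5])
print(trg_mask.shape)  # torch.Size([1, 5, 5]), padding mask ANDed with the causal mask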
New file (88 lines added):
import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable


class Embedder(nn.Module):
    # Thin wrapper around Embedder2 (defined below).
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model = d_model
        self.embed = Embedder2(vocab_size, d_model)

    def forward(self, x):
        return self.embed(x)


class PositionalEncoder(nn.Module):
    def __init__(self, d_model, max_seq_len=1024, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.dropout = nn.Dropout(dropout)
        # create the constant 'pe' matrix, whose values depend on pos and i
        pe = torch.zeros(max_seq_len, d_model)
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = \
                    math.sin(pos / (10000 ** ((2 * i) / d_model)))
                pe[pos, i + 1] = \
                    math.cos(pos / (10000 ** ((2 * (i + 1)) / d_model)))
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # make embeddings relatively larger
        x = x * math.sqrt(self.d_model)
        # add the constant positional encoding to the embedding
        seq_len = x.size(1)
        pe = Variable(self.pe[:, :seq_len], requires_grad=False)
        if x.is_cuda:
            pe = pe.cuda()  # assign the result; .cuda() is not in-place
        x = x + pe
        x = self.dropout(x)
        return x


class Embedder2(nn.Module):
    # Re-implementation of nn.Embedding whose weights are initialised from a
    # fixed-seed uniform distribution instead of the default normal init.
    def __init__(self, num_embeddings, embedding_dim, padding_idx=None,
                 max_norm=None, norm_type=2., scale_grad_by_freq=False,
                 sparse=False, _weight=None):
        super(Embedder2, self).__init__()
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        if padding_idx is not None:
            if padding_idx > 0:
                assert padding_idx < self.num_embeddings, 'Padding_idx must be within num_embeddings'
            elif padding_idx < 0:
                assert padding_idx >= -self.num_embeddings, 'Padding_idx must be within num_embeddings'
                padding_idx = self.num_embeddings + padding_idx
        self.padding_idx = padding_idx
        self.max_norm = max_norm
        self.norm_type = norm_type
        self.scale_grad_by_freq = scale_grad_by_freq
        if _weight is None:
            np.random.seed(1)
            np数 = np.random.uniform(0, 1, (num_embeddings, embedding_dim))  # 数: "array"
            self.weight = nn.Parameter(torch.Tensor(np数))
            # self.weight = nn.Parameter(torch.Tensor(num_embeddings, embedding_dim))
            # self.reset_parameters()
        else:
            assert list(_weight.shape) == [num_embeddings, embedding_dim], \
                'Shape of weight does not match num_embeddings and embedding_dim'
            self.weight = nn.Parameter(_weight)
        self.sparse = sparse

    def reset_parameters(self):
        nn.init.normal_(self.weight)
        if self.padding_idx is not None:
            with torch.no_grad():
                self.weight[self.padding_idx].fill_(0)

    def forward(self, input):
        return F.embedding(
            input, self.weight, self.padding_idx, self.max_norm,
            self.norm_type, self.scale_grad_by_freq, self.sparse)
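
As a quick sanity check, a minimal sketch of how Embedder and PositionalEncoder compose; the module name Embed is taken from the import in the model file below, while the dimensions are arbitrary choices for illustration.

# Minimal sketch (CPU only); dimensions are arbitrary.
import torch
from Embed import Embedder, PositionalEncoder

vocab_size, d_model = 1000, 512
embed = Embedder(vocab_size, d_model)
pos_enc = PositionalEncoder(d_model, max_seq_len=1024, dropout=0.1)

tokens = torch.randint(0, vocab_size, (2, 16))  # (batch, seq_len)
x = pos_enc(embed(tokens))                      # scaled embedding + sinusoidal encoding
print(x.shape)                                  # torch.Size([2, 16, 512])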
New file (27 lines added):
import torch.nn as nn
from Sublayers import FeedForward, MultiHeadAttention, Norm


class DecoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = Norm(d_model)
        self.norm_2 = Norm(d_model)
        self.norm_3 = Norm(d_model)

        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        self.dropout_3 = nn.Dropout(dropout)

        self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)

    def forward(self, x, trg_mask):
        # Pre-norm masked self-attention followed by a feed-forward block, each with
        # a residual connection. norm_2/dropout_2/attn_2 are unused in forward:
        # there is no encoder here, so there is no cross-attention sub-layer.
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
        x2 = self.norm_3(x)
        x2 = self.ff(x2)
        x = x + self.dropout_3(x2)
        return x
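
A short, hedged smoke test for this layer. It assumes Sublayers.py (not shown in this excerpt) provides Norm, MultiHeadAttention and FeedForward with the constructor and forward signatures used above, including a mask argument that accepts a boolean tensor; the module name Layers is taken from the import in the model file below.

# Hedged smoke test: runs only if Sublayers.py exposes Norm, MultiHeadAttention
# and FeedForward with the signatures used above and boolean mask support.
import torch
from Layers import DecoderLayer

layer = DecoderLayer(d_model=512, heads=8, dropout=0.1)
x = torch.randn(2, 10, 512)                                    # (batch, seq_len, d_model)
causal = torch.triu(torch.ones(1, 10, 10), diagonal=1) == 0    # True on and below the diagonal
out = layer(x, causal)
print(out.shape)  # expected: torch.Size([2, 10, 512])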
New file (119 lines added):
import copy
import os.path

import torch
import torch.nn as nn
import torchvision

from Embed import Embedder, PositionalEncoder
from Layers import DecoderLayer
from Sublayers import Norm, 全连接层  # 全连接层: fully connected (linear) layer


def get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, dropout, 最大长度=1024):  # 最大长度: max sequence length
        super().__init__()
        self.N = N
        self.embed = Embedder(vocab_size, d_model)   # action/token embedding
        self.embedP = Embedder(最大长度, d_model)     # learned positional embedding
        # self.pe = PositionalEncoder(d_model, dropout=dropout)
        self.layers = get_clones(DecoderLayer(d_model, heads, dropout), N)
        self.norm = Norm(d_model)

    def forward(self, 图向量, 操作, trg_mask):  # 图向量: image features, 操作: action ids
        position = torch.arange(0, 图向量.size(1), dtype=torch.long,
                                device=图向量.device)
        # The action embedding is multiplied by 0, i.e. it is effectively disabled here.
        x = 图向量 + self.embedP(position) + self.embed(操作) * 0
        for i in range(self.N):
            x = self.layers[i](x, trg_mask)
        return self.norm(x)


class Transformer(nn.Module):
    def __init__(self, trg_vocab, d_model, N, heads, dropout, 图向量尺寸=6 * 6 * 2048):  # 图向量尺寸: image feature size
        super().__init__()
        self.图转 = 全连接层(图向量尺寸, d_model)  # project image features to d_model
        self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)
        self.out = 全连接层(d_model, trg_vocab)

    def forward(self, 图向量, 操作, trg_mask):
        图向量 = self.图转(图向量)
        d_output = self.decoder(图向量, 操作, trg_mask)
        output = self.out(d_output)
        return output


class RESNET_Transformer(nn.Module):
    def __init__(self, trg_vocab, d_model, N, heads, dropout, 图向量尺寸=1000):
        super().__init__()
        self.图转 = 全连接层(图向量尺寸, d_model)
        # ResNet-18 backbone kept in eval mode, with gradients enabled so it can still be trained.
        self.resnet = torchvision.models.resnet18(pretrained=False).eval().requires_grad_(True)
        self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)
        self.out = 全连接层(d_model, trg_vocab)

    def forward(self, 图向量, trg_mask):
        # Treat the batch of frames as one sequence: (B, 1000) -> (1, B, 1000).
        x = self.resnet(图向量).unsqueeze(0)
        图向量 = self.图转(x)
        # Decoder.forward also expects action ids; pass a zero placeholder, since
        # the action embedding is multiplied by 0 inside the decoder anyway.
        操作 = torch.zeros(图向量.shape[:2], dtype=torch.long, device=图向量.device)
        d_output = self.decoder(图向量, 操作, trg_mask)
        output = self.out(d_output)
        output = output[:, -1, :]  # keep only the last position's logits
        return output


def get_model(opt, trg_vocab, model_weights='model_weights'):
    assert opt.d_model % opt.heads == 0
    assert opt.dropout < 1

    model = Transformer(trg_vocab, opt.d_model, opt.n_layers, opt.heads, opt.dropout)

    if opt.load_weights is not None and os.path.isfile(opt.load_weights + '/' + model_weights):
        print("loading pretrained weights...")
        model.load_state_dict(torch.load(f'{opt.load_weights}/' + model_weights))
    else:
        # Count parameters; the Xavier initialisation is left commented out.
        量 = 0  # 量: total parameter count
        for p in model.parameters():
            if p.dim() > 1:
                # nn.init.xavier_uniform_(p)
                pass
            长 = len(p.shape)  # 长: number of dimensions
            点数 = 1           # 点数: number of elements in this tensor
            for j in range(长):
                点数 = p.shape[j] * 点数
            量 += 点数
        print('Parameters used: {} million'.format(量 / 1000000))  # originally: 使用参数:{}百万
    return model


def get_modelB(opt, trg_vocab):
    assert opt.d_model % opt.heads == 0
    assert opt.dropout < 1

    model = RESNET_Transformer(trg_vocab, opt.d_model, opt.n_layers, opt.heads, opt.dropout)

    if opt.load_weights is not None and os.path.isfile(opt.load_weights + '/model_weightsB'):
        print("loading pretrained weights...")
        model.load_state_dict(torch.load(f'{opt.load_weights}/model_weightsB'))
    else:
        量 = 0
        for p in model.parameters():
            if p.dim() > 1:
                # nn.init.xavier_uniform_(p)
                pass
            长 = len(p.shape)
            点数 = 1
            for j in range(长):
                点数 = p.shape[j] * 点数
            量 += 点数
        print('Parameters used: {} million'.format(量 / 1000000))  # originally: 使用参数:{}百万
    return model
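
For context, a hedged sketch of how get_model might be driven. The opt fields mirror the attributes read by get_model above; the module name Models, the hyperparameter values, the tensor shapes, and the assumption that Sublayers.全连接层 behaves like nn.Linear are all illustrative, not confirmed by this excerpt.

# Hedged sketch: requires Sublayers.py (not shown here) for 全连接层,
# MultiHeadAttention, FeedForward and Norm; "Models" is an assumed module name.
from types import SimpleNamespace

import torch
from Models import get_model

opt = SimpleNamespace(d_model=512, n_layers=6, heads=8, dropout=0.1,
                      load_weights=None)
model = get_model(opt, trg_vocab=1000)

图向量 = torch.randn(1, 36, 6 * 6 * 2048)                      # one sequence of 36 flattened feature maps
操作 = torch.zeros(1, 36, dtype=torch.long)                    # action ids (embedding is zeroed out anyway)
trg_mask = torch.triu(torch.ones(1, 36, 36), diagonal=1) == 0  # causal mask
logits = model(图向量, 操作, trg_mask)                          # (1, 36, trg_vocab)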