# get_info
Before computing quality metrics for QA pairs, we usually need some model-derived quantities such as embeddings, perplexity (PPL), gradients, and loss. We first run get_info to compute these values and save them locally, so that later metric calculations can simply load them.
```python
import torch
from transformers import LlamaTokenizer, LlamaForCausalLM

model = LlamaForCausalLM.from_pretrained(args.model_name_or_path, device_map="auto", cache_dir='../cache', output_hidden_states=True)
tokenizer = LlamaTokenizer.from_pretrained(args.model_name_or_path, cache_dir='../cache')
model.eval()
```
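The helper methods below are written as methods of a collector class: they reference `self.model`, `self.tokenizer`, `self.dataset`, and (for task vectors) a second model `self.model2`. The page does not show that class itself, so here is a minimal sketch of what it might look like; the name `InfoCollector` and the constructor signature are assumptions, not part of the original code.

```python
class InfoCollector:
    """Hypothetical container for the get_info helpers (name and signature assumed)."""
    def __init__(self, model, tokenizer, dataset, model2=None):
        self.model = model          # the LLM used for embeddings / PPL / gradients
        self.tokenizer = tokenizer  # its matching tokenizer
        self.dataset = dataset      # iterable of dicts, each with a 'text' field
        self.model2 = model2        # optional second model for task-vector computation
```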
Take the hidden states of the last layer and mean-pool them over the sequence dimension; pay attention to tensor shapes and to moving tensors between devices.
```python
def _get_embeddings(self, input_text, max_length=1024):
    inputs = self.tokenizer(input_text, return_tensors="pt", truncation=True, max_length=max_length)  # optionally .to(device)
    with torch.no_grad():  # inference only, no gradients needed
        # output_hidden_states=True can be passed here or at model instantiation
        outputs = self.model(**inputs, output_hidden_states=True)
    hidden_states = outputs.hidden_states
    embeddings = hidden_states[-1]  # use the last layer's hidden states
    sentence_embedding = embeddings.mean(dim=1)  # mean-pool over the sequence dimension
    return sentence_embedding  # optionally .to('cpu')
```
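A quick way to sanity-check the embedding helper is to compare two related texts with cosine similarity. The snippet below is only an illustration and assumes a `collector` instance built from the hypothetical `InfoCollector` sketch above.

```python
import torch.nn.functional as F

emb_q = collector._get_embeddings("What is the capital of France?")
emb_a = collector._get_embeddings("The capital of France is Paris.")
# both embeddings have shape (1, hidden_size); cosine_similarity returns shape (1,)
print(F.cosine_similarity(emb_q, emb_a).item())
```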
```python
def _get_ppl(self, input_text):
    inputs = self.tokenizer(input_text, return_tensors='pt')
    with torch.no_grad():
        outputs = self.model(**inputs, output_hidden_states=True)
    logits = outputs.logits
    # Calculate PPL
    shift_logits = logits[..., :-1, :].contiguous()
    shift_labels = inputs["input_ids"][..., 1:].contiguous()
    loss = torch.nn.functional.cross_entropy(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1), reduction='none')
    loss = loss.view(shift_labels.size())
    ppl = torch.exp(loss.mean())
    return ppl.item()
```
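For reference, the value computed above is the standard perplexity: the exponential of the mean next-token cross-entropy over the shifted sequence,

$$\mathrm{PPL}(x) = \exp\!\Big(-\frac{1}{N-1}\sum_{t=2}^{N}\log p_\theta(x_t \mid x_{<t})\Big).$$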
Alternatively, let the model compute the loss itself by passing `labels`:
```python
def _get_ppl(self, input_text, max_length=1024):
    inputs = self.tokenizer(input_text, return_tensors="pt", truncation=True, max_length=max_length)  # optionally .to(device)
    input_ids = inputs["input_ids"]
    with torch.no_grad():
        # passing labels lets the model shift and compute the cross-entropy loss internally
        outputs = self.model(input_ids, labels=input_ids)
        loss = outputs.loss
    perplexity = torch.exp(loss)
    return perplexity  # optionally .item() or .to('cpu')
```
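As a rough sanity check (again assuming the hypothetical `collector` instance from the sketch above), fluent text should receive a noticeably lower PPL than scrambled text:

```python
print(collector._get_ppl("Paris is the capital of France."))   # expected: lower perplexity
print(collector._get_ppl("capital France the of is Paris."))   # expected: higher perplexity
```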
```python
def _get_gradients(self, input_text):
    '''
    In practice this gradient-saving method can be further optimized and customized.
    For example, you could save only the gradients of specific layers,
    or store them in a more compact format (e.g. sparse matrices, or LoRA-style low-rank factors).
    This code is a basic template; adjust it to your needs.
    '''
    # Set model to training mode to compute gradients
    self.model.train()
    inputs = self.tokenizer(input_text, return_tensors='pt')
    outputs = self.model(**inputs, output_hidden_states=True)
    logits = outputs.logits
    # Calculate loss (shift logits and labels for next-token prediction)
    shift_logits = logits[..., :-1, :].contiguous()
    shift_labels = inputs["input_ids"][..., 1:].contiguous()
    loss = torch.nn.functional.cross_entropy(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
    # Compute gradients
    self.model.zero_grad()  # Reset gradients
    loss.backward()  # Calculate new gradients
    # Extract gradients and detach them
    gradients = {name: param.grad.detach() for name, param in self.model.named_parameters() if param.grad is not None}
    self.model.eval()  # switch back to eval mode after collecting gradients
    return gradients
```
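A common way to turn the raw per-parameter gradients into a compact signal for scoring QA pairs is to reduce each tensor to a norm. This reduction is illustrative and not part of the original code:

```python
def gradient_norms(gradients):
    """Collapse each parameter's gradient tensor to its L2 norm (illustrative helper)."""
    return {name: grad.norm().item() for name, grad in gradients.items()}

grads = collector._get_gradients("Q: What is 2 + 2? A: 4.")
norms = gradient_norms(grads)
total_norm = sum(v ** 2 for v in norms.values()) ** 0.5  # overall gradient magnitude
```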
```python
def calculate_task_vector(self):
    task_vector = {}
    # Get parameters from each model and calculate the task vector.
    # We assume both models contain the same parameter keys for simplicity.
    params1 = self.model.named_parameters()
    params2 = self.model2.named_parameters()
    dict_params2 = dict(params2)
    for name1, param1 in params1:
        # Only layers with learnable parameters are considered (Conv, Linear, etc.)
        if param1.requires_grad:
            # Calculate the task vector (difference of the parameters)
            task_vector[name1] = param1.data - dict_params2[name1].data
    return task_vector
```
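The task vector can then be summarized per layer, for example by its norm, to see where the two models differ most. This follow-up is illustrative and requires that the collector was built with a second model (`model2`):

```python
task_vec = collector.calculate_task_vector()
# parameter tensors sorted by how much they changed between the two models
per_layer_change = sorted(((name, delta.norm().item()) for name, delta in task_vec.items()),
                          key=lambda kv: kv[1], reverse=True)
print(per_layer_change[:5])  # the five most-changed parameter tensors
```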
```python
import os
import numpy as np

def inference_and_save_info_pipeline(self, output_dir):
    embeddings_collection = []
    ppls = []
    all_gradients = []  # list of per-sample gradient dicts
    for entry in self.dataset:
        input_text = entry['text']
        embedding = self._get_embeddings(input_text)
        ppl = self._get_ppl(input_text)
        gradients = self._get_gradients(input_text)
        embeddings_collection.append(embedding.cpu().numpy())
        ppls.append(ppl)
        all_gradients.append({k: v.cpu().numpy() for k, v in gradients.items()})
    # Save everything locally for later metric computation (file names are illustrative)
    os.makedirs(output_dir, exist_ok=True)
    np.save(os.path.join(output_dir, 'embeddings.npy'), np.concatenate(embeddings_collection, axis=0))
    np.save(os.path.join(output_dir, 'ppls.npy'), np.array(ppls))
    torch.save(all_gradients, os.path.join(output_dir, 'gradients.pt'))
```
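Putting it all together: the wiring below uses the hypothetical `InfoCollector` from the sketch above and an illustrative two-entry dataset; the output directory name is arbitrary.

```python
dataset = [{"text": "Q: What is the capital of France? A: Paris."},
           {"text": "Q: What is 2 + 2? A: 4."}]
collector = InfoCollector(model=model, tokenizer=tokenizer, dataset=dataset)
collector.inference_and_save_info_pipeline("./qa_info")  # writes embeddings / PPLs / gradients to disk
```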