-
Notifications
You must be signed in to change notification settings - Fork 25
/
se_extractor.py
154 lines (127 loc) · 5.32 KB
/
se_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import os
# import glob
import torch
import hashlib
import librosa
import base64
from glob import glob
import numpy as np
from pydub import AudioSegment
from faster_whisper import WhisperModel
from whisper_timestamped.transcribe import get_audio_tensor, get_vad_segments
# Whisper size name. No active code in the visible part of this file reads it.
model_size = "medium"
# Model identifier passed to WhisperModel by split_audio_whisper
# (presumably a local model directory rather than a size name - confirm).
model_fp = 'fast-whisper'
# Shared WhisperModel instance. Stays None until split_audio_whisper creates
# it on first use (on "cuda" with compute_type "float32").
model = None
def split_audio_whisper(audio_path, audio_name, target_dir='processed'):
    """Cut an audio file into per-segment WAV clips using Whisper timestamps.

    The file is transcribed with the module-level faster-whisper model, which
    is created on the first call and cached in the global ``model``. Every
    transcribed segment that passes the filter below is exported to
    ``<target_dir>/<audio_name>/wavs/<audio_name>_seg<k>.wav`` where ``k`` is
    the segment's position in the transcription. Rejected segments still
    consume an index, so the numbering of the written files may have gaps.

    Args:
        audio_path: Path of the audio file to transcribe and slice.
        audio_name: Name used for the output sub-folder and the clip files.
        target_dir: Root directory under which the output folder is created.

    Returns:
        Path of the ``wavs`` folder (it may be empty if no segment passed
        the filter).
    """
    global model
    if model is None:
        # Created once and reused by later calls.
        model = WhisperModel(model_fp, device="cuda", compute_type="float32")

    audio = AudioSegment.from_file(audio_path)
    max_len = len(audio)  # compared against millisecond offsets below

    # Only the segment list is needed; the second return value is unused.
    # The transcription arguments are deliberately left unchanged.
    segments, _ = model.transcribe(audio_path, beam_size=5, word_timestamps=True)
    segments = list(segments)

    # makedirs creates the intermediate <target_dir>/<audio_name> folder too.
    wavs_folder = os.path.join(target_dir, audio_name, 'wavs')
    os.makedirs(wavs_folder, exist_ok=True)

    start_time = None
    for k, seg in enumerate(segments):
        if k == 0:
            start_time = max(0, seg.start)
        end_time = seg.end

        # Strip ellipses before measuring the text length.
        text = seg.text.replace('...', '')

        # Keep an extra 80 ms after the segment end, clamped to the file length.
        clip = audio[int(start_time * 1000): min(max_len, int(end_time * 1000) + 80)]

        # Keep clips strictly between 1.5 s and 20 s whose text has 2..199 chars.
        keep = 1.5 < clip.duration_seconds < 20. and 2 <= len(text) < 200
        if keep:
            output_file = os.path.join(wavs_folder, f"{audio_name}_seg{k}.wav")
            clip.export(output_file, format='wav')

        # The next clip starts 80 ms before the next segment's own start.
        if k < len(segments) - 1:
            start_time = max(0, segments[k + 1].start - 0.08)

    return wavs_folder
# def split_audio_vad(audio_path, audio_name, target_dir, split_seconds=10.0, model_path='./'):
def split_audio_vad(audio_path, audio_name, target_dir, split_seconds=10.0):
    """Remove non-speech with VAD and cut the remainder into equal WAV chunks.

    Speech regions are detected with ``get_vad_segments`` (method "silero"),
    concatenated into one continuous track, and that track is divided into
    ``round(duration / split_seconds)`` chunks of equal length. Chunks are
    written to ``<target_dir>/<audio_name>/wavs/<audio_name>_seg<i>.wav``.

    Args:
        audio_path: Path of the audio file to process.
        audio_name: Name used for the output sub-folder and the chunk files.
        target_dir: Root directory under which the output folder is created.
        split_seconds: Target chunk length in seconds; the real length is the
            speech duration divided evenly by the rounded chunk count.

    Returns:
        Path of the ``wavs`` folder holding the exported chunks.

    Raises:
        AssertionError: If the detected speech is so short that the chunk
            count rounds down to zero.
    """
    # Segment boundaries are requested in samples (output_sample=True) and
    # converted to seconds with this rate - assumes the VAD tensor is 16 kHz.
    SAMPLE_RATE = 16000
    audio_vad = get_audio_tensor(audio_path)
    vad_segments = get_vad_segments(
        audio_vad,
        output_sample=True,
        min_speech_duration=0.1,
        min_silence_duration=1,
        method="silero",
    )
    segments = [
        (float(seg["start"]) / SAMPLE_RATE, float(seg["end"]) / SAMPLE_RATE)
        for seg in vad_segments
    ]
    print(segments)

    # Stitch all speech regions together into one silence-free track.
    audio_active = AudioSegment.silent(duration=0)
    audio = AudioSegment.from_file(audio_path)
    for start_time, end_time in segments:
        audio_active += audio[int(start_time * 1000): int(end_time * 1000)]

    audio_dur = audio_active.duration_seconds
    print(f'after vad: dur = {audio_dur}')

    wavs_folder = os.path.join(target_dir, audio_name, 'wavs')
    os.makedirs(wavs_folder, exist_ok=True)

    num_splits = int(np.round(audio_dur / split_seconds))
    # Raised explicitly instead of using `assert`, so the check still runs
    # under `python -O` and never degrades into a ZeroDivisionError below.
    # The exception type and message are unchanged for existing callers.
    if num_splits <= 0:
        raise AssertionError('input audio is too short')
    interval = audio_dur / num_splits

    start_time = 0.
    for i in range(num_splits):
        if i == num_splits - 1:
            # The last chunk always runs to the very end of the track.
            end_time = audio_dur
        else:
            end_time = min(start_time + interval, audio_dur)
        output_file = os.path.join(wavs_folder, f"{audio_name}_seg{i}.wav")
        chunk = audio_active[int(start_time * 1000): int(end_time * 1000)]
        chunk.export(output_file, format='wav')
        start_time = end_time

    return wavs_folder
def hash_numpy_array(audio_path):
    """Return a short text fingerprint of an audio file's decoded samples.

    The file is loaded with librosa at its native sample rate as mono, the
    sample buffer is hashed with SHA-256, and the digest is base64-encoded.
    The first 16 characters are returned, with every '/' replaced by '_^'.

    Args:
        audio_path: Path of the audio file to fingerprint.

    Returns:
        The fingerprint string (longer than 16 characters when a '/' was
        replaced).
    """
    samples, _ = librosa.load(audio_path, sr=None, mono=True)
    digest = hashlib.sha256(samples.tobytes()).digest()
    encoded = base64.b64encode(digest).decode('utf-8')
    return encoded[:16].replace('/', '_^')
# def get_se(audio_path, vc_model, target_dir='processed', vad=True, model_path='./'):
def get_se(audio_path, vc_model, target_dir='processed', vad=True):
    """Return the speaker embedding for an audio file or a folder of WAV clips.

    A cached embedding at ``<target_dir>/<audio_name>/se.pth`` is reused when
    present. Otherwise the input is turned into WAV clips and the embedding
    is computed by ``vc_model.extract_se``, which is given that same path as
    ``se_save_path``.

    Args:
        audio_path: Either an audio file or a directory that already contains
            ``*.wav`` clips.
        vc_model: Object exposing ``device`` and
            ``extract_se(paths, se_save_path=...)``.
        target_dir: Root directory for the clips and the cached embedding.
        vad: For file input, split with ``split_audio_vad`` when true and
            with ``split_audio_whisper`` otherwise. Ignored for directories.

    Returns:
        Tuple ``(se, audio_name)``. For a file, ``audio_name`` is the file
        stem plus a content fingerprint; for a directory it is the directory
        name only, so the cache is NOT invalidated when the clips inside the
        directory change.

    Raises:
        NotImplementedError: If no ``*.wav`` clip is available.
    """
    device = vc_model.device

    # Decide file-vs-folder first: hash_numpy_array loads a single audio file
    # and cannot be applied to a directory.
    is_folder = os.path.isdir(audio_path)
    if is_folder:
        # normpath drops a trailing separator so basename is never empty.
        audio_name = os.path.basename(os.path.normpath(audio_path))
    else:
        stem = os.path.basename(audio_path).rsplit('.', 1)[0]
        audio_name = f"{stem}_{hash_numpy_array(audio_path)}"

    se_path = os.path.join(target_dir, audio_name, 'se.pth')
    if os.path.isfile(se_path):
        # map_location loads straight onto the model's device, so a cache
        # written on a GPU host can still be read on a CPU-only host.
        se = torch.load(se_path, map_location=device)
        return se, audio_name

    if is_folder:
        wavs_folder = audio_path
    elif vad:
        wavs_folder = split_audio_vad(audio_path, target_dir=target_dir, audio_name=audio_name)
    else:
        wavs_folder = split_audio_whisper(audio_path, target_dir=target_dir, audio_name=audio_name)

    audio_segs = glob(f'{wavs_folder}/*.wav')
    if not audio_segs:
        raise NotImplementedError('No audio segments found!')

    if is_folder:
        # The split helpers create this folder for file input; for folder
        # input nothing has created it yet.
        os.makedirs(os.path.dirname(se_path), exist_ok=True)

    return vc_model.extract_se(audio_segs, se_save_path=se_path), audio_name