-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathpreprocess.py
executable file
·150 lines (119 loc) · 5.18 KB
/
preprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import librosa
import numpy as np
import os
import pyworld
import pyworld as pw
import glob
from utility import *
import argparse
from datetime import datetime
FEATURE_DIM = 36
SAMPLE_RATE = 16000
FRAMES = 512
FFTSIZE = 1024
SPEAKERS_NUM = len(speakers)
CHUNK_SIZE = 1 # concate CHUNK_SIZE audio clips together
EPSILON = 1e-10
MODEL_NAME = 'starganvc_model'
def load_wavs(dataset: str, sr):
'''
data dict contains all audios file path &
resdict contains all wav files
'''
data = {}
with os.scandir(dataset) as it:
for entry in it:
if entry.is_dir():
data[entry.name] = []
# print(entry.name, entry.path)
with os.scandir(entry.path) as it_f:
for onefile in it_f:
if onefile.is_file():
# print(onefile.path)
data[entry.name].append(onefile.path)
print(f'loaded keys: {data.keys()}')
#data like {TM1:[xx,xx,xxx,xxx]}
resdict = {}
cnt = 0
for key, value in data.items():
resdict[key] = {}
for one_file in value:
filename = one_file.split('/')[-1].split('.')[0] #like 100061
newkey = f'{filename}'
wav, _ = librosa.load(one_file, sr=sr, mono=True, dtype=np.float64)
y,_ = librosa.effects.trim(wav, top_db=15)
wav = np.append(y[0], y[1:] - 0.97 * y[:-1])
resdict[key][newkey] = wav
# resdict[key].append(temp_dict) #like TM1:{100062:[xxxxx], .... }
print('.', end='')
cnt += 1
print(f'\nTotal {cnt} aduio files!')
return resdict
def chunks(iterable, size):
"""Yield successive n-sized chunks from iterable."""
for i in range(0, len(iterable), size):
yield iterable[i:i + size]
def wav_to_mcep_file(dataset: str, sr=SAMPLE_RATE, processed_filepath: str = './data/processed'):
'''convert wavs to mcep feature using image repr'''
shutil.rmtree(processed_filepath)
os.makedirs(processed_filepath, exist_ok=True)
allwavs_cnt = len(glob.glob(f'{dataset}/*/*.wav'))
print(f'Total {allwavs_cnt} audio files!')
d = load_wavs(dataset, sr)
for one_speaker in d.keys():
values_of_one_speaker = list(d[one_speaker].values())
for index, one_chunk in enumerate (chunks(values_of_one_speaker, CHUNK_SIZE)):
wav_concated = [] #preserve one batch of wavs
temp = one_chunk.copy()
#concate wavs
for one in temp:
wav_concated.extend(one)
wav_concated = np.array(wav_concated)
#process one batch of wavs
f0, ap, sp, coded_sp = cal_mcep(wav_concated, sr=sr, dim=FEATURE_DIM)
newname = f'{one_speaker}_{index}'
file_path_z = os.path.join(processed_filepath, newname)
np.savez(file_path_z, f0=f0, coded_sp=coded_sp)
print(f'[save]: {file_path_z}')
#split mcep t0 muliti files
for start_idx in range(0, coded_sp.shape[1] - FRAMES + 1, FRAMES):
one_audio_seg = coded_sp[:, start_idx : start_idx+FRAMES]
if one_audio_seg.shape[1] == FRAMES:
temp_name = f'{newname}_{start_idx}'
filePath = os.path.join(processed_filepath, temp_name)
np.save(filePath, one_audio_seg)
print(f'[save]: {filePath}.npy')
def world_features(wav, sr, fft_size, dim):
f0, timeaxis = pyworld.harvest(wav, sr)
sp = pyworld.cheaptrick(wav, f0, timeaxis, sr,fft_size=fft_size)
ap = pyworld.d4c(wav, f0, timeaxis, sr, fft_size=fft_size)
coded_sp = pyworld.code_spectral_envelope(sp, sr, dim)
return f0, timeaxis, sp, ap, coded_sp
def cal_mcep(wav, sr=SAMPLE_RATE, dim=FEATURE_DIM, fft_size=FFTSIZE):
'''cal mcep given wav singnal
the frame_period used only for pad_wav_to_get_fixed_frames
'''
f0, timeaxis, sp, ap, coded_sp = world_features(wav, sr, fft_size, dim)
coded_sp = coded_sp.T # dim x n
return f0, ap, sp, coded_sp
if __name__ == "__main__":
start = datetime.now()
parser = argparse.ArgumentParser(description = 'Convert the wav waveform to mel-cepstral coefficients(MCCs)\
and calculate the speech statistical characteristics')
input_dir = './data/speakers'
output_dir = './data/processed'
parser.add_argument('--input_dir', type = str, help = 'the direcotry contains data need to be processed', default = input_dir)
parser.add_argument('--output_dir', type = str, help = 'the directory stores the processed data', default = output_dir)
argv = parser.parse_args("")
# added "" for parsing default arguments
input_dir = argv.input_dir
output_dir = argv.output_dir
os.makedirs(output_dir, exist_ok=True)
wav_to_mcep_file(input_dir, SAMPLE_RATE, processed_filepath=output_dir)
#input_dir is train dataset. we need to calculate and save the speech\
# statistical characteristics for each speaker.
generator = GenerateStatistics(output_dir)
generator.generate_stats()
generator.normalize_dataset()
end = datetime.now()
print(f"[Runing Time]: {end-start}")