-
Notifications
You must be signed in to change notification settings - Fork 883
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: update voice separation logic and optimize code structure fix: resolve ffmpeg encoding errors and phrase initialization issue refactor: remove whisperX replicate API support for simplicity perf: enhance prompt for broader model compatibility style: reduce translation block size to minimize errors
- Loading branch information
Showing
25 changed files
with
506 additions
and
846 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import os, sys, subprocess | ||
import pandas as pd | ||
from moviepy.editor import AudioFileClip | ||
from typing import Dict, List, Tuple | ||
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) | ||
from core.config_utils import update_key | ||
from core.all_whisper_methods.demucs_vl import RAW_AUDIO_FILE, AUDIO_DIR | ||
|
||
def convert_video_to_audio(input_file: str) -> str: | ||
os.makedirs(AUDIO_DIR, exist_ok=True) | ||
if not os.path.exists(RAW_AUDIO_FILE): | ||
print(f"🎬➡️🎵 Converting to audio with FFmpeg ......") | ||
subprocess.run([ | ||
'ffmpeg', '-y', '-i', input_file, '-vn', '-b:a', '64k', | ||
'-ar', '16000', '-ac', '1', '-metadata', 'encoding=UTF-8', | ||
'-f', 'mp3', RAW_AUDIO_FILE | ||
], check=True, stderr=subprocess.PIPE) | ||
print(f"🎬➡️🎵 Converted <{input_file}> to <{RAW_AUDIO_FILE}> with FFmpeg\n") | ||
|
||
return RAW_AUDIO_FILE | ||
|
||
def _detect_silence(audio_file: str, start: float, end: float) -> List[float]: | ||
"""Detect silence points in the given audio segment""" | ||
cmd = ['ffmpeg', '-y', '-i', audio_file, | ||
'-ss', str(start), '-to', str(end), | ||
'-af', 'silencedetect=n=-30dB:d=0.5', | ||
'-f', 'null', '-'] | ||
|
||
output = subprocess.run(cmd, capture_output=True, text=True, | ||
encoding='utf-8').stderr | ||
|
||
return [float(line.split('silence_end: ')[1].split(' ')[0]) | ||
for line in output.split('\n') | ||
if 'silence_end' in line] | ||
|
||
def split_audio(audio_file: str, target_len: int = 50*60, win: int = 60) -> List[Tuple[float, float]]: | ||
print("🔪 Starting audio segmentation...") | ||
|
||
with AudioFileClip(audio_file) as audio: | ||
duration = audio.duration | ||
segments = [] | ||
pos = 0 | ||
while pos < duration: | ||
if duration - pos < target_len: | ||
segments.append((pos, duration)) | ||
break | ||
win_start = pos + target_len - win | ||
win_end = min(win_start + 2 * win, duration) | ||
silences = _detect_silence(audio_file, win_start, win_end) | ||
|
||
if silences: | ||
target_pos = target_len - (win_start - pos) | ||
split_at = next((t for t in silences if t - win_start > target_pos), None) | ||
if split_at: | ||
segments.append((pos, split_at)) | ||
pos = split_at | ||
continue | ||
segments.append((pos, pos + target_len)) | ||
pos += target_len | ||
|
||
print(f"🔪 Audio split into {len(segments)} segments") | ||
return segments | ||
|
||
def process_transcription(result: Dict) -> pd.DataFrame: | ||
all_words = [] | ||
for segment in result['segments']: | ||
for word in segment['words']: | ||
# Check word length | ||
if len(word["word"]) > 20: | ||
print(f"⚠️ Warning: Detected word longer than 20 characters, skipping: {word['word']}") | ||
continue | ||
|
||
# ! For French, we need to convert guillemets to empty strings | ||
word["word"] = word["word"].replace('»', '').replace('«', '') | ||
|
||
if 'start' not in word and 'end' not in word: | ||
if all_words: | ||
# Assign the end time of the previous word as the start and end time of the current word | ||
word_dict = { | ||
'text': word["word"], | ||
'start': all_words[-1]['end'], | ||
'end': all_words[-1]['end'], | ||
} | ||
all_words.append(word_dict) | ||
else: | ||
# If it's the first word, look next for a timestamp then assign it to the current word | ||
next_word = next((w for w in segment['words'] if 'start' in w and 'end' in w), None) | ||
if next_word: | ||
word_dict = { | ||
'text': word["word"], | ||
'start': next_word["start"], | ||
'end': next_word["end"], | ||
} | ||
all_words.append(word_dict) | ||
else: | ||
raise Exception(f"No next word with timestamp found for the current word : {word}") | ||
else: | ||
# Normal case, with start and end times | ||
word_dict = { | ||
'text': f'{word["word"]}', | ||
'start': word.get('start', all_words[-1]['end'] if all_words else 0), | ||
'end': word['end'], | ||
} | ||
|
||
all_words.append(word_dict) | ||
|
||
return pd.DataFrame(all_words) | ||
|
||
def save_results(df: pd.DataFrame): | ||
os.makedirs('output/log', exist_ok=True) | ||
excel_path = os.path.join('output/log', "cleaned_chunks.xlsx") | ||
|
||
# Remove rows where 'text' is empty | ||
initial_rows = len(df) | ||
df = df[df['text'].str.len() > 0] | ||
removed_rows = initial_rows - len(df) | ||
if removed_rows > 0: | ||
print(f"ℹ️ Removed {removed_rows} row(s) with empty text.") | ||
|
||
# Check for and remove words longer than 20 characters | ||
long_words = df[df['text'].str.len() > 20] | ||
if not long_words.empty: | ||
print(f"⚠️ Warning: Detected {len(long_words)} word(s) longer than 20 characters. These will be removed.") | ||
df = df[df['text'].str.len() <= 20] | ||
|
||
df['text'] = df['text'].apply(lambda x: f'"{x}"') | ||
df.to_excel(excel_path, index=False) | ||
print(f"📊 Excel file saved to {excel_path}") | ||
|
||
def save_language(language: str): | ||
update_key("whisper.detected_language", language) |
Oops, something went wrong.