Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
- 多视频格式支持
- 把下载视频也放在 output 下,修改了查找原视频的索引
  • Loading branch information
Huanshere committed Sep 3, 2024
1 parent b0be57a commit c32e518
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 62 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,5 @@ _model_cache/
*.webm
*.mp3
.DS_Store
runtime/
runtime/
dev/
5 changes: 4 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os, sys
# 建议在 streamlit 页面中调整设置
## ======================== 基本设置 ======================== ##
# API 设置 建议使用唯一真神 https://api.wlai.vip, sonnet 价格仅 10r/1M, 也可以(不建议)参考格式修改成你的API
# API 设置 建议使用唯一真神 https://api.wlai.vip, sonnet 价格仅 10r/1M
# !一定确保 key 是 AZ 渠道
API_KEY = 'sk-xxx'
BASE_URL = 'https://api2.wlai.vip'
MODEL = ['claude-3-5-sonnet-20240620']
Expand All @@ -25,6 +26,8 @@
AUDIO_LANGUAGE = 'en'

## ======================== 进阶设置设置 ======================== ##
# 支持视频格式
ALLOWED_VIDEO_FORMATS = ['mp4', 'mov', 'avi', 'mkv', 'flv', 'wmv', 'webm']

# gpt多线程数量
MAX_WORKERS = 8
Expand Down
1 change: 0 additions & 1 deletion core/ask_gpt.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def check_ask_gpt_history(prompt, model):

def select_llm(model):
from config import MODEL, API_KEY, BASE_URL
print(f"API_KEY: {API_KEY}")
if model in MODEL and API_KEY:
return {'api_key': API_KEY, 'base_url': BASE_URL, 'model': MODEL}
else:
Expand Down
68 changes: 42 additions & 26 deletions core/onekeycleanup.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,71 @@
import os
import os, sys
import glob
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.step1_ytdlp import find_video_files
import shutil

def cleanup():
# Get video file name
video_files = glob.glob("*.mp4") + glob.glob("*.webm")
if not video_files:
print("🚫 No video files found")
return
video_file = find_video_files()
video_name = video_file.split("/")[1]
video_name = os.path.splitext(video_name)[0]
video_name = sanitize_filename(video_name)

video_file = video_files[0]
video_name = os.path.splitext(video_file)[0]

# Create required folders
os.makedirs("history", exist_ok=True)
history_dir = os.path.join("history", video_name)
log_dir = os.path.join(history_dir, "log")
gpt_log_dir = os.path.join(history_dir, "gpt_log")

os.makedirs(log_dir, exist_ok=True)
os.makedirs(gpt_log_dir, exist_ok=True)

# Move log files
# 移动非日志文件
for file in glob.glob("output/*"):
if not file.endswith(('log', 'gpt_log')):
move_file(file, history_dir)

# 移动 log 文件
for file in glob.glob("output/log/*"):
move_file(file, log_dir)

# Move gpt_log files
# 移动 gpt_log 文件
for file in glob.glob("output/gpt_log/*"):
move_file(file, gpt_log_dir)

# Move subtitle files
for file in glob.glob("output/*"):
move_file(file, history_dir)

# Move video files
move_file(video_file, history_dir)
# 删除空的 output 目录
try:
os.rmdir("output/log")
os.rmdir("output/gpt_log")
os.rmdir("output")
except OSError:
pass # 忽略删除失败的错误

def move_file(src, dst):
try:
# 获取目标路径的目录和文件名
dst_dir, dst_filename = os.path.split(dst)
# 使用 os.path.join 来确保路径的正确性
dst = os.path.join(dst_dir, sanitize_filename(dst_filename))
# 获取源文件的文件名
src_filename = os.path.basename(src)
# 使用 os.path.join 来确保路径的正确性,并包含文件名
dst = os.path.join(dst, sanitize_filename(src_filename))

if os.path.exists(dst):
if os.path.isdir(dst):
# 如果目标是文件夹,尝试删除文件夹内容
shutil.rmtree(dst, ignore_errors=True)
else:
# 如果目标是文件,尝试删除文件
os.remove(dst)

shutil.move(src, dst, copy_function=shutil.copy2)
print(f"✅ 已移动: {src} -> {dst}")
except shutil.Error as e:
# 如果目标文件已存在,强制覆盖
os.remove(dst)
shutil.move(src, dst, copy_function=shutil.copy2)
print(f"✅ 已覆盖并移动: {src} -> {dst}")
except PermissionError:
print(f"⚠️ 权限错误: 无法删除 {dst},尝试直接覆盖")
try:
shutil.copy2(src, dst)
os.remove(src)
print(f"✅ 已复制并删除源文件: {src} -> {dst}")
except Exception as e:
print(f"❌ 移动失败: {src} -> {dst}")
print(f"错误信息: {str(e)}")
except Exception as e:
print(f"❌ 移动失败: {src} -> {dst}")
print(f"错误信息: {str(e)}")
Expand Down
15 changes: 14 additions & 1 deletion core/step1_ytdlp.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
import os,sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import glob
from yt_dlp import YoutubeDL

def download_video_ytdlp(url, save_path='./', resolution=1080):
def download_video_ytdlp(url, save_path='output', resolution=1080):
allowed_resolutions = [360, 480, 1080]
if resolution not in allowed_resolutions:
resolution = 1080

os.makedirs(save_path, exist_ok=True)
ydl_opts = {
'format': f'bestvideo[height<={resolution}]+bestaudio/best[height<={resolution}]',
'outtmpl': f'{save_path}/%(title)s.%(ext)s'
}
with YoutubeDL(ydl_opts) as ydl:
ydl.download([url])

def find_video_files(save_path='output'):
from config import ALLOWED_VIDEO_FORMATS
video_files = [file for file in glob.glob(save_path + "/*") if os.path.splitext(file)[1][1:] in ALLOWED_VIDEO_FORMATS]
video_files = [file for file in video_files if not file.startswith("output/output")]
# if num != 1, raise ValueError
if len(video_files) != 1:
raise ValueError(f"找到的视频数量不唯一,请检查。找到的视频数量: {len(video_files)}")
return video_files[0]

if __name__ == '__main__':
# 示例用法
url = input('请输入您想下载的视频URL: ')
resolution = input('请输入所需分辨率 (360/480/1080,默认1080): ')
resolution = int(resolution) if resolution.isdigit() else 1080
download_video_ytdlp(url, resolution=resolution)
print(f"🎥 视频已下载到 {find_video_files()}")
7 changes: 5 additions & 2 deletions core/step2_whisper_stamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import pandas as pd
from typing import List, Dict
import warnings
from core.step1_ytdlp import find_video_files
warnings.filterwarnings("ignore")
from config import WHISPER_MODEL, MODEL_DIR, AUDIO_LANGUAGE

def convert_video_to_audio_and_transcribe(input_file: str):
from config import WHISPER_MODEL, MODEL_DIR, AUDIO_LANGUAGE
# 🎬➡️🎵➡️📊 Convert video to audio and transcribe
audio_file = os.path.splitext(input_file)[0] + '_temp.mp3'

Expand Down Expand Up @@ -86,4 +87,6 @@ def transcript(video_file: StopIteration):
print("📊 转录结果已存在,跳过转录步骤。")

if __name__ == "__main__":
transcript("KUNG FU PANDA 4 | Official Trailer.mp4")
video_file = find_video_files()
print(f"🎬 找到的视频文件: {video_file}, 开始转录...")
transcript(video_file)
11 changes: 4 additions & 7 deletions core/step7_merge_sub_to_vid.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os, glob, subprocess, time, sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.step1_ytdlp import find_video_files

EN_FONT_SIZE = 16
TRANS_FONT_SIZE = 18
Expand All @@ -20,11 +21,7 @@ def merge_subtitles_to_video():
from config import RESOLUTIOM
TARGET_WIDTH, TARGET_HEIGHT = RESOLUTIOM.split('x')
## merge subtitles to video and save the output video
video_files = glob.glob("*.mp4") + glob.glob("*.webm")
if not video_files:
print("No video files found in the current directory.")
exit(1)
video_file = video_files[0]
video_file = find_video_files()
en_srt = "output/english_subtitles.srt"
trans_srt = "output/translated_subtitles.srt"

Expand Down Expand Up @@ -52,15 +49,15 @@ def merge_subtitles_to_video():
output_video
]

print("Starting FFmpeg process... should take less than 10s for 2mins video.")
print("开始压制字幕到视频...")
start_time = time.time()
process = subprocess.Popen(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

try:
stdout, stderr = process.communicate(timeout=120)
if process.returncode == 0:
print(f"Process completed in {time.time() - start_time:.2f} seconds.")
print("🎉🎥 `output_video_with_subs.mp4` generated successfully! Go check it out inside `output` 👀")
print("🎉🎥 压制字幕到视频完成! 在 `output` 文件夹中查看 `output_video_with_subs.mp4` 👀")
else:
print("Error occurred during FFmpeg execution:")
print(stderr.decode())
Expand Down
4 changes: 2 additions & 2 deletions core/step8_extract_refer_audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ def step8_main(input_video):
print("Full audio extracted and cleaned and saved as original_vocal.wav and background.wav") # 完整音频处理完成

if __name__ == "__main__":
import glob
input_video = (glob.glob("*.mp4") + glob.glob("*.webm"))[0]
from core.step1_ytdlp import find_video_files
input_video = find_video_files()
step8_main(input_video)
6 changes: 3 additions & 3 deletions st.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import streamlit as st
import os, glob, sys
import os, sys
from st_components.imports_and_utils import *
from st_components.download_video_section import download_video_section
from st_components.sidebar_setting import page_setting
Expand Down Expand Up @@ -42,7 +42,7 @@ def text_processing_section():
return True

def process_text():
video_file = (glob.glob("*.mp4") + glob.glob("*.webm"))[0]
video_file = step1_ytdlp.find_video_files()

with st.spinner("使用Whisper进行转录..."):
step2_whisper_stamped.transcript(video_file)
Expand Down Expand Up @@ -86,7 +86,7 @@ def audio_processing_section():
st.rerun()

def process_audio():
input_video = (glob.glob("*.mp4") + glob.glob("*.webm"))[0]
input_video = step1_ytdlp.find_video_files()

with st.spinner("提取音频..."):
step8_extract_refer_audio.step8_main(input_video)
Expand Down
37 changes: 19 additions & 18 deletions st_components/download_video_section.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
import streamlit as st
import os, glob, sys, shutil
import os, sys, shutil
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from st_components.imports_and_utils import step1_ytdlp
from core.step1_ytdlp import download_video_ytdlp, find_video_files

def download_video_section(cloud):
title1 = "上传视频 " if cloud else "下载或上传视频"
st.header(title1)
with st.container(border=True):
if not glob.glob("*.mp4") + glob.glob("*.webm"):
try:
video_file = find_video_files()
st.video(video_file)
if st.button(" 删除并重新选择 ", key="delete_video_button"):
os.remove(video_file)
if os.path.exists("output"):
shutil.rmtree("output")
st.rerun()
return True
except:
if not cloud:
url = st.text_input("输入YouTube链接:")
if st.button("下载视频", key="download_button", use_container_width=True):
if url:
with st.spinner("正在下载视频..."):
step1_ytdlp.download_video_ytdlp(url, save_path='./')
download_video_ytdlp(url)
st.rerun()

uploaded_file = st.file_uploader("或上传视频 <30min", type=["mp4", "webm"])
from config import ALLOWED_VIDEO_FORMATS
uploaded_file = st.file_uploader("或上传视频 <30min", type=ALLOWED_VIDEO_FORMATS)
if uploaded_file:
with open(os.path.join("./", uploaded_file.name), "wb") as f:
os.makedirs("output", exist_ok=True)
# 视频写入output文件夹
with open(os.path.join("output", uploaded_file.name), "wb") as f:
f.write(uploaded_file.getbuffer())
st.video(uploaded_file)
st.rerun()
else:
return False
else:
video_file = (glob.glob("*.mp4") + glob.glob("*.webm"))[0]
st.video(video_file)
if st.button(" 删除并重新选择 ", key="delete_video_button"):
os.remove(video_file)
if os.path.exists("output"):
shutil.rmtree("output")
st.rerun()
return True
return False

0 comments on commit c32e518

Please sign in to comment.