# pip install scenedetect opencv-python -i https://pypi.tuna.tsinghua.edu.cn/simple
from scenedetect.video_manager import VideoManager
from scenedetect.scene_manager import SceneManager
from scenedetect.stats_manager import StatsManager
from scenedetect.detectors.content_detector import ContentDetector
import os
import sys
import json
import subprocess

# from huggingface_hub import hf_hub_download
# from faster_whisper import WhisperModel
import public_tools
from pathlib import Path


# Get the scene-cut timecodes for intelligent video splitting.
def find_scenes(video_path, sensitivity):
    """Detect scene cuts in *video_path* and return a list of
    ``[start_timecode, end_timecode]`` string pairs, one per scene.

    *sensitivity* is forwarded to ContentDetector as its threshold;
    lower values produce more (shorter) scenes.
    """
    print(
        "正在计算分镜数据" + "sensitivity:" + str(sensitivity) + "path : " + video_path
    )
    sys.stdout.flush()
    video_manager = VideoManager([video_path])
    stats_manager = StatsManager()
    scene_manager = SceneManager(stats_manager)
    # Use the content detector (frame-to-frame content difference).
    scene_manager.add_detector(ContentDetector(threshold=float(sensitivity)))
    shijian_list = []
    try:
        # Downscale frames for speed; detection quality is barely affected.
        video_manager.set_downscale_factor()
        video_manager.start()
        scene_manager.detect_scenes(frame_source=video_manager)
        scene_list = scene_manager.get_scene_list()
        print("分镜数据列表:")
        sys.stdout.flush()
        for i, scene in enumerate(scene_list):
            shijian_list.append([scene[0].get_timecode(), scene[1].get_timecode()])
            print(
                "Scene %2d: Start %s / Frame %d, End %s / Frame %d"
                % (
                    i + 1,
                    scene[0].get_timecode(),
                    scene[0].get_frames(),
                    scene[1].get_timecode(),
                    scene[1].get_frames(),
                )
            )
            sys.stdout.flush()
    finally:
        # Always release the video handle, even if detection raised.
        video_manager.release()
    return shijian_list


# Create the directory if it does not exist.
def createDir(file_dir):
    """Create *file_dir* if it is not already a directory.

    BUG FIX: uses makedirs(exist_ok=True) instead of os.mkdir, so nested
    paths work and a concurrent creation does not raise.
    """
    if not os.path.isdir(file_dir):
        os.makedirs(file_dir, exist_ok=True)


def _timecode_to_seconds(timecode):
    """Convert an ``HH:MM:SS.mmm`` timecode string to float seconds."""
    h, m, s = timecode.split(":")
    return int(h) * 3600 + int(m) * 60 + float(s)


# Split one video into per-scene clips.
def ClipVideo(video_path, out_folder, image_out_folder, sensitivity, gpu_type):
    """Cut *video_path* into one mp4 per detected scene and extract the
    middle frame of each clip as a PNG.

    Parameters:
        video_path: source video file.
        out_folder: directory receiving the numbered ``00001.mp4`` clips.
        image_out_folder: directory receiving the ``00001.png`` frames.
        sensitivity: ContentDetector threshold (see find_scenes).
        gpu_type: "NVIDIA" / "AMD" selects a hardware encoder,
            anything else falls back to libx264.

    Returns a list of dicts with keys start_time_str, end_time_str,
    out_video_file and video_name.

    Raises subprocess.CalledProcessError if a clip-encoding ffmpeg run fails.
    """
    shijian_list = find_scenes(video_path, sensitivity)  # list of time ranges
    shijian_list_len = len(shijian_list)
    print("总共有%s个场景" % str(shijian_list_len))
    sys.stdout.flush()
    video_list = []
    for i in range(0, shijian_list_len):
        start_time_str = shijian_list[i][0]
        end_time_str = shijian_list[i][1]
        print("开始输出第" + str(i + 1) + "个分镜")
        video_name = "{:05d}".format(i + 1)
        out_video_file = os.path.join(out_folder, video_name + ".mp4")
        sys.stdout.flush()
        video_list.append(
            {
                "start_time_str": start_time_str,
                "end_time_str": end_time_str,
                "out_video_file": out_video_file,
                "video_name": video_name,
            }
        )
        # Pick the H.264 encoder by GPU vendor; CPU x264 as the fallback.
        if gpu_type == "NVIDIA":
            video_codec = "h264_nvenc"
        elif gpu_type == "AMD":
            video_codec = "h264_amf"
        else:
            video_codec = "libx264"
        # Cut the scene out of the source with ffmpeg.
        # BUG FIX: -loglevel must precede the output file; the original put
        # it after, where ffmpeg treats it as a trailing (ignored) option.
        command = [
            "ffmpeg",
            "-i", video_path,
            "-ss", start_time_str,
            "-to", end_time_str,
            "-c:v", video_codec,
            "-preset", "fast",
            "-c:a", "copy",
            "-loglevel", "error",
            out_video_file,
        ]
        subprocess.run(
            command,
            check=True,
            stderr=subprocess.PIPE,
        )
    print("分镜输出完成。开始抽帧")
    sys.stdout.flush()
    for vi in video_list:
        start_seconds = _timecode_to_seconds(vi["start_time_str"])
        end_seconds = _timecode_to_seconds(vi["end_time_str"])
        print("正在抽帧:" + vi["video_name"])
        sys.stdout.flush()
        # Seek to the middle of the clip and dump a single frame.
        # NOTE: deliberately best-effort (no check=True) — a failed thumbnail
        # should not abort the whole batch; -loglevel moved before the output
        # file for the same trailing-option reason as above.
        subprocess.run(
            [
                "ffmpeg",
                "-ss", str((end_seconds - start_seconds) / 2),
                "-i", vi["out_video_file"],
                "-frames:v", "1",
                "-loglevel", "error",
                os.path.join(image_out_folder, vi["video_name"] + ".png"),
            ]
        )
    print("抽帧完成,开始识别文案")
    sys.stdout.flush()
    return video_list


def SplitAudio(video_out_folder, video_list):
    """Extract a 128 kbps mp3 from each clip in *video_list*.

    Returns the list of mp3 paths (same order as *video_list*).
    Raises subprocess.CalledProcessError if an ffmpeg run fails.
    """
    # ffmpeg -i input_file.mp4 -vn -ab 128k output_file.mp3
    print("正在分离音频!!")
    mp3_list = []
    sys.stdout.flush()
    for v in video_list:
        mp3_path = os.path.join(video_out_folder, v["video_name"] + ".mp3")
        mp3_list.append(mp3_path)
        # BUG FIX: -loglevel placed before the output file (trailing options
        # after the output are ignored by ffmpeg).
        subprocess.run(
            [
                "ffmpeg",
                "-i", v["out_video_file"],
                "-vn",          # drop the video stream, keep audio only
                "-ab", "128k",
                "-loglevel", "error",
                mp3_path,
            ],
            check=True,
        )
    return mp3_list


# Speech-to-text via faster-whisper is currently disabled.
# def GetText(out_folder, mp3_list):
#     text = []
#     # download / load the model first
#     print("正在下载或加载模型")
#     sys.stdout.flush()
#     model_path = Path(
#         hf_hub_download(repo_id="Systran/faster-whisper-large-v3",
#                         filename="model.bin")
#     )
#     hf_hub_download(
#         repo_id="Systran/faster-whisper-large-v3",
#         filename="config.json",
#     )
#     hf_hub_download(
#         repo_id="Systran/faster-whisper-large-v3",
#         filename="preprocessor_config.json",
#     )
#     hf_hub_download(
#         repo_id="Systran/faster-whisper-large-v3",
#         filename="tokenizer.json",
#     )
#     hf_hub_download(
#         repo_id="Systran/faster-whisper-large-v3",
#         filename="vocabulary.json",
#     )
#     model = WhisperModel(
#         model_size_or_path=os.path.dirname(model_path),
#         device="auto",
#         local_files_only=True,
#     )
#     print("模型加载成功,开始识别")
#     sys.stdout.flush()
#     for mp in mp3_list:
#         segments, info = model.transcribe(
#             mp,
#             beam_size=5,
#             language="zh",
#             vad_filter=True,
#             vad_parameters=dict(min_silence_duration_ms=1000),
#         )
#         tmp_text = ""
#         for segment in segments:
#             tmp_text += segment.text + "。"
#         print(mp + "识别完成")
#         sys.stdout.flush()
#         text.append(tmp_text)
#     # write the recognized text out
#     print("文本全部识别成功,正在写出")
#     sys.stdout.flush()
#     tools = public_tools.PublicTools()
#     tools.write_to_file(text, os.path.join(out_folder, "文案.txt"))
#     print("写出完成")
#     sys.stdout.flush()


# def GetTextTask(out_folder, mp, name):
#     text = []
#     # download / load the model first
#     print("正在下载或加载模型")
#     sys.stdout.flush()
#     model_path = Path(
#         hf_hub_download(repo_id="Systran/faster-whisper-large-v3", filename="model.bin")
#     )
#     hf_hub_download(
#         repo_id="Systran/faster-whisper-large-v3",
#         filename="config.json",
#     )
#     hf_hub_download(
#         repo_id="Systran/faster-whisper-large-v3",
#         filename="preprocessor_config.json",
#     )
#     hf_hub_download(
#         repo_id="Systran/faster-whisper-large-v3",
#         filename="tokenizer.json",
#     )
#     hf_hub_download(
#         repo_id="Systran/faster-whisper-large-v3",
#         filename="vocabulary.json",
#     )
#     model = WhisperModel(
#         model_size_or_path=os.path.dirname(model_path),
#         device="auto",
#         local_files_only=True,
#     )
#     print("模型加载成功,开始识别")
#     sys.stdout.flush()
#     segments, info = model.transcribe(
#         mp,
#         beam_size=5,
#         language="zh",
#         vad_filter=True,
#         vad_parameters=dict(min_silence_duration_ms=1000),
#     )
#     tmp_text = ""
#     for segment in segments:
#         tmp_text += segment.text + "。"
#     print(mp + "识别完成")
#     sys.stdout.flush()
#     text.append(tmp_text)
#     # write the data out
#     sys.stdout.flush()
#     tools = public_tools.PublicTools()
#     tools.write_to_file(text, os.path.join(out_folder, name + ".txt"))
#     sys.stdout.flush()


def get_fram(video_path, out_path, sensitivity):
    """Detect scenes in *video_path* and write the resulting list of
    ``[start_timecode, end_timecode]`` pairs to *out_path* as JSON.

    This is a command-line entry step: on any failure it prints the error
    and terminates the process instead of raising.
    """
    try:
        shijian_list = find_scenes(video_path, sensitivity)  # list of time ranges
        print("总共有%s个场景" % str(len(shijian_list)))
        print("开始输出json")
        print(shijian_list)
        # Write the scene list to the requested JSON file.
        with open(out_path, "w", encoding="utf-8") as file:
            json.dump(shijian_list, file)
        print("输出完成")
    except Exception as e:
        print("出现错误" + str(e))
        # BUG FIX: the original called exit(0), signalling SUCCESS to the
        # shell even on failure; exit non-zero so callers can detect errors.
        sys.exit(1)


# def init(video_path, video_out_folder, image_out_folder, sensitivity, gpu_type):
#     v_l = ClipVideo(
#         video_path, video_out_folder, image_out_folder, sensitivity, gpu_type
#     )
#     # split off the audio track
#     m_l = SplitAudio(video_out_folder, v_l)
#     # run subtitle/speech recognition
#     GetText(os.path.dirname(video_out_folder), m_l)