568 lines
21 KiB
Python
Raw Normal View History

2024-05-15 12:57:15 +08:00
# -*- coding: utf-8 -*-
import io
import os
import sys
import public_tools
import random
import subprocess
import uuid
import json
from moviepy.editor import AudioFileClip, VideoFileClip
from pydub import AudioSegment
import iamge_to_video
# sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
MATERIAL_FOLDER = "Y:\\D\\素材\\素材,自选\\做菜2"
class Clip:
    """Assemble the final video from image clips, audio and subtitles.

    ``MergeVideosAndClip`` is the entry point: it turns the project images
    into short clips, mixes the narration with a background track, converts
    the SRT subtitles into a styled ASS file and finally lets ffmpeg
    concatenate, subtitle and encode everything into the output file.
    """

    # Trimming applied to every stock clip in ``AddVideoToList`` (milliseconds).
    _TRIM_HEAD_MS = 5000
    _TRIM_TAIL_MS = 3000

    def __init__(self, cript_directory, config_path, gpu_type) -> None:
        """Create the job and immediately load its configuration.

        Args:
            cript_directory: Base directory; the per-job temp folder is
                created under ``<cript_directory>/Temp/<ID>``.
            config_path: Path of the JSON job configuration.
            gpu_type: ``"NVIDIA"`` or ``"AMD"`` to use hardware codecs; any
                other value falls back to ``libx264``.
        """
        self.audio_duration = None
        self.cript_directory = cript_directory
        self.ID = str(uuid.uuid4())
        self.ASS_ID = str(uuid.uuid4())
        # Separate components instead of a hard-coded "Temp\\" so the path
        # is also well-formed on non-Windows systems.
        self.TEMP_FOLDER = os.path.join(cript_directory, "Temp", self.ID)
        self.ASS_FILE_PATH = os.path.join(
            self.TEMP_FOLDER, self.ASS_ID + ".ass"
        ).replace("\\", "/")
        self.config_path = config_path
        self.gpu_type = gpu_type
        self.ffmpeg_path = "../package/ffmpeg/win/ffmpeg"
        self.ffprobe_path = "../package/ffmpeg/win/ffprobe"
        self.getInitData()

    def getInitData(self):
        """Load the job configuration and mark the job as ``generating``.

        Reads every setting from ``self.config_path``, configures the image
        to video converter, picks a random background track when a music
        folder is configured, and writes the config back with
        ``status = "generating"``.
        """
        self.public_tools = public_tools.PublicTools()
        with open(self.config_path, "r", encoding="utf-8") as file:
            self.config_json = json.load(file)
        config = self.config_json

        self.srt_config = config["srt_config"]
        self.image_folder = config["image_folder"]
        self.video_resolution_x = config["video_resolution_x"]
        self.video_resolution_y = config["video_resolution_y"]
        self.background_music_folder = config["background_music_folder"]
        self.srt_file_path = config["srt_path"]
        self.audio_path = config["audio_path"]
        self.mp4_file_txt = config["mp4_file_txt"]
        self.outpue_file = config["outpue_file"].replace("\\", "/")
        self.srt_style = config["srt_style"]
        self.friendly_reminder = config["friendly_reminder"]
        self.audio_sound_size = config["audio_sound_size"]
        self.background_music_sound_size = config["background_music_sound_size"]
        self.keyFrame = config["keyFrame"]
        # Plain value: a stray trailing comma used to turn this into a 1-tuple.
        self.frameRate = config["frameRate"]
        self.bitRate = config["bitRate"]

        # Image -> video converter settings.
        self.iamge_to_video = iamge_to_video.ImageToVideo(self.gpu_type)
        self.iamge_to_video.fps = config["frameRate"]
        self.iamge_to_video.video_size = (
            self.video_resolution_x,
            self.video_resolution_y,
        )
        self.iamge_to_video.bitRate = self.bitRate

        # Random background track; stays None when no folder is configured.
        self.background_music = None
        if self.background_music_folder != "":
            self.background_music = random.choice(self._list_background_music())

        # Persist the new status.
        self.config_json["status"] = "generating"
        with open(self.config_path, "w", encoding="utf-8") as file:
            json.dump(self.config_json, file, ensure_ascii=False, indent=4)

    def _list_background_music(self):
        """Return the .mp3 files followed by the .wav files of the music folder."""
        find = self.public_tools.list_files_by_extension
        return find(self.background_music_folder, ".mp3") + find(
            self.background_music_folder, ".wav"
        )

    def getConfigJson(self):
        """Load the subtitle timing file and make its segments contiguous.

        NOTE: this replaces ``self.config_json`` with the content of
        ``self.srt_config``. The first segment is forced to start at 0, every
        segment ends where the next one starts, and the last segment ends
        with the narration audio.
        """
        with open(self.srt_config, "r", encoding="utf-8") as file:
            self.config_json = json.load(file)
        self.GetAudioDuration()

        segments = self.config_json["srt_time_information"]
        if not segments:
            # Nothing to align.
            return
        segments[0]["start_time"] = 0
        # Includes the first segment, which the old if/elif chain skipped.
        for current, following in zip(segments, segments[1:]):
            current["end_time"] = following["start_time"]
        segments[-1]["end_time"] = self.audio_duration

    def ImageToOneVideo(self):
        """Render one clip per image and write the ffmpeg concat list.

        The generated ``.mp4`` files found in ``self.image_folder`` are
        sorted by path and written to ``self.mp4_file_txt`` as
        ``file '<path>'`` lines.
        """
        self.getConfigJson()
        self.iamge_to_video.GenerateVideoAllImage(
            self.image_folder, self.keyFrame, self.config_json
        )
        self.mp4_files = self.public_tools.list_files_by_extension(
            self.image_folder, ".mp4"
        )
        self.mp4_files.sort()
        # Mode "w" truncates any previous list, no explicit delete needed.
        with open(self.mp4_file_txt, "w", encoding="utf-8") as file:
            file.writelines("file '" + path + "'\n" for path in self.mp4_files)

    def GetAudioVideoDuration(self, path):
        """Return the duration of an .mp3/.wav/.mp4 file in milliseconds.

        Raises:
            ValueError: for any other extension, or when the file cannot be
                opened or read (the original error is chained).
        """
        suffix = os.path.splitext(path)[1].lower()
        if suffix not in (".mp3", ".wav", ".mp4"):
            raise ValueError("参数类型错误")
        try:
            clip = VideoFileClip(path) if suffix == ".mp4" else AudioFileClip(path)
            try:
                return clip.duration * 1000
            finally:
                # Release the reader even if reading the duration fails.
                clip.close()
        except Exception as e:
            raise ValueError(e) from e

    def CreateTempFolder(self):
        """Create the per-job temp folder (and its parents) if missing.

        Raises:
            ValueError: when the folder cannot be created.
        """
        try:
            os.makedirs(self.TEMP_FOLDER, exist_ok=True)
        except Exception as e:
            raise ValueError(e) from e

    def GetAudioDuration(self):
        """Compute and cache the narration duration (milliseconds)."""
        if not self.audio_duration:
            self.audio_duration = self.GetAudioVideoDuration(self.audio_path)

    @staticmethod
    def _format_ffmpeg_time(milliseconds):
        """Format milliseconds as the ``HH:MM:S.mmm`` string passed to ffmpeg."""
        hours = int(milliseconds // 3600000)
        minutes = int((milliseconds % 3600000) // 60000)
        seconds = (milliseconds % 60000) / 1000
        return f"{hours:02d}:{minutes:02d}:{seconds:.3f}"

    def _video_encoder(self):
        """Return the H.264 encoder name matching ``self.gpu_type``."""
        if self.gpu_type == "NVIDIA":
            return "h264_nvenc"
        if self.gpu_type == "AMD":
            return "h264_amf"
        return "libx264"

    def _build_trim_command(self, source, start_ms, duration_ms, output_file):
        """Build the ffmpeg command that trims, mirrors, zooms and mutes a clip."""
        command = [self.ffmpeg_path]
        if self.gpu_type == "NVIDIA":
            # CUDA hardware acceleration with the NVIDIA CUVID decoder.
            command += ["-hwaccel", "cuda", "-c:v", "h264_cuvid"]
        elif self.gpu_type == "AMD":
            command += ["-c:v", "h264"]
        command += [
            "-i",
            source,
            "-ss",
            self._format_ffmpeg_time(start_ms),
            "-t",
            self._format_ffmpeg_time(duration_ms),
            "-vf",
            "hflip,setpts=1*PTS,format=yuv420p,scale=iw*1.1:ih*1.1,crop=iw/1.1:ih/1.1",
            "-an",
            "-b:v",
            "5000k",
            "-c:v",
            self._video_encoder(),
            "-preset",
            "fast",
            "-loglevel",
            "error",
            output_file,
        ]
        return command

    def AddVideoToList(self, material_folder=None):
        """Cut random stock clips until they cover the narration.

        Each clip loses its first 5 s and last 3 s, is mirrored, slightly
        zoomed and muted, and is written to the temp folder. The resulting
        concat list path is stored in ``self.vide_list_txt``.

        Args:
            material_folder: Folder searched for ``.mp4`` material; defaults
                to the module constant ``MATERIAL_FOLDER``.

        Raises:
            ValueError: when the folder contains no ``.mp4`` file.
        """
        if material_folder is None:
            material_folder = MATERIAL_FOLDER
        self.GetAudioDuration()
        self.CreateTempFolder()
        material_list = self.public_tools.list_files_by_extension(
            material_folder, ".mp4"
        )
        if not material_list:
            raise ValueError(f"no .mp4 material found in {material_folder}")

        concat_entries = []
        covered_ms = 0
        index = 0
        finished = False
        while not finished:
            source = random.choice(material_list)
            output_file = os.path.join(self.TEMP_FOLDER, str(index) + ".mp4")
            total_ms = self.GetAudioVideoDuration(source)
            start_ms = self._TRIM_HEAD_MS
            duration_ms = total_ms - self._TRIM_HEAD_MS - self._TRIM_TAIL_MS
            if duration_ms <= 0:
                # Too short to trim: use the whole clip, from its beginning.
                start_ms = 0
                duration_ms = total_ms
            covered_ms += duration_ms
            if covered_ms >= self.audio_duration:
                # Shorten the last clip so the total matches the narration.
                duration_ms -= covered_ms - self.audio_duration
                finished = True
            concat_entries.append(f"file '{os.path.abspath(output_file)}'")
            subprocess.run(
                self._build_trim_command(source, start_ms, duration_ms, output_file),
                check=True,
                stderr=subprocess.PIPE,
            )
            index += 1

        self.vide_list_txt = os.path.join(
            self.TEMP_FOLDER, str(uuid.uuid4()) + ".txt"
        )
        # UTF-8 like the other concat list, so non-ASCII paths survive.
        with open(self.vide_list_txt, "w", encoding="utf-8") as f:
            f.writelines(entry + "\n" for entry in concat_entries)

    @staticmethod
    def _load_audio(audio_path):
        """Decode an audio file into a pydub segment without writing any file."""
        if audio_path.lower().endswith(".wav"):
            return AudioSegment.from_wav(audio_path)
        return AudioSegment.from_file(audio_path)

    def ModifyBackGroundMusic(self):
        """Fit a random background track to the narration length.

        A longer track is cut, a shorter one is looped and then cut. The
        result is exported as WAV into the temp folder and its path stored
        in ``self.background_music_path``.

        Raises:
            ValueError: when the music folder has no track or the chosen
                track is empty.
        """
        self.GetAudioDuration()
        self.CreateTempFolder()
        tracks = self._list_background_music()
        if not tracks:
            raise ValueError(
                f"no .mp3/.wav file found in {self.background_music_folder}"
            )
        self.background_music_path = random.choice(tracks)
        audio = self._load_audio(self.background_music_path)
        if len(audio) == 0:
            # Looping an empty track would never reach the target length.
            raise ValueError(f"empty background track: {self.background_music_path}")
        while len(audio) < self.audio_duration:
            audio += audio
        audio = audio[: self.audio_duration]
        self.background_music_path = os.path.join(
            self.TEMP_FOLDER, str(uuid.uuid4()) + ".wav"
        )
        audio.export(self.background_music_path, format="wav")

    def Merge_Audio(
        self, audio_path1, background_music_path, audio_db, backgroud_music_db
    ):
        """Mix the narration with the background track into ``self.mix_audio``.

        Args:
            audio_path1: Narration file.
            background_music_path: Background track; a falsy value exports
                the narration alone.
            audio_db: Gain in dB applied to the narration.
            backgroud_music_db: Gain in dB applied to the background track.
        """
        self.mix_audio = os.path.join(self.TEMP_FOLDER, str(uuid.uuid4()) + ".wav")
        # Inputs are decoded in memory; no converted .wav is written next to
        # the sources, so an existing file with the same stem is never clobbered.
        mixed = self._load_audio(audio_path1) + audio_db
        if background_music_path:
            background = self._load_audio(background_music_path) + backgroud_music_db
            mixed = mixed.overlay(background)
        mixed.export(self.mix_audio, format="wav")

    @staticmethod
    def _restyle_line(line, font_name, font_size, primary_colour, alignment):
        """Return an ASS ``Style:`` line with the project's fixed style applied."""
        parts = line.split(",")
        if len(parts) < 23:
            # Not the 23-field V4+ style layout; leave it alone.
            return line
        if primary_colour.startswith("&H"):
            colour = primary_colour
        else:
            colour = "&H" + primary_colour
        parts[1:23] = [
            font_name,  # Fontname
            str(font_size),  # Fontsize
            colour,  # PrimaryColour
            colour,  # SecondaryColour
            "&H0",  # OutlineColour
            "&H0",  # BackColour
            "0",  # Bold
            "0",  # Italic
            "0",  # Underline
            "0",  # StrikeOut
            "100",  # ScaleX
            "100",  # ScaleY
            "0",  # Spacing
            "0",  # Angle
            "0",  # BorderStyle
            "1",  # Outline
            "0",  # Shadow
            str(alignment),  # Alignment
            "0",  # MarginL
            "0",  # MarginR
            "0",  # MarginV
            "1",  # Encoding
        ]
        restyled = ",".join(parts)
        # The last field used to swallow the line break of the Style line.
        if not restyled.endswith("\n"):
            restyled += "\n"
        return restyled

    @staticmethod
    def _position_dialogue(line, positionX, positionY):
        """Prefix the text of a ``Dialogue:`` line with ``\\pos`` unless present."""
        fields = line.split(",", 9)
        if len(fields) < 10:
            return line
        if "\\pos(" not in fields[9]:
            fields[9] = "{\\pos(%d,%d)}" % (positionX, positionY) + fields[9]
        return ",".join(fields)

    def ConvertSubtitles(
        self,
        font_name="文悦新青年体 (非商用) W8",
        font_size="80",
        primary_colour="&H0CE0F9",
        alignment="5",
        positionX=0,
        positionY=0,
    ):
        """Convert the SRT file to ASS and apply style, position and watermark.

        Args:
            font_name: Font used for the subtitles.
            font_size: Font size.
            primary_colour: Colour in BBGGRR order, with or without the
                ``&H`` prefix.
            alignment: ASS alignment code.
            positionX: X coordinate added to dialogue lines without ``\\pos``.
            positionY: Y coordinate added to dialogue lines without ``\\pos``.
        """
        # The watermark below spans the whole narration.
        self.GetAudioDuration()
        # SRT -> ASS.
        subprocess.run(
            [
                self.ffmpeg_path,
                "-i",
                self.srt_file_path,
                self.ASS_FILE_PATH,
                "-loglevel",
                "error",
            ],
            check=True,
            stderr=subprocess.PIPE,
        )

        with open(self.ASS_FILE_PATH, "r", encoding="utf-8") as file:
            lines = file.readlines()

        modified_lines = []
        for line in lines:
            if line.startswith("PlayResX:"):
                line = "PlayResX: " + str(self.video_resolution_x) + "\n"
            elif line.startswith("PlayResY:"):
                line = "PlayResY: " + str(self.video_resolution_y) + "\n"
            elif line.startswith("ScaledBorderAndShadow:"):
                line = "ScaledBorderAndShadow: no" + "\n"
            elif line.startswith("Style:"):
                line = self._restyle_line(
                    line, font_name, font_size, primary_colour, alignment
                )
            elif line.startswith("Dialogue:"):
                line = self._position_dialogue(line, positionX, positionY)
            modified_lines.append(line)

        # Global watermark shown for the whole duration of the video.
        reminder = self.friendly_reminder
        watermark = (
            "Dialogue: 0,0:00:00.00,"
            + public_tools.format_time_ms(self.audio_duration)
            + ",Default,,0,0,0,,"
            + "{\\pos("
            + str(reminder["positionX"])
            + ","
            + str(reminder["positionY"])
            + ")\\fs"
            + str(reminder["fontSize"])
            + "\\alpha&H"
            + str(reminder["transparent"])
            + "&\\fn"
            + reminder["fontName"]
            + "\\c"
            + self.public_tools.convert_rrggbb_to_bbggrr(reminder["fontColor"])
            + "}"
            + reminder["showText"]
        )
        # Keep the watermark on a line of its own.
        if modified_lines and not modified_lines[-1].endswith("\n"):
            modified_lines[-1] += "\n"
        modified_lines.append(watermark + "\n")

        with open(self.ASS_FILE_PATH, "w", encoding="utf-8") as file:
            file.writelines(modified_lines)

    def _build_merge_command(self):
        """Build the ffmpeg command that concatenates, subtitles and encodes."""
        # NOTE(review): the subtitle path is relative to the working
        # directory, while the ASS file is written under cript_directory;
        # they only match when both directories are the same - confirm.
        return [
            self.ffmpeg_path,
            "-f",
            "concat",
            "-safe",
            "0",
            "-i",
            self.mp4_file_txt,
            "-i",
            self.mix_audio,
            "-vf",
            f"subtitles=./Temp/{self.ID}/{self.ASS_ID}.ass",
            "-c:v",
            self._video_encoder(),
            "-preset",
            "fast",
            "-rc:v",
            "cbr",
            "-b:v",
            str(self.bitRate) + "k",
            "-c:a",
            "aac",
            "-strict",
            "-2",
            "-loglevel",
            "error",
            self.outpue_file,
        ]

    def MergeVideoAndAudio(self):
        """Concatenate the clips, add the mixed audio and burn in the subtitles."""
        subprocess.run(self._build_merge_command(), check=True, stderr=subprocess.PIPE)

    def DeleteFile(self):
        """Delete the clips left in the image folder and any previous output."""
        stale_clips = self.public_tools.list_files_by_extension(
            self.image_folder, ".mp4"
        )
        for clip_path in stale_clips:
            self.public_tools.delete_path(clip_path)
        self.public_tools.delete_path(self.outpue_file)

    def MergeVideosAndClip(self):
        """Run the whole pipeline and clean up the intermediate files."""
        # Remove leftovers of a previous run.
        self.DeleteFile()
        # Images -> clips.
        self.ImageToOneVideo()
        # Background music is optional: an empty folder means narration only.
        if self.background_music_folder:
            self.ModifyBackGroundMusic()
        else:
            self.background_music_path = None
        self.Merge_Audio(
            self.audio_path,
            self.background_music_path,
            self.audio_sound_size,
            self.background_music_sound_size,
        )
        # Subtitles; style values are authored for a 1440x1080 canvas.
        scale_x = self.video_resolution_x / 1440
        scale_y = self.video_resolution_y / 1080
        self.ConvertSubtitles(
            self.srt_style["fontName"],
            str(self.srt_style["fontSize"] * scale_x),
            self.public_tools.convert_rrggbb_to_bbggrr(self.srt_style["fontColor"]),
            "5",
            self.srt_style["positionX"] * scale_x,
            self.srt_style["positionY"] * scale_y,
        )
        # Final encode.
        self.MergeVideoAndAudio()
        # Drop the temp folder and the intermediate clips.
        self.public_tools.delete_path(self.TEMP_FOLDER)
        leftover_clips = self.public_tools.list_files_by_extension(
            self.image_folder, ".mp4"
        )
        for clip_path in leftover_clips:
            self.public_tools.delete_path(clip_path)