LaiTool/resources/scripts/iamge_to_video.py
2024-06-08 16:56:04 +08:00

495 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import cv2
import numpy as np
import os
import glob
import public_tools
import subprocess
import json
class ImageToVideo:
    """Turn still images into short key-framed video clips.

    Each clip is rendered frame by frame with OpenCV (vertical pan,
    horizontal pan, or zoom) and afterwards re-timed with ffmpeg so that its
    length matches the subtitle timing supplied by the caller.

    ``GenerateVideoAllImage`` reads ``self.fps``, ``self.video_size`` and
    ``self.bitRate``.  ``__init__`` does not assign them, so they presumably
    have to be set on the instance by the caller before that method is
    used -- TODO confirm against the calling script.
    """

    # Key-frame identifiers understood by the rendering methods.
    _KEYFRAME_TYPES = ("KFTypePositionY", "KFTypePositionX", "KFTypeScale")

    def __init__(self, gpu_type) -> None:
        """Store the encoder selection and the bundled ffmpeg/ffprobe paths.

        Args:
            gpu_type: "NVIDIA" or "AMD" select the matching hardware H.264
                encoder; any other value falls back to libx264.
        """
        # Running total of frames rendered by this instance.
        self.frames = 0
        self.gpu_type = gpu_type
        self.public_tools = public_tools.PublicTools()
        self.ffmpeg_path = (
            "../package/ffmpeg-2023-12-07-git-f89cff96d0-full_build/bin/ffmpeg"
        )
        self.ffprobe_path = (
            "../package/ffmpeg-2023-12-07-git-f89cff96d0-full_build/bin/ffprobe"
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _video_name_for(image_path):
        """Return the output file name used for the clip of *image_path*.

        The name is the text after the last "/" and before the first "."
        with ".mp4" appended.
        """
        # NOTE(review): only "/" is treated as a separator, so a path built
        # with another separator keeps its directory part and the clip is
        # written there.  Kept byte-for-byte because the re-timing pass in
        # GenerateVideoAllImage looks for the clips inside the image folder.
        return f"{image_path.split('/')[-1].split('.')[0]}.mp4"

    @staticmethod
    def _load_image(image_path):
        """Read *image_path* as bytes and decode it with ``cv2.imdecode``.

        Raises:
            ValueError: if OpenCV cannot decode the file.
        """
        with open(image_path, "rb") as file:
            img_bytes = file.read()
        img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_UNCHANGED)
        if img is None:
            raise ValueError(f"cannot decode image: {image_path}")
        return img

    @staticmethod
    def _to_bgr(img):
        """Return *img* as a 3-channel image.

        Frames are composed on a 3-channel canvas; a grayscale (2-D) or
        4-channel source cannot be assigned into it, so those are converted.
        """
        if img.ndim == 2:
            return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        if img.shape[2] == 4:
            return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
        return img

    @staticmethod
    def _frame_count(duration, fps):
        """Return the number of frames for *duration* seconds at *fps*.

        Raises:
            ValueError: if the rounded count is below one; the per-frame
                step is obtained by dividing by this count.
        """
        total_frames = int(round(duration * fps))
        if total_frames < 1:
            raise ValueError(
                f"duration {duration!r} at {fps!r} fps yields no frames"
            )
        return total_frames

    @staticmethod
    def _scale_step(start_scale, end_scale, total_frames):
        """Return the zoom increment applied after every frame.

        The zoom starts at ``max(start_scale, 0)``, so only half of the
        ``end_scale - start_scale`` span is distributed over the frames.
        True division is required: floor division would collapse the step
        to 0.0 or -1.0.
        """
        return float(end_scale - start_scale) / total_frames / 2

    @staticmethod
    def _paste_centered(canvas, image, offset_x=0, offset_y=0):
        """Copy *image* onto *canvas*, centred and shifted by the offsets.

        Only the overlapping rectangle is copied, so parts of the image
        that fall outside the canvas are clipped.
        """
        canvas_h, canvas_w = canvas.shape[:2]
        image_h, image_w = image.shape[:2]
        # Top-left corner of the image in canvas coordinates (may be negative).
        start_x = canvas_w // 2 - image_w // 2 + offset_x
        start_y = canvas_h // 2 - image_h // 2 + offset_y
        src_x, dst_x = max(-start_x, 0), max(start_x, 0)
        src_y, dst_y = max(-start_y, 0), max(start_y, 0)
        copy_w = min(image_w - src_x, canvas_w - dst_x)
        copy_h = min(image_h - src_y, canvas_h - dst_y)
        if copy_h > 0 and copy_w > 0:
            canvas[dst_y : dst_y + copy_h, dst_x : dst_x + copy_w] = image[
                src_y : src_y + copy_h, src_x : src_x + copy_w
            ]

    def _render_pan(
        self,
        image_path,
        fps,
        video_size,
        duration,
        start_offset,
        end_offset,
        img_resized,
        horizontal,
    ):
        """Render a clip that slides *img_resized* from one offset to another.

        The offset is interpolated linearly per frame and truncated to an
        integer pixel shift, applied on the x axis when *horizontal* is true
        and on the y axis otherwise.
        """
        video_name = self._video_name_for(image_path)
        # Validate before the writer is opened so no empty file is created.
        total_frames = self._frame_count(duration, fps)
        step = float(end_offset - start_offset) / total_frames
        source = self._to_bgr(img_resized)

        out = cv2.VideoWriter(
            video_name, cv2.VideoWriter_fourcc(*"mp4v"), fps, video_size
        )
        try:
            current_offset = start_offset
            for _ in range(total_frames):
                canvas = np.zeros((video_size[1], video_size[0], 3), dtype=np.uint8)
                shift = int(current_offset)
                if horizontal:
                    self._paste_centered(canvas, source, offset_x=shift)
                else:
                    self._paste_centered(canvas, source, offset_y=shift)
                out.write(canvas)
                current_offset += step
        finally:
            out.release()
        self.frames += total_frames
        return video_name

    @staticmethod
    def _sequence_number(path):
        """Return the integer after the last "_" in the file stem of *path*."""
        stem = os.path.splitext(os.path.basename(path))[0]
        return int(stem.split("_")[-1])

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def create_video_from_image_with_center_offset(
        self,
        image_path,
        duration,
        start_offset,
        end_offset,
        fps=60,
        video_size=(1440, 1080),
        offest_type="KFTypePositionY",
    ):
        """Render one image into a key-framed clip and return its file name.

        Args:
            image_path: path of the source image.
            duration: clip length in seconds.
            start_offset: offset (or extra zoom for "KFTypeScale") at the
                first frame.
            end_offset: offset (or extra zoom) towards which the clip moves.
            fps: frames per second of the clip.
            video_size: ``(width, height)`` of the clip.
            offest_type: "KFTypePositionY", "KFTypePositionX" or
                "KFTypeScale".

        Raises:
            ValueError: for an unknown *offest_type* or an undecodable image.
        """
        renderers = {
            "KFTypePositionY": self.create_video_from_image_Y,
            "KFTypePositionX": self.create_video_from_image_X,
            "KFTypeScale": self.create_video_from_image_scale,
        }
        # Reject a bad type before any file is read or written.
        if offest_type not in renderers:
            raise ValueError("关键帧没有设置正确的参数")

        img = self._load_image(image_path)
        if offest_type == "KFTypeScale":
            # The zoom renderer resizes per frame from the original image.
            frame_source = img
        else:
            frame_source = self.get_scaled_image(
                img, video_size, offest_type, start_offset, end_offset
            )
        return renderers[offest_type](
            image_path,
            fps,
            video_size,
            duration,
            start_offset,
            end_offset,
            frame_source,
        )

    def create_video_from_image_scale(
        self,
        image_path,
        fps,
        video_size,
        duration,
        start_scale,
        end_scale,
        img,
    ):
        """Render a zoom clip and return its file name.

        Every frame resizes *img* by the factor that makes it cover the
        frame plus the current extra zoom, then pastes it centred.  The
        extra zoom starts at ``max(start_scale, 0)``.

        Raises:
            ValueError: if *duration* and *fps* yield no frames.
        """
        video_name = self._video_name_for(image_path)
        total_frames = self._frame_count(duration, fps)
        step = self._scale_step(start_scale, end_scale, total_frames)
        source = self._to_bgr(img)

        # Smallest factor at which the image covers the whole frame.
        default_scale = max(
            video_size[0] / source.shape[1], video_size[1] / source.shape[0]
        )

        out = cv2.VideoWriter(
            video_name, cv2.VideoWriter_fourcc(*"mp4v"), fps, video_size
        )
        try:
            current_scale = max(start_scale, 0)
            for _ in range(total_frames):
                canvas = np.zeros((video_size[1], video_size[0], 3), dtype=np.uint8)
                factor = default_scale + current_scale
                zoomed = cv2.resize(source, None, fx=factor, fy=factor)
                self._paste_centered(canvas, zoomed)
                out.write(canvas)
                current_scale += step
        finally:
            out.release()
        self.frames += total_frames
        return video_name

    def create_video_from_image_X(
        self,
        image_path,
        fps,
        video_size,
        duration,
        start_offset,
        end_offset,
        img_resized,
    ):
        """Render a horizontal pan clip and return its file name.

        Raises:
            ValueError: if *duration* and *fps* yield no frames.
        """
        return self._render_pan(
            image_path,
            fps,
            video_size,
            duration,
            start_offset,
            end_offset,
            img_resized,
            horizontal=True,
        )

    def create_video_from_image_Y(
        self,
        image_path,
        fps,
        video_size,
        duration,
        start_offset,
        end_offset,
        img_resized,
    ):
        """Render a vertical pan clip and return its file name.

        Raises:
            ValueError: if *duration* and *fps* yield no frames.
        """
        return self._render_pan(
            image_path,
            fps,
            video_size,
            duration,
            start_offset,
            end_offset,
            img_resized,
            horizontal=False,
        )

    def get_sorted_images(self, folder_path, image_extensions=(".jpg", ".png")):
        """Return the files in *folder_path* matching the extensions, sorted.

        Args:
            folder_path: directory to search (not recursive).
            image_extensions: iterable of suffixes, each matched with the
                glob pattern ``"*" + suffix``.

        Returns:
            A list of paths in ascending string order.
        """
        image_files = [
            path
            for ext in image_extensions
            for path in glob.glob(os.path.join(folder_path, "*" + ext))
        ]
        image_files.sort()
        return image_files

    def get_scaled_image(self, img, video_size, offest_type, start_offest, end_offest):
        """Resize *img* so it covers the frame for the given key-frame type.

        The base factor makes the image cover *video_size*.  For the pan
        types it is raised, if needed, so that the panned axis is at least
        ``abs(start_offest) + abs(end_offest)`` plus the frame size on that
        axis.  "KFTypeScale" uses the base factor unchanged.

        Raises:
            ValueError: for an unknown *offest_type*.
        """
        if offest_type not in self._KEYFRAME_TYPES:
            raise ValueError("关键帧没有设置正确的参数")

        scale = max(video_size[0] / img.shape[1], video_size[1] / img.shape[0])

        if offest_type == "KFTypePositionY":
            axis_extent, axis_view = img.shape[0], video_size[1]
        elif offest_type == "KFTypePositionX":
            axis_extent, axis_view = img.shape[1], video_size[0]
        else:
            axis_extent = axis_view = None

        if axis_extent is not None:
            travel = abs(start_offest) + abs(end_offest) + axis_view
            if travel > axis_extent * scale:
                scale = max(scale, travel / axis_extent)

        # Round rather than truncate: float error in ``scale`` could
        # otherwise leave the image one pixel short of the required size.
        new_width = int(round(img.shape[1] * scale))
        new_height = int(round(img.shape[0] * scale))
        return cv2.resize(
            img, (new_width, new_height), interpolation=cv2.INTER_LINEAR
        )

    def GenerateVideoAllImage(self, image_dir, offset, config_json):
        """Render a clip for every image in *image_dir*, then re-time them.

        Pass 1 renders each image found by ``get_sorted_images``; the
        direction of motion alternates from one image to the next.  Pass 2
        probes every ``.mp4`` listed for *image_dir* and re-encodes it with
        a ``setpts`` factor so its length matches the configured timing.

        Args:
            image_dir: folder holding the images; each file stem must end
                in ``_<number>``.
            offset: dict with "name" (a key-frame type or "KFTypeRandom")
                and the value for the chosen type under "up_down",
                "left_right" or "scale".
            config_json: dict whose "srt_time_information" list holds items
                with "no", "start_time" and "end_time" (times are divided
                by 1000 to obtain seconds).

        Raises:
            ValueError: if an image or clip has no timing entry, the
                key-frame type is unknown, or a probed duration is zero.
            subprocess.CalledProcessError: if ffprobe or ffmpeg fails.
        """
        config_data = config_json["srt_time_information"]
        key_frame = offset["name"]
        offset_list = ["KFTypePositionY", "KFTypePositionX"]
        isDirection = False

        # Pass 1: render one clip per image.
        for image_file in self.get_sorted_images(image_dir):
            number = self._sequence_number(image_file)
            filtered_data = [item for item in config_data if item["no"] == number]
            if not filtered_data:
                raise ValueError("没有找到对应的关键帧")
            print(filtered_data)

            # Offsets are expressed relative to a 3200-pixel reference.
            img_height = self._load_image(image_file).shape[0]
            proportion_height = img_height / 3200
            # NOTE(review): derived from the image HEIGHT, exactly as in the
            # original code; this looks like it may have been meant to use
            # the width -- confirm before changing.
            proportion_width = img_height / 3200

            real_key_frame = key_frame
            if key_frame == "KFTypeRandom":
                # Pick one of the two pan directions at random.
                real_key_frame = offset_list[np.random.randint(0, 2)]

            if real_key_frame == "KFTypePositionY":
                offsetValue = offset["up_down"] * proportion_height
            elif real_key_frame == "KFTypePositionX":
                offsetValue = offset["left_right"] * proportion_width
            elif real_key_frame == "KFTypeScale":
                offsetValue = offset["scale"]
            else:
                raise ValueError("关键帧没有设置正确的参数")

            # Alternate the direction of motion between consecutive images.
            if isDirection:
                start_offset, end_offset = offsetValue, -offsetValue
            else:
                start_offset, end_offset = -offsetValue, offsetValue
            isDirection = not isDirection

            timing = filtered_data[0]
            video_path = self.create_video_from_image_with_center_offset(
                image_file,
                (timing["end_time"] - timing["start_time"]) / 1000,
                start_offset,
                end_offset,
                self.fps,
                self.video_size,
                real_key_frame,
            )
            print(video_path)

        # Pass 2: stretch or squeeze every clip to its exact target length.
        if self.gpu_type == "NVIDIA":
            encoder = "h264_nvenc"
        elif self.gpu_type == "AMD":
            encoder = "h264_amf"
        else:
            encoder = "libx264"

        for mp4_path in self.public_tools.list_files_by_extension(image_dir, ".mp4"):
            number = self._sequence_number(mp4_path)
            filtered_data = [item for item in config_data if item["no"] == number]
            if not filtered_data:
                raise ValueError("没有找到对应的关键帧")
            target_ms = filtered_data[0]["end_time"] - filtered_data[0]["start_time"]

            probe_cmd = [
                self.ffprobe_path,
                "-v",
                "error",
                "-select_streams",
                "v:0",
                "-show_entries",
                "stream=duration",
                "-of",
                "json",
                mp4_path,
            ]
            # stderr is kept apart from stdout so diagnostics cannot corrupt
            # the JSON that is parsed below.
            result = subprocess.run(
                probe_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
            duration_sec = json.loads(result.stdout)["streams"][0]["duration"]
            duration_ms = int(float(duration_sec) * 1000)  # seconds -> ms
            if duration_ms <= 0:
                raise ValueError(f"clip has no measurable duration: {mp4_path}")
            print(duration_ms, target_ms)

            temp_mp4_path = os.path.join(image_dir, "temp_" + str(number) + ".mp4")
            # Output options must precede the output file; ffmpeg applies
            # options to the next file named on the command line.
            retime_cmd = [
                self.ffmpeg_path,
                "-y",  # overwrite a stale temp file instead of prompting
                "-loglevel",
                "error",
                "-i",
                mp4_path,
                "-filter:v",
                "setpts=PTS*" + str(target_ms / duration_ms),
                "-c:v",
                encoder,
                "-preset",
                "fast",
                "-rc:v",
                "cbr",
                "-b:v",
                str(self.bitRate) + "k",
                "-an",
                temp_mp4_path,
            ]
            subprocess.run(retime_cmd, check=True)
            # Swap the re-timed clip into place in a single step.
            os.replace(temp_mp4_path, mp4_path)

        print(self.frames)