495 lines
17 KiB
Python
495 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
||
import cv2
|
||
import numpy as np
|
||
import os
|
||
import glob
|
||
import public_tools
|
||
import subprocess
|
||
import json
|
||
|
||
|
||
class ImageToVideo:
|
||
def __init__(self, gpu_type) -> None:
|
||
self.frames = 0
|
||
self.gpu_type = gpu_type
|
||
self.public_tools = public_tools.PublicTools()
|
||
self.ffmpeg_path = (
|
||
"../package/ffmpeg-2023-12-07-git-f89cff96d0-full_build/bin/ffmpeg"
|
||
)
|
||
self.ffprobe_path = (
|
||
"../package/ffmpeg-2023-12-07-git-f89cff96d0-full_build/bin/ffprobe"
|
||
)
|
||
pass
|
||
|
||
def create_video_from_image_with_center_offset(
|
||
self,
|
||
image_path,
|
||
duration,
|
||
start_offset,
|
||
end_offset,
|
||
fps=60,
|
||
video_size=(1440, 1080),
|
||
offest_type="KFTypePositionY",
|
||
):
|
||
"""
|
||
将图片合并成视频,并添加关键帧
|
||
"""
|
||
# 使用Python的open函数以二进制模式读取图片文件
|
||
with open(image_path, "rb") as file:
|
||
img_bytes = file.read()
|
||
|
||
# 将字节流解码成图片
|
||
img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_UNCHANGED)
|
||
|
||
img_resized = self.get_scaled_image(
|
||
img, video_size, offest_type, start_offset, end_offset
|
||
)
|
||
|
||
if offest_type == "KFTypePositionY":
|
||
video_name = self.create_video_from_image_Y(
|
||
image_path,
|
||
fps,
|
||
video_size,
|
||
duration,
|
||
start_offset,
|
||
end_offset,
|
||
img_resized,
|
||
)
|
||
elif offest_type == "KFTypePositionX":
|
||
video_name = self.create_video_from_image_X(
|
||
image_path,
|
||
fps,
|
||
video_size,
|
||
duration,
|
||
start_offset,
|
||
end_offset,
|
||
img_resized,
|
||
)
|
||
elif offest_type == "KFTypeScale":
|
||
video_name = self.create_video_from_image_scale(
|
||
image_path,
|
||
fps,
|
||
video_size,
|
||
duration,
|
||
start_offset,
|
||
end_offset,
|
||
img,
|
||
)
|
||
else:
|
||
return ValueError("关键帧没有设置正确的参数")
|
||
|
||
return video_name
|
||
|
||
def create_video_from_image_scale(
|
||
self,
|
||
image_path,
|
||
fps,
|
||
video_size,
|
||
duration,
|
||
start_scale,
|
||
end_scale,
|
||
img,
|
||
):
|
||
"""
|
||
缩放关键帧生成视频
|
||
"""
|
||
|
||
scale_width = video_size[0] / img.shape[1]
|
||
scale_height = video_size[1] / img.shape[0]
|
||
default_scale = max(scale_width, scale_height)
|
||
|
||
# 创建视频写入器
|
||
video_name = f"{image_path.split('/')[-1].split('.')[0]}.mp4"
|
||
out = cv2.VideoWriter(
|
||
video_name, cv2.VideoWriter_fourcc(*"mp4v"), fps, video_size
|
||
)
|
||
total_frames = round(duration * fps)
|
||
self.frames += total_frames
|
||
# 计算偏移变化率
|
||
offset_change_per_frame = float(end_scale - start_scale) / total_frames // 2
|
||
if start_scale < 0:
|
||
start_scale = 0
|
||
|
||
current_scale = start_scale
|
||
|
||
for _ in range(int(duration * fps)):
|
||
# 创建一个空白画布
|
||
canvas = np.zeros((video_size[1], video_size[0], 3), dtype=np.uint8)
|
||
|
||
# 根据当前的缩放比例调整图片大小
|
||
img_resized = cv2.resize(
|
||
img,
|
||
None,
|
||
fx=default_scale + current_scale,
|
||
fy=default_scale + current_scale,
|
||
)
|
||
|
||
center_x, center_y = video_size[0] // 2, video_size[1] // 2
|
||
|
||
# 计算图片在画布上的绘制位置
|
||
start_x = center_x - img_resized.shape[1] // 2
|
||
start_y = center_y - img_resized.shape[0] // 2
|
||
|
||
# 安全检查,确保不会复制超出边界的区域
|
||
src_x1 = max(-start_x, 0)
|
||
dst_x1 = max(start_x, 0)
|
||
copy_width = min(img_resized.shape[1] - src_x1, video_size[0] - dst_x1)
|
||
|
||
src_y1 = max(-start_y, 0)
|
||
dst_y1 = max(start_y, 0)
|
||
copy_height = min(img_resized.shape[0] - src_y1, video_size[1] - dst_y1)
|
||
|
||
if copy_height > 0 and copy_width > 0:
|
||
canvas[dst_y1 : dst_y1 + copy_height, dst_x1 : dst_x1 + copy_width] = (
|
||
img_resized[
|
||
src_y1 : src_y1 + copy_height, src_x1 : src_x1 + copy_width
|
||
]
|
||
)
|
||
|
||
# 更新偏移量
|
||
current_scale += offset_change_per_frame
|
||
out.write(canvas)
|
||
out.release()
|
||
return video_name
|
||
|
||
def create_video_from_image_X(
|
||
self,
|
||
image_path,
|
||
fps,
|
||
video_size,
|
||
duration,
|
||
start_offset,
|
||
end_offset,
|
||
img_resized,
|
||
):
|
||
"""
|
||
左右关键帧生成视频
|
||
"""
|
||
# 创建视频写入器
|
||
video_name = f"{image_path.split('/')[-1].split('.')[0]}.mp4"
|
||
out = cv2.VideoWriter(
|
||
video_name, cv2.VideoWriter_fourcc(*"mp4v"), fps, video_size
|
||
)
|
||
total_frames = round(duration * fps)
|
||
self.frames += total_frames
|
||
# 计算偏移变化率
|
||
offset_change_per_frame = float(end_offset - start_offset) / total_frames
|
||
current_offset = start_offset
|
||
scale = img_resized.shape[0] / video_size[1]
|
||
|
||
for _ in range(int(duration * fps)):
|
||
# 创建一个空白画布
|
||
canvas = np.zeros((video_size[1], video_size[0], 3), dtype=np.uint8)
|
||
|
||
center_x, center_y = video_size[0] // 2, video_size[1] // 2
|
||
|
||
# 计算当前帧的图片中心偏移位置
|
||
# offset_y = int(current_offset * scale) # 根据放大比例调整偏移量
|
||
offset_x = int(current_offset * 1) # 根据放大比例调整偏移量
|
||
# offset_y = int(((new_height - img.shape[0]) // 2) * 1)
|
||
# start_y = center_y - img_resized.shape[0] // 2 + offset_y
|
||
|
||
# 计算图片在画布上的绘制位置
|
||
start_x = center_x - img_resized.shape[1] // 2 + offset_x
|
||
start_y = center_y - img_resized.shape[0] // 2
|
||
|
||
# 安全检查,确保不会复制超出边界的区域
|
||
src_x1 = max(-start_x, 0)
|
||
dst_x1 = max(start_x, 0)
|
||
copy_width = min(img_resized.shape[1] - src_x1, video_size[0] - dst_x1)
|
||
# copy_width = min(video_size[0], img_resized.shape[1])
|
||
|
||
# 调整图片复制区域的计算
|
||
src_y1 = max(-start_y, 0)
|
||
dst_y1 = max(start_y, 0)
|
||
copy_height = min(img_resized.shape[0] - src_y1, video_size[1] - dst_y1)
|
||
|
||
# copy_width = min(copy_width, img_resized.shape[1])
|
||
|
||
if copy_height > 0 and copy_width > 0:
|
||
canvas[dst_y1 : dst_y1 + copy_height, dst_x1 : dst_x1 + copy_width] = (
|
||
img_resized[
|
||
src_y1 : src_y1 + copy_height, src_x1 : src_x1 + copy_width
|
||
]
|
||
)
|
||
|
||
# 更新偏移量
|
||
current_offset += offset_change_per_frame
|
||
out.write(canvas)
|
||
out.release()
|
||
return video_name
|
||
|
||
def create_video_from_image_Y(
|
||
self,
|
||
image_path,
|
||
fps,
|
||
video_size,
|
||
duration,
|
||
start_offset,
|
||
end_offset,
|
||
img_resized,
|
||
):
|
||
"""
|
||
上下关键帧生成视频
|
||
"""
|
||
# 创建视频写入器
|
||
video_name = f"{image_path.split('/')[-1].split('.')[0]}.mp4"
|
||
out = cv2.VideoWriter(
|
||
video_name, cv2.VideoWriter_fourcc(*"mp4v"), fps, video_size
|
||
)
|
||
total_frames = round(duration * fps)
|
||
self.frames += total_frames
|
||
# 计算偏移变化率
|
||
offset_change_per_frame = float(end_offset - start_offset) / total_frames
|
||
current_offset = start_offset
|
||
scale = img_resized.shape[1] / video_size[0]
|
||
|
||
for _ in range(int(duration * fps)):
|
||
# 创建一个空白画布
|
||
canvas = np.zeros((video_size[1], video_size[0], 3), dtype=np.uint8)
|
||
|
||
center_x, center_y = video_size[0] // 2, video_size[1] // 2
|
||
|
||
# 计算当前帧的图片中心偏移位置
|
||
# offset_y = int(current_offset * scale) # 根据放大比例调整偏移量
|
||
offset_y = int(current_offset * 1) # 根据放大比例调整偏移量
|
||
# offset_x = int(((new_width - img.shape[1]) // 2) * 1)
|
||
# start_x = center_x - img_resized.shape[1] // 2 + offset_x
|
||
|
||
# 计算图片在画布上的绘制位置
|
||
start_x = center_x - img_resized.shape[1] // 2
|
||
start_y = center_y - img_resized.shape[0] // 2 + offset_y
|
||
|
||
# 安全检查,确保不会复制超出边界的区域
|
||
src_y1 = max(-start_y, 0)
|
||
dst_y1 = max(start_y, 0)
|
||
copy_height = min(img_resized.shape[0] - src_y1, video_size[1] - dst_y1)
|
||
# copy_width = min(video_size[0], img_resized.shape[1])
|
||
|
||
# 调整图片复制区域的计算
|
||
src_x1 = max(-start_x, 0)
|
||
dst_x1 = max(start_x, 0)
|
||
copy_width = min(img_resized.shape[1] - src_x1, video_size[0] - dst_x1)
|
||
|
||
if copy_height > 0 and copy_width > 0:
|
||
canvas[dst_y1 : dst_y1 + copy_height, dst_x1 : dst_x1 + copy_width] = (
|
||
img_resized[
|
||
src_y1 : src_y1 + copy_height, src_x1 : src_x1 + copy_width
|
||
]
|
||
)
|
||
|
||
# 更新偏移量
|
||
current_offset += offset_change_per_frame
|
||
out.write(canvas)
|
||
out.release()
|
||
return video_name
|
||
|
||
def get_sorted_images(self, folder_path, image_extensions=[".jpg", ".png"]):
|
||
"""
|
||
获取图片,排序
|
||
"""
|
||
# 构建一个匹配所有指定扩展名的模式
|
||
patterns = [os.path.join(folder_path, "*" + ext) for ext in image_extensions]
|
||
# 列表用于存储找到的图片文件
|
||
image_files = []
|
||
# 遍历所有模式,匹配文件
|
||
for pattern in patterns:
|
||
image_files.extend(glob.glob(pattern))
|
||
# 按文件名排序
|
||
image_files.sort()
|
||
return image_files
|
||
|
||
def get_scaled_image(self, img, video_size, offest_type, start_offest, end_offest):
|
||
"""
|
||
根据关键帧类型。获取当前图片的放大比例
|
||
"""
|
||
scale_width = video_size[0] / img.shape[1]
|
||
scale_height = video_size[1] / img.shape[0]
|
||
scale = max(scale_width, scale_height)
|
||
|
||
if offest_type == "KFTypePositionY":
|
||
# 检查最大偏移量是否大于图片高度
|
||
all_offset = abs(start_offest) + abs(end_offest) + video_size[1]
|
||
|
||
if all_offset > img.shape[0] * scale:
|
||
# if all_offset > img.shape[0]:
|
||
scale = max(scale, all_offset / img.shape[0])
|
||
|
||
max_offset = max(abs(start_offest), abs(end_offest))
|
||
if max_offset > img.shape[0]:
|
||
# 如果最大偏移量大于图片高度,则进一步放大图像
|
||
scale = max(scale, video_size[1] / (img.shape[0] - max_offset))
|
||
|
||
elif offest_type == "KFTypePositionX":
|
||
# 检查最大偏移量是否大于图片宽度
|
||
all_offset = abs(start_offest) + abs(end_offest) + video_size[0]
|
||
|
||
# 判断最大高度和当前图片当前放大倍率之间的大小
|
||
if all_offset > img.shape[1] * scale:
|
||
# if all_offset > img.shape[0]:
|
||
scale = max(scale, all_offset / img.shape[1])
|
||
|
||
max_offset = max(abs(start_offest), abs(end_offest))
|
||
if max_offset > img.shape[1]:
|
||
# 如果最大偏移量大于图片高度,则进一步放大图像
|
||
scale = max(scale, video_size[0] / (img.shape[1] - max_offset))
|
||
|
||
elif offest_type == "KFTypeScale":
|
||
pass
|
||
|
||
else:
|
||
return ValueError("关键帧没有设置正确的参数")
|
||
|
||
new_width = int(img.shape[1] * scale)
|
||
new_height = int(img.shape[0] * scale)
|
||
img_resized = cv2.resize(
|
||
img, (new_width, new_height), interpolation=cv2.INTER_LINEAR
|
||
)
|
||
return img_resized
|
||
|
||
def GenerateVideoAllImage(self, image_dir, offset, config_json):
|
||
"""
|
||
生成所有的图片
|
||
"""
|
||
config_data = config_json["srt_time_information"]
|
||
isDirection = False
|
||
sort_images = self.get_sorted_images(image_dir)
|
||
# 生成所有的图片视频
|
||
for image_file in sort_images:
|
||
filename = os.path.splitext(os.path.basename(image_file))[
|
||
0
|
||
] # 获取文件名,不包括扩展名
|
||
number = int(filename.split("_")[-1])
|
||
if number == 188:
|
||
print(number)
|
||
filtered_data = [item for item in config_data if item["no"] == number]
|
||
# 判断是不是空,空的话就跳过
|
||
if len(filtered_data) == 0:
|
||
return ValueError("没有找到对应的关键帧")
|
||
|
||
print(filtered_data)
|
||
|
||
video_arr = []
|
||
# 计算当前图片的偏移量,以3200像素为基准
|
||
|
||
with open(image_file, "rb") as file:
|
||
img_bytes = file.read()
|
||
|
||
# 将字节流解码成图片
|
||
img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_UNCHANGED)
|
||
img_height, img_width = img.shape[:2]
|
||
proportion_height = img_height / 3200
|
||
proportion_width = img_height / 3200
|
||
|
||
key_frame = offset["name"]
|
||
offset_list = ["KFTypePositionY", "KFTypePositionX"]
|
||
real_key_frame = key_frame
|
||
|
||
if key_frame == "KFTypeRandom":
|
||
# 随机获取 offset_list 中的一个数据
|
||
real_key_frame = offset_list[np.random.randint(0, 2)]
|
||
|
||
if real_key_frame == "KFTypePositionY":
|
||
offsetValue = offset["up_down"] * proportion_height
|
||
elif real_key_frame == "KFTypePositionX":
|
||
offsetValue = offset["left_right"] * proportion_width
|
||
elif real_key_frame == "KFTypeScale":
|
||
offsetValue = offset["scale"]
|
||
else:
|
||
return ValueError("关键帧没有设置正确的参数")
|
||
|
||
# offsetValue = offset
|
||
if isDirection:
|
||
start_offset = offsetValue
|
||
end_offset = -offsetValue
|
||
isDirection = False
|
||
else:
|
||
start_offset = -offsetValue
|
||
end_offset = offsetValue
|
||
isDirection = True
|
||
|
||
video_path = self.create_video_from_image_with_center_offset(
|
||
image_file,
|
||
(filtered_data[0]["end_time"] - filtered_data[0]["start_time"]) / 1000,
|
||
start_offset,
|
||
end_offset,
|
||
self.fps,
|
||
self.video_size,
|
||
real_key_frame,
|
||
)
|
||
video_arr.append(video_path)
|
||
print(video_path)
|
||
|
||
# 微调所有的视频
|
||
mp4_folder = self.public_tools.list_files_by_extension(image_dir, ".mp4")
|
||
for mp4_path in mp4_folder:
|
||
filename = os.path.splitext(os.path.basename(mp4_path))[
|
||
0
|
||
] # 获取文件名,不包括扩展名
|
||
number = int(filename.split("_")[-1])
|
||
if number == 188:
|
||
print(number)
|
||
|
||
filtered_data = [item for item in config_data if item["no"] == number]
|
||
|
||
# print(filtered_data)
|
||
cmd = [
|
||
self.ffprobe_path,
|
||
"-v",
|
||
"error",
|
||
"-select_streams",
|
||
"v:0",
|
||
"-show_entries",
|
||
"stream=duration",
|
||
"-of",
|
||
"json",
|
||
mp4_path,
|
||
]
|
||
result = subprocess.run(
|
||
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
|
||
)
|
||
duration_sec = json.loads(result.stdout)["streams"][0]["duration"]
|
||
duration_ms = int(float(duration_sec) * 1000) # 将秒转换为毫秒
|
||
print(
|
||
duration_ms,
|
||
(filtered_data[0]["end_time"] - filtered_data[0]["start_time"]),
|
||
)
|
||
temp_mp4_path = os.path.join(image_dir, "temp_" + str(number) + ".mp4")
|
||
# 开始微调
|
||
cmd = []
|
||
cmd.append(self.ffmpeg_path)
|
||
cmd.append("-i")
|
||
cmd.append(mp4_path)
|
||
cmd.append("-filter:v")
|
||
cmd.append(
|
||
"setpts=PTS*"
|
||
+ str(
|
||
(filtered_data[0]["end_time"] - filtered_data[0]["start_time"])
|
||
/ duration_ms
|
||
)
|
||
)
|
||
cmd.append("-c:v")
|
||
|
||
if self.gpu_type == "NVIDIA":
|
||
cmd.append("h264_nvenc")
|
||
elif self.gpu_type == "AMD":
|
||
cmd.append("h264_vaapi")
|
||
else:
|
||
cmd.append("libx264")
|
||
|
||
cmd.append("-preset")
|
||
cmd.append("fast")
|
||
cmd.append("-rc:v")
|
||
cmd.append("cbr")
|
||
cmd.append("-b:v")
|
||
cmd.append(str(self.bitRate) + "k")
|
||
cmd.append(temp_mp4_path)
|
||
cmd.append("-loglevel")
|
||
cmd.append("error")
|
||
cmd.append("-an")
|
||
|
||
subprocess.run(cmd, check=True)
|
||
os.remove(mp4_path)
|
||
os.rename(temp_mp4_path, mp4_path)
|
||
print(self.frames)
|