# -*- coding: utf-8 -*-
import glob
import json
import os
import subprocess

import cv2
import numpy as np

import public_tools


class ImageToVideo:
    """Turn still images into short pan / zoom ("key-frame") clips.

    Each clip is rendered frame by frame with OpenCV; afterwards
    ``GenerateVideoAllImage`` re-times every clip with ffmpeg so that its
    length matches the time span given in the configuration.

    ``fps``, ``video_size`` and ``bitRate`` are read by
    ``GenerateVideoAllImage``. They are declared here as class-level defaults
    so the method works out of the box; assigning them on an instance (or on
    the class) overrides the defaults.
    """

    # Defaults mirror those of create_video_from_image_with_center_offset.
    fps = 60
    video_size = (1440, 1080)
    # Target bitrate in kbit/s for the re-timing pass; None lets the encoder
    # pick its own default (no "-rc:v cbr -b:v" flags are passed).
    bitRate = None

    _KEY_FRAME_ERROR = "关键帧没有设置正确的参数"
    _MISSING_ENTRY_ERROR = "没有找到对应的关键帧"

    def __init__(self, gpu_type) -> None:
        # Total number of frames rendered by this instance so far.
        self.frames = 0
        # "NVIDIA" / "AMD" select a hardware H.264 encoder; anything else
        # falls back to libx264 (see GenerateVideoAllImage).
        self.gpu_type = gpu_type
        self.public_tools = public_tools.PublicTools()
        self.ffmpeg_path = "../package/ffmpeg/win/ffmpeg"
        self.ffprobe_path = "../package/ffmpeg/win/ffprobe"

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_bgr(img):
        """Return *img* as an 8-bit, 3-channel array that fits the canvas.

        Images decoded with ``IMREAD_UNCHANGED`` may be grayscale, carry an
        alpha channel, or be 16-bit; none of those can be copied into the
        ``uint8`` BGR canvas correctly without conversion.
        """
        if img.dtype == np.uint16:
            # Keep the most significant byte of every sample.
            img = (img >> 8).astype(np.uint8)
        if img.ndim == 2 or img.shape[2] == 1:
            return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        if img.shape[2] == 4:
            return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
        return img

    @classmethod
    def _read_image(cls, image_path):
        """Load *image_path* and return it as an 8-bit BGR array.

        The file is read through Python and decoded from memory, exactly as
        the original code did (presumably so that paths OpenCV cannot open
        directly still work -- TODO confirm).

        Raises:
            ValueError: if the bytes cannot be decoded as an image.
        """
        with open(image_path, "rb") as file:
            img_bytes = file.read()
        img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_UNCHANGED)
        if img is None:
            raise ValueError(f"cannot decode image: {image_path}")
        return cls._to_bgr(img)

    @staticmethod
    def _video_name(image_path):
        """Derive the output clip name from *image_path*.

        NOTE(review): this keeps the original rule (text after the last "/"
        up to the first "."). With "/"-separated paths the clip therefore
        lands in the current working directory, while a "\\"-separated path
        keeps its directory part. Confirm which is intended before changing.
        """
        return f"{image_path.split('/')[-1].split('.')[0]}.mp4"

    def _begin_clip(self, image_path, fps, video_size, duration):
        """Open the video writer and account for the frames to be rendered.

        Returns:
            ``(video_name, writer, total_frames)``.
        """
        video_name = self._video_name(image_path)
        writer = cv2.VideoWriter(
            video_name, cv2.VideoWriter_fourcc(*"mp4v"), fps, video_size
        )
        # round() rather than int(): 4.35 s * 60 fps is 260.999... in floating
        # point and must still yield 261 frames. The same count is used for
        # the statistics and for the render loop.
        total_frames = max(round(duration * fps), 0)
        self.frames += total_frames
        return video_name, writer, total_frames

    @staticmethod
    def _paste(canvas, img, start_x, start_y):
        """Copy *img* onto *canvas* with its top-left corner at (start_x, start_y).

        The image may stick out on any side; only the overlapping region is
        copied, and nothing happens when there is no overlap.
        """
        canvas_height, canvas_width = canvas.shape[:2]
        src_x = max(-start_x, 0)
        dst_x = max(start_x, 0)
        copy_width = min(img.shape[1] - src_x, canvas_width - dst_x)
        src_y = max(-start_y, 0)
        dst_y = max(start_y, 0)
        copy_height = min(img.shape[0] - src_y, canvas_height - dst_y)
        if copy_height > 0 and copy_width > 0:
            canvas[dst_y : dst_y + copy_height, dst_x : dst_x + copy_width] = img[
                src_y : src_y + copy_height, src_x : src_x + copy_width
            ]

    def _pan_video(
        self,
        image_path,
        fps,
        video_size,
        duration,
        start_offset,
        end_offset,
        img_resized,
        horizontal,
    ):
        """Render a clip that slides the image along one axis.

        The image centre moves linearly from ``start_offset`` to
        ``end_offset`` pixels away from the frame centre, along X when
        *horizontal* is true and along Y otherwise.
        """
        video_name, writer, total_frames = self._begin_clip(
            image_path, fps, video_size, duration
        )
        picture = self._to_bgr(img_resized)
        step = float(end_offset - start_offset) / total_frames if total_frames else 0.0
        width, height = video_size[0], video_size[1]
        # Top-left corner that centres the picture in the frame.
        base_x = width // 2 - picture.shape[1] // 2
        base_y = height // 2 - picture.shape[0] // 2
        current_offset = start_offset
        try:
            for _ in range(total_frames):
                canvas = np.zeros((height, width, 3), dtype=np.uint8)
                shift = int(current_offset)
                if horizontal:
                    self._paste(canvas, picture, base_x + shift, base_y)
                else:
                    self._paste(canvas, picture, base_x, base_y + shift)
                current_offset += step
                writer.write(canvas)
        finally:
            writer.release()
        return video_name

    @staticmethod
    def _pan_scale(scale, image_extent, frame_extent, start_offset, end_offset):
        """Return the resize factor needed for a pan along one axis.

        *image_extent* / *frame_extent* are the image and frame sizes along
        the pan axis. The factor is raised when the frame size plus both
        offsets exceeds the image size at the current *scale*.
        """
        travel = abs(start_offset) + abs(end_offset) + frame_extent
        if travel > image_extent * scale:
            scale = max(scale, travel / image_extent)
        max_offset = max(abs(start_offset), abs(end_offset))
        if max_offset > image_extent:
            # NOTE(review): the divisor is negative whenever this branch runs,
            # so the candidate is negative and never replaces ``scale``. Kept
            # as in the original; the intended formula should be confirmed.
            scale = max(scale, frame_extent / (image_extent - max_offset))
        return scale

    @staticmethod
    def _clip_number(path):
        """Return the integer after the last "_" in the file name of *path*."""
        stem = os.path.splitext(os.path.basename(path))[0]
        return int(stem.split("_")[-1])

    def _probe_duration_ms(self, mp4_path):
        """Return the duration of the first video stream of *mp4_path* in ms."""
        cmd = [
            self.ffprobe_path,
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "stream=duration",
            "-of",
            "json",
            mp4_path,
        ]
        # stderr is captured separately so diagnostics cannot corrupt the
        # JSON on stdout; check=True surfaces ffprobe failures directly.
        result = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
        )
        duration_sec = json.loads(result.stdout)["streams"][0]["duration"]
        return int(float(duration_sec) * 1000)

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def create_video_from_image_with_center_offset(
        self,
        image_path,
        duration,
        start_offset,
        end_offset,
        fps=60,
        video_size=(1440, 1080),
        offest_type="KFTypePositionY",
    ):
        """Render one image into a key-framed clip.

        Args:
            image_path: path of the source image.
            duration: clip length in seconds.
            start_offset: offset (pixels) or extra scale at the first frame.
            end_offset: offset (pixels) or extra scale at the last frame.
            fps: frames per second of the clip.
            video_size: ``(width, height)`` of the clip.
            offest_type: ``"KFTypePositionY"``, ``"KFTypePositionX"`` or
                ``"KFTypeScale"``.

        Returns:
            The name of the written video file.

        Raises:
            ValueError: for an unknown ``offest_type`` or an undecodable image.
        """
        if offest_type not in ("KFTypePositionY", "KFTypePositionX", "KFTypeScale"):
            raise ValueError(self._KEY_FRAME_ERROR)

        img = self._read_image(image_path)
        if offest_type == "KFTypeScale":
            # The zoom clip rescales the original image on every frame, so no
            # pre-scaled copy is needed.
            return self.create_video_from_image_scale(
                image_path, fps, video_size, duration, start_offset, end_offset, img
            )

        img_resized = self.get_scaled_image(
            img, video_size, offest_type, start_offset, end_offset
        )
        if offest_type == "KFTypePositionY":
            return self.create_video_from_image_Y(
                image_path,
                fps,
                video_size,
                duration,
                start_offset,
                end_offset,
                img_resized,
            )
        return self.create_video_from_image_X(
            image_path,
            fps,
            video_size,
            duration,
            start_offset,
            end_offset,
            img_resized,
        )

    def create_video_from_image_scale(
        self,
        image_path,
        fps,
        video_size,
        duration,
        start_scale,
        end_scale,
        img,
    ):
        """Render a zoom clip.

        The image is drawn centred, resized by ``cover + extra`` where
        ``cover`` is the factor that makes the image fill the frame and
        ``extra`` changes linearly per frame. The per-frame change is half of
        ``(end_scale - start_scale) / total_frames`` and a negative
        ``start_scale`` is clamped to 0, so a symmetric pair ``(-s, s)`` zooms
        from 0 to ``s`` and ``(s, -s)`` from ``s`` back to 0.

        Returns:
            The name of the written video file.
        """
        cover_scale = max(video_size[0] / img.shape[1], video_size[1] / img.shape[0])
        video_name, writer, total_frames = self._begin_clip(
            image_path, fps, video_size, duration
        )
        picture = self._to_bgr(img)
        # True division: the original "// 2" floored the step to 0.0 (or to
        # -1.0 for a negative step), which disabled or broke the zoom.
        if total_frames:
            step = float(end_scale - start_scale) / total_frames / 2
        else:
            step = 0.0
        current_scale = max(start_scale, 0)
        width, height = video_size[0], video_size[1]
        center_x, center_y = width // 2, height // 2
        try:
            for _ in range(total_frames):
                canvas = np.zeros((height, width, 3), dtype=np.uint8)
                factor = cover_scale + current_scale
                zoomed = cv2.resize(picture, None, fx=factor, fy=factor)
                self._paste(
                    canvas,
                    zoomed,
                    center_x - zoomed.shape[1] // 2,
                    center_y - zoomed.shape[0] // 2,
                )
                current_scale += step
                writer.write(canvas)
        finally:
            writer.release()
        return video_name

    def create_video_from_image_X(
        self,
        image_path,
        fps,
        video_size,
        duration,
        start_offset,
        end_offset,
        img_resized,
    ):
        """Render a clip that pans the image horizontally.

        Returns:
            The name of the written video file.
        """
        return self._pan_video(
            image_path,
            fps,
            video_size,
            duration,
            start_offset,
            end_offset,
            img_resized,
            horizontal=True,
        )

    def create_video_from_image_Y(
        self,
        image_path,
        fps,
        video_size,
        duration,
        start_offset,
        end_offset,
        img_resized,
    ):
        """Render a clip that pans the image vertically.

        Returns:
            The name of the written video file.
        """
        return self._pan_video(
            image_path,
            fps,
            video_size,
            duration,
            start_offset,
            end_offset,
            img_resized,
            horizontal=False,
        )

    def get_sorted_images(self, folder_path, image_extensions=(".jpg", ".png")):
        """Return the image files in *folder_path*, sorted by path.

        Args:
            folder_path: directory to search (not recursive).
            image_extensions: file-name suffixes to match.
        """
        image_files = []
        for ext in image_extensions:
            image_files.extend(glob.glob(os.path.join(folder_path, "*" + ext)))
        image_files.sort()
        return image_files

    def get_scaled_image(self, img, video_size, offest_type, start_offest, end_offest):
        """Resize *img* so it fills the frame for the given key-frame type.

        The base factor makes the image fill ``video_size``; for the two pan
        types it is raised further when the offsets demand it.

        Returns:
            The resized image.

        Raises:
            ValueError: for an unknown ``offest_type``.
        """
        scale = max(video_size[0] / img.shape[1], video_size[1] / img.shape[0])
        if offest_type == "KFTypePositionY":
            scale = self._pan_scale(
                scale, img.shape[0], video_size[1], start_offest, end_offest
            )
        elif offest_type == "KFTypePositionX":
            scale = self._pan_scale(
                scale, img.shape[1], video_size[0], start_offest, end_offest
            )
        elif offest_type != "KFTypeScale":
            raise ValueError(self._KEY_FRAME_ERROR)

        new_width = int(img.shape[1] * scale)
        new_height = int(img.shape[0] * scale)
        return cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_LINEAR)

    def GenerateVideoAllImage(self, image_dir, offset, config_json):
        """Render a clip for every image in *image_dir*, then re-time the clips.

        Args:
            image_dir: directory holding images named ``..._<number>.<ext>``.
            offset: dict with ``"name"`` (key-frame type, or
                ``"KFTypeRandom"`` to pick X or Y at random per image) and the
                magnitudes ``"up_down"``, ``"left_right"`` and ``"scale"``.
                Pan magnitudes are given relative to a 3200-pixel image and
                scaled to the actual image size.
            config_json: dict whose ``"srt_time_information"`` list holds
                items with ``"no"``, ``"start_time"`` and ``"end_time"``
                (milliseconds).

        Raises:
            ValueError: if an image or clip has no matching configuration
                item, the key-frame type is unknown, or a clip reports a
                zero duration.
        """
        config_data = config_json["srt_time_information"]
        offset_list = ["KFTypePositionY", "KFTypePositionX"]
        reverse_direction = False

        # Pass 1: render one clip per image.
        for image_file in self.get_sorted_images(image_dir):
            number = self._clip_number(image_file)
            filtered_data = [item for item in config_data if item["no"] == number]
            if not filtered_data:
                # The original comment said "skip", but the code stopped here
                # and handed back an exception object; stop loudly instead.
                raise ValueError(self._MISSING_ENTRY_ERROR)
            print(filtered_data)

            img_height, img_width = self._read_image(image_file).shape[:2]
            proportion_height = img_height / 3200
            proportion_width = img_width / 3200

            real_key_frame = offset["name"]
            if real_key_frame == "KFTypeRandom":
                real_key_frame = offset_list[np.random.randint(0, 2)]

            if real_key_frame == "KFTypePositionY":
                offset_value = offset["up_down"] * proportion_height
            elif real_key_frame == "KFTypePositionX":
                offset_value = offset["left_right"] * proportion_width
            elif real_key_frame == "KFTypeScale":
                offset_value = offset["scale"]
            else:
                raise ValueError(self._KEY_FRAME_ERROR)

            # Alternate the direction of movement from one image to the next.
            if reverse_direction:
                start_offset, end_offset = offset_value, -offset_value
            else:
                start_offset, end_offset = -offset_value, offset_value
            reverse_direction = not reverse_direction

            video_path = self.create_video_from_image_with_center_offset(
                image_file,
                (filtered_data[0]["end_time"] - filtered_data[0]["start_time"]) / 1000,
                start_offset,
                end_offset,
                self.fps,
                self.video_size,
                real_key_frame,
            )
            print(video_path)

        # Pass 2: stretch or squeeze every clip to its configured length.
        encoder = {"NVIDIA": "h264_nvenc", "AMD": "h264_amf"}.get(
            self.gpu_type, "libx264"
        )
        for mp4_path in self.public_tools.list_files_by_extension(image_dir, ".mp4"):
            number = self._clip_number(mp4_path)
            filtered_data = [item for item in config_data if item["no"] == number]
            if not filtered_data:
                raise ValueError(self._MISSING_ENTRY_ERROR)
            target_ms = filtered_data[0]["end_time"] - filtered_data[0]["start_time"]

            duration_ms = self._probe_duration_ms(mp4_path)
            print(duration_ms, target_ms)
            if duration_ms <= 0:
                raise ValueError(f"clip has no measurable duration: {mp4_path}")

            temp_mp4_path = os.path.join(image_dir, "temp_" + str(number) + ".mp4")
            # Every option must precede the output path; anything after it is
            # treated as a trailing option and ignored. "-y" prevents ffmpeg
            # from waiting on an overwrite prompt when a stale temp exists.
            cmd = [
                self.ffmpeg_path,
                "-y",
                "-loglevel",
                "error",
                "-i",
                mp4_path,
                "-filter:v",
                "setpts=PTS*" + str(target_ms / duration_ms),
                "-an",
                "-c:v",
                encoder,
                "-preset",
                "fast",
            ]
            if self.bitRate is not None:
                cmd += ["-rc:v", "cbr", "-b:v", str(self.bitRate) + "k"]
            cmd.append(temp_mp4_path)
            subprocess.run(cmd, check=True)
            # Swap the re-timed clip in with a single replace.
            os.replace(temp_mp4_path, mp4_path)

        print(self.frames)