import glob import math import os import re import shutil import subprocess import tempfile import time from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from pathlib import Path import numpy as np from PIL import Image, ImageEnhance from moviepy import * from moviepy import VideoFileClip, clips_array import traceback # p_preset = "veryslow" p_preset = "veryslow" p_bitrate = "2000k" p_threads = 4 p_crf = "22" p_tune = "animation" p_pix_fmt = "yuv444p" p_codec = "libx264" p_enable_sharpening = "" # 5:5: 1.5:0: 0:0 // p_width = None """ luma_msize_x:亮度矩阵的水平大小(X方向)。这个值定义了应用于亮度分量的模糊矩阵的宽度。 luma_msize_y:亮度矩阵的垂直大小(Y方向)。这个值定义了应用于亮度分量的模糊矩阵的高度。 luma_amount:亮度锐化强度。正值表示锐化,负值表示模糊。通常范围从 -2.0 到 5.0。 chroma_msize_x:色度矩阵的水平大小(X方向)。与亮度类似,但应用于色度分量。 chroma_msize_y:色度矩阵的垂直大小(Y方向)。 chroma_amount:色度锐化强度。 """ p_enable_denoising = "" # 4:3:6:4 亮度空间去噪强度 亮度时间去噪强度 色度空间去噪强度 色度时间去噪强度 p_enable_color_enhancement = "" # 色彩增强 p_fps = 0 p_vf = "" def merge_videos_side_by_side(video_a_path, video_b_path, output_path): # 加载两段视频 clip_a = VideoFileClip(video_a_path) clip_b = VideoFileClip(video_b_path) # 将两段视频按左右排列拼接 final_clip = clips_array([[clip_a, clip_b]]) if p_vf == "": params = ["-crf", p_crf, "-pix_fmt", p_pix_fmt, "-tune", p_tune] else: params = ["-vf", p_vf, "-crf", p_crf, "-pix_fmt", p_pix_fmt, "-tune", p_tune] # 输出合并后的视频 final_clip.write_videofile(output_path, codec=p_codec, fps=clip_a.fps, bitrate=p_bitrate, preset=p_preset, threads=p_threads, ffmpeg_params=params) print(f"合并完成,已保存到:{output_path}") # 确保所有图像都兼容 RGB/RGBA 格式 def load_image_as_rgb(image_path): image = Image.open(image_path) if image.mode == "L": # 如果是灰度图像(单通道) image = image.convert("RGB") # 转换为 RGB 格式 elif image.mode == "RGBA": # 如果是 RGBA 图像(带透明通道) pass # 保持原样 else: image = image.convert("RGB") # 其他模式也转为 RGB return np.array(image) def update_vf_params(): """ 根据参数生成 FFmpeg 的 -vf 滤镜参数字符串。 :param convert_to_hdr: 是否转换为 HDR(True 或 False) :param enable_sharpening: 是否开启锐化(True 或 False) :param enable_denoising: 是否开启降噪(True 或 False) :param enable_color_enhancement: 是否开启色彩增强(True 或 False) :return: 完整的 -vf 参数字符串 """ filters = [] # # HDR 转换 # if p_convert_to_hdr: # hdr_filter = ( # "zscale=matrixin=bt709:matrix=bt709:t=bt709:m=bt709:r=tv,tonemap=tonemap=hable:desat=0" # ) # filters.append(hdr_filter) if p_fps > 0: to_fps = ( f"minterpolate=mi_mode=mci:mc_mode=aobmc:vsbmc=1:me_mode=bidir:me=epzs:fps={p_fps}" ) filters.append(to_fps) # 降噪 if p_enable_denoising != "": denoise_filter = f"hqdn3d={p_enable_denoising}" # 去噪参数 filters.append(denoise_filter) # 锐化 if p_enable_sharpening != "": # unsharp = 5:5: 1.5:0: 0:0 sharpen_filter = f"unsharp={p_enable_sharpening}" # 锐化参数 filters.append(sharpen_filter) # 色彩增强 if p_enable_color_enhancement: color_enhance_filter = f"eq=contrast={p_enable_color_enhancement}:saturation={p_enable_color_enhancement}" # 色彩增强参数 filters.append(color_enhance_filter) # 如果没有启用任何滤镜,返回空字符串 if not filters: return "" # 将所有滤镜用逗号连接成一个完整的 -vf 参数字符串 vf_params = ",".join(filters) global p_vf p_vf = vf_params print(f"最终参数vf{vf_params}") def png2mp4(output_video, fps2, png_dir, target_fps: int = 1000): """ 将 PNG 序列转换为 MP4 视频文件。 参数: output_video (str): 输出视频文件路径。 fps2 (int): 输入帧率。 png_dir (str): 包含 PNG 文件的目录路径。 target_fps (int): 目标帧率。如果 fps2 > target_fps,则抽帧到目标帧率;否则保持原帧率。 """ # 确保帧率不低于 12 fps = max(fps2, 12) # 如果 fps 大于目标帧率,则设置为目标帧率 if fps > target_fps: fps = target_fps # 获取所有 PNG 文件并按名称排序 png_files = sorted(glob.glob(f"{png_dir}/*.png")) if not png_files: raise ValueError("未找到任何 PNG 文件,请检查 png_dir 路径是否正确。") # 加载 PNG 序列帧并确保兼容性 image_arrays = [load_image_as_rgb(png_file) for png_file in png_files] # 计算总帧数和期望帧数 total_frames = len(image_arrays) print(f"抽帧前帧数:{total_frames}") expected_frames = int(total_frames * (target_fps / fps2)) # 如果需要抽帧,创建新的图像序列 if fps < fps2: step = total_frames / expected_frames # 计算精确的抽帧步长 image_arrays = [image_arrays[int(i * step)] for i in range(expected_frames)] final_frames = len(image_arrays) print(f"抽帧后帧数:{final_frames}") # 创建图像序列剪辑 image_clip = ImageSequenceClip(image_arrays, fps=fps) # 创建纯色背景 background_color = (255, 255, 255) # 背景颜色(黑) background = ColorClip( size=image_clip.size, # 背景尺寸与图像帧一致 color=background_color, # 背景颜色 duration=image_clip.duration # 持续时间与图像帧一致 ) # 将 PNG 帧叠加到背景上(透明区域会被背景填充) final_clip = CompositeVideoClip( [background, image_clip.with_position("center")] # 背景在下,PNG 帧居中 ) if p_vf == "": params = ["-crf", p_crf, "-pix_fmt", p_pix_fmt, "-tune", p_tune] else: params = ["-vf", p_vf, "-crf", p_crf, "-pix_fmt", p_pix_fmt, "-tune", p_tune] print(f"开始转码,帧率: {fps}") final_clip.write_videofile( output_video, codec=p_codec, # 使用 libx264rgb 编解码器尝试无损编码 fps=fps, preset=p_preset, # 使用较快的预置设置 threads=p_threads, bitrate=p_bitrate, ffmpeg_params=params ) def get_webp_duration(file_path): try: # 执行webpinfo命令并捕获输出 result = subprocess.run( ['webpinfo', '-animation', file_path], capture_output=True, text=True, check=True ) # 正则匹配Duration字段 return parse_anmf_data(result.stdout) except subprocess.CalledProcessError as e: print(f"解析失败:{e.stderr}") return [] # 示例调用 def parse_anmf_data(webp_info): anmf_pattern = re.compile( r"Chunk ANMF.*?Width:\s+(\d+)\s+Height:\s+(\d+)\s+Duration:\s+(\d+)", re.DOTALL ) # 查找所有匹配的 ANMF 块 matches = anmf_pattern.findall(webp_info) # 将匹配结果转换为字典并存储在列表中 result = [] for match in matches: width, height, duration = match result.append({ "width": int(width), "height": int(height), "duration": int(duration) }) return result from PIL import Image, ImageOps import os from pathlib import Path # 打开 WebP 动画文件并解析为 PNG 序列帧 def webp2png(webp_file, dir_file, target_width=None): # 确保目标目录存在 if Path(dir_file).exists(): # 不重新生成png,只生成一次 return True os.makedirs(dir_file, exist_ok=True) # 打开 WebP 文件 img = Image.open(webp_file) # 检查是否是动画 if (not getattr(img, "is_animated", False)) | (img.n_frames <= 1): print("这不是一个动画 WebP 文件") return False else: # 遍历每一帧 for i in range(img.n_frames): img.seek(i) # 移动到第 i 帧 frame = img.copy() # 复制当前帧 width, height = frame.size # 如果 target_width 为 None,则裁剪到最近的能被 8 整除的宽度 if target_width is None: target_width = (width // 8) * 8 # 向下取整到最近的 8 的倍数 if target_width < 8: # 确保宽度至少为 8 target_width = 8 # 对于高度,我们也应用同样的逻辑 target_height = (height // 8) * 8 # 向下取整到最近的 8 的倍数 if target_height < 8: # 确保高度至少为 8 target_height = 8 # 如果目标宽度大于原图宽度,添加透明背景居中 if target_width > width or target_height > height: new_frame = Image.new("RGBA", (target_width, target_height), (0, 0, 0, 0)) offset_x = (target_width - width) // 2 offset_y = (target_height - height) // 2 new_frame.paste(frame, (offset_x, offset_y)) frame = new_frame else: # 调整宽度和高度 frame = frame.crop(((width - target_width) // 2, (height - target_height) // 2, (width + target_width) // 2, (height + target_height) // 2)) # 构造保存路径 frame_path = os.path.join(dir_file, f"frame_{i:03d}.png") # 保存为 PNG 文件 frame.save(frame_path, "PNG") return True def enhance_png(png_file_path): # 打开原始图片 image = Image.open(png_file_path) # 创建一个ImageEnhance对象,这里选择增强饱和度 enhancer = ImageEnhance.Color(image) # 提高饱和度(增强因子大于1.0会增加饱和度) factor = 1.5 # 设定增强因子,可以根据需要调整 enhanced_image = enhancer.enhance(factor) # 保存调整后的图片 enhanced_image.save(png_file_path) def get_fps(webp_file): webp_infos = get_webp_duration(webp_file) duration = 0 for webp_info in webp_infos: duration += webp_info['duration'] total_frame_count = len(webp_infos) print(f"总时长:{duration}") return math.ceil((total_frame_count / duration * 1000)) + 1 def convert_png_alpha_2_gray(png_file_path, save_file_path): # 打开PNG图片 image = Image.open(png_file_path) # 确保图片有alpha通道 if image.mode in ("RGBA", "LA") or (image.mode == "P" and "transparency" in image.info): # 提取alpha通道 alpha = image.getchannel("A") # 将alpha通道保存为灰度图片 alpha.save(save_file_path) else: print("该图片没有alpha通道") # 把普通的png转成单通道只包含alpha的png def convert_to_grayscale(input_folder, output_folder): # 确保输出文件夹存在 if not os.path.exists(output_folder): os.makedirs(output_folder) # 遍历输入文件夹中的所有文件 for filename in os.listdir(input_folder): # 检查文件是否是PNG格式 if filename.lower().endswith(".png"): file_path = os.path.join(input_folder, filename) output_path = os.path.join(output_folder, filename) convert_png_alpha_2_gray(file_path, output_path) def check_file_or_dir(userinput, output_file_path: str = None, output_name: str = None): print(f"入参:userinput={userinput} output_file_path={output_file_path} output_name={output_name}") # 检查路径是否存在 if not os.path.exists(userinput): print(f"路径不存在:{userinput}") return files_to_process = [] # 如果是目录,递归处理目录中的所有文件 if os.path.isdir(userinput): for dirpath, _, filenames in os.walk(userinput): for filename in filenames: full_path = os.path.join(dirpath, filename) files_to_process.append((full_path, output_file_path, output_name)) # 如果是文件,直接添加到待处理列表中 elif os.path.isfile(userinput): files_to_process.append((userinput, output_file_path, output_name)) else: print(f"路径既不是文件也不是目录:{userinput}") return # 使用ThreadPoolExecutor来并行处理任务 with ThreadPoolExecutor(max_workers=8) as executor: # 提交所有任务 futures = {executor.submit(process_webp_file, file_path, output_file_path, out_name): ( file_path, output_file_path, out_name) for file_path, output_file_path, out_name in files_to_process} # 获取已完成的任务结果 for future in as_completed(futures): file_path, output_file_path, out_name = futures[future] try: future.result() # 这里可以获取process_webp_file的返回值(如果有) except Exception as exc: print(f'{file_path} 处理异常: {exc}') print("详细堆栈信息如下:") traceback.print_exc() def process_webp_file(file_path, output_file_path: str = None, output_name: str = None): if is_webp_animator(file_path): print(f"正在处理 WebP 文件:{file_path}") webp2mp4core(file_path, output_file_path=output_file_path, output_name=output_name) else: print(f"跳过非 WebP 文件:{file_path}") def is_webp_animator(file_path): try: with Image.open(file_path) as img: if img.format == 'WEBP': return True except Exception: print("e") return False # 初始化累计大小变量 webp_size_total = 0 mp4_size_total = 0 def get_total_size(input_webp_path_str): global webp_size_total, mp4_size_total # 原始webp文件的路径对象 webp_path = Path(input_webp_path_str) # 最终合并视频的输出路径保持不变,仍然位于原始webp文件所在目录 output_merge_video_path = f"{webp_path.parent}/{webp_path.stem}.mp4" if Path(output_merge_video_path).exists(): # 如果mp4文件已经生成,则获取其大小并累加到总大小中 mp4_size = output_merge_video_path.stat().st_size mp4_size_total += mp4_size # 获取webp文件大小并累加到总大小中 webp_size = webp_path.stat().st_size webp_size_total += webp_size print(f"计入大小:webp:{webp_path}") print(f"计入大小:mp4:{output_merge_video_path}") def webp2mp4core(input_webp_path_str, output_file_path: str = None, output_name: str = None): print(f"开始将{input_webp_path_str}转成成mp4") # 原始webp文件的路径对象 webp_path = Path(input_webp_path_str) mp4_output_path = webp_path.parent if output_file_path is not None: mp4_output_path = output_file_path # 最终合并视频的输出路径保持不变,仍然位于原始webp文件所在目录 if output_name is None: output_merge_video_path = f"{mp4_output_path}/{webp_path.stem}.mp4" else: output_merge_video_path = f"{mp4_output_path}/{webp_path.stem}-{output_name}.mp4" if Path(output_merge_video_path).exists(): print(f"文件{output_merge_video_path}已存在跳过生成") return # 在临时目录下创建对应的临时文件夹路径 # 获取系统临时目录 if output_name is None: dir_name_end = "" else: dir_name_end = output_name temp_dir = Path(tempfile.gettempdir()) / f"convert_webp4-{dir_name_end}" temp_png_dir = temp_dir / webp_path.stem temp_png_alpha_dir = temp_dir / f"alpha_{webp_path.stem}" # 使用更新后的临时文件夹路径进行后续操作 if not webp2png(input_webp_path_str, temp_png_dir,target_width=p_width): return print(f"已经{input_webp_path_str}转成成png帧,保存在{temp_png_dir}") convert_to_grayscale(temp_png_dir, temp_png_alpha_dir) print(f"已经{input_webp_path_str}转成成透明灰度帧,保存在{temp_png_alpha_dir}") # 生成临时输出视频文件路径 temp_output_alpha_video_path = temp_dir / f"{webp_path.stem}-alpha.mp4" temp_normal_output_video_path = temp_dir / f"{webp_path.stem}-normal.mp4" # 获取fps并转换视频 fps = get_fps(input_webp_path_str) global p_vf global p_fps if fps < 12 and p_fps <= 0: p_fps = 12 update_vf_params() print(f"获取到当前webp的帧数是{fps}") png2mp4(temp_normal_output_video_path, fps, temp_png_dir) print(f"已经将正常帧数据转成视频保存在{temp_normal_output_video_path}") png2mp4(temp_output_alpha_video_path, fps, temp_png_alpha_dir) print(f"已经将灰度帧数据转成灰度视频保存在{temp_output_alpha_video_path}") # 合并视频并清理临时文件 merge_videos_side_by_side(temp_output_alpha_video_path, temp_normal_output_video_path, output_merge_video_path) print(f"两个视频合并完成,保存在{output_merge_video_path},开始删除临时文件") # 注意:这里假设其他所需函数(如webp2png, convert_to_grayscale, png2mp4, get_fps, merge_videos_side_by_side, delete_path)已经定义。 def convert_png_dir(): png_dir = input("请输入序列帧所在目录") """ 检查指定目录是否存在且包含PNG文件。 参数: png_dir (str): 要检查的目录路径。 """ if os.path.exists(png_dir) and os.path.isdir(png_dir): # 获取目录下的所有文件名 files = os.listdir(png_dir) # 过滤出PNG文件并检查至少有一个PNG文件 png_files = [f for f in files if f.lower().endswith('.png')] if png_files: fps = input("请输入转换的fps") base_dir = os.path.dirname(png_dir) dir_name = os.path.basename(png_dir) mp4_file_path = os.path.join(base_dir, dir_name + ".mp4") png2mp4(output_video=mp4_file_path, fps=float(fps), png_dir=png_dir) else: print("目录里不存在序列帧,重新输入") convert_png_dir() else: print("目录不存在") convert_png_dir() if __name__ == '__main__': # is_png_dir_convert = input("1.转换png序列帧到MP4 \n2.转换webp文件到MP4") # if is_png_dir_convert == '1': # convert_png_dir() # else: # inputDirOrFile = input("请输入文件或目录的路径:") # inputDirOrFile = inputDirOrFile.replace("\"", "") # # inputDirOrFile = "I:/test/test.webp" # start_time = time.time() # p_vf = generate_vf_params(convert_fps_24=False, convert_to_hdr=False, enable_sharpening=False, # enable_denoising=True, enable_color_enhancement=False) # print(f"pvf={p_vf}") # check_file_or_dir(inputDirOrFile, "降噪") # print(f"耗时:{time.time() - start_time}秒") # print(f"mp4大小:{mp4_size_total}") # print(f"webp大小:{webp_size_total}") update_vf_params() inputDirOrFile = input("请输入文件或目录的路径:") inputDirOrFile = inputDirOrFile.replace("\"", "") start_time = time.time() print(f"pvf={p_vf}") check_file_or_dir(inputDirOrFile,output_name="-2") print(f"耗时:{time.time() - start_time}秒") print(f"mp4大小:{mp4_size_total}") print(f"webp大小:{webp_size_total}") # 调用函数 def delete_path(path): try: if not os.path.exists(path): print(f"路径不存在: {path}") return False if os.path.isfile(path): os.remove(path) print(f"已删除文件: {path}") elif os.path.isdir(path): shutil.rmtree(path) print(f"已删除目录: {path}") else: print(f"未知路径类型: {path}") return False return True except PermissionError: print(f"权限不足,无法删除: {path}") except Exception as e: print(f"删除过程中发生未知错误: {str(e)}") return False