Commit e3167a43 by xiaoqi

增加自动上传编码后的礼物上传到后台的功能

parent a6a14574
Showing with 177 additions and 8 deletions
import os
from moviepy import VideoFileClip, AudioFileClip
def merge_video_audio(video_dir, audio_dir, output_dir):
# 获取视频目录下的所有mp4文件名
video_files = {f.split('-')[0]: f for f in os.listdir(video_dir) if f.endswith('.mp4')}
# 遍历音频目录下的所有mp3文件
for audio_filename in os.listdir(audio_dir):
if not audio_filename.endswith('.mp3'):
continue
# 提取音频文件名前缀
prefix = audio_filename.split('.')[0]
# 检查是否存在对应的视频文件
if prefix in video_files:
video_path = os.path.join(video_dir, video_files[prefix])
audio_path = os.path.join(audio_dir, audio_filename)
# 加载视频和音频文件
video = VideoFileClip(video_path)
audio = AudioFileClip(audio_path)
# 如果音频长度不匹配视频长度,则调整音频长度与视频一致
if audio.duration > video.duration:
audio = audio.subclipped(0, video.duration)
elif audio.duration < video.duration:
# 这里简化处理,实际应用可能需要考虑如何处理这种情况
print(f"警告: 音频文件 {audio_filename} 比视频短")
# 合并视频和音频
video_with_audio = video.with_audio(audio)
# 创建输出路径
output_filename = f"{prefix}_merged.mp4"
output_path = os.path.join(output_dir, output_filename)
# 导出合并后的视频文件
video_with_audio.write_videofile(output_path, codec='libx264', audio_codec='aac')
print(f"已合并: {output_filename}")
# 关闭资源
audio.close()
video.close()
if __name__ == '__main__':
# 输入视频和音频所在的目录以及输出目录
# video_directory = input("请输入视频文件所在的目录:")
# audio_directory = input("请输入音频文件所在的目录:")
# output_directory = input("请输入输出文件存放的目录:")
merge_video_audio("C:\\Users\\decid\\Downloads\\motoring", "E:\\git\\MP4GiftConvert\\mp4giftconvert\\audioresource", "E:\\git\\MP4GiftConvert\\mp4giftconvert\\mp3comine_result")
\ No newline at end of file
import glob
import math
import os
import re
import shutil
import subprocess
import tempfile
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from pathlib import Path
import numpy as np
from PIL import Image, ImageEnhance
import requests
from PIL import ImageEnhance
from moviepy import *
from moviepy import VideoFileClip, clips_array
import traceback
# p_preset = "veryslow"
p_preset = "veryslow"
......@@ -26,6 +25,7 @@ p_pix_fmt = "yuv444p"
p_codec = "libx264"
p_enable_sharpening = "" # 5:5: 1.5:0: 0:0 //
p_width = None
p_upload_server_gid = ""
"""
luma_msize_x:亮度矩阵的水平大小(X方向)。这个值定义了应用于亮度分量的模糊矩阵的宽度。
luma_msize_y:亮度矩阵的垂直大小(Y方向)。这个值定义了应用于亮度分量的模糊矩阵的高度。
......@@ -225,7 +225,7 @@ def parse_anmf_data(webp_info):
return result
from PIL import Image, ImageOps
from PIL import Image
import os
from pathlib import Path
......@@ -282,6 +282,7 @@ def webp2png(webp_file, dir_file, target_width=None):
return True
def enhance_png(png_file_path):
# 打开原始图片
image = Image.open(png_file_path)
......@@ -451,7 +452,7 @@ def webp2mp4core(input_webp_path_str, output_file_path: str = None, output_name:
temp_png_alpha_dir = temp_dir / f"alpha_{webp_path.stem}"
delete_path(temp_dir)
# 使用更新后的临时文件夹路径进行后续操作
if not webp2png(input_webp_path_str, temp_png_dir,target_width=p_width):
if not webp2png(input_webp_path_str, temp_png_dir, target_width=p_width):
return
print(f"已经{input_webp_path_str}转成成png帧,保存在{temp_png_dir}")
convert_to_grayscale(temp_png_dir, temp_png_alpha_dir)
......@@ -476,6 +477,41 @@ def webp2mp4core(input_webp_path_str, output_file_path: str = None, output_name:
merge_videos_side_by_side(temp_output_alpha_video_path, temp_normal_output_video_path, output_merge_video_path)
print(f"两个视频合并完成,保存在{output_merge_video_path},开始删除临时文件")
delete_path(temp_dir)
if p_upload_server_gid != "" and p_upload_server_gid != None:
upload_gift(output_merge_video_path, p_upload_server_gid)
def upload_gift(file_path, gid):
"""
上传指定路径的MP4文件,并附带一个giftid(gid)。
:param file_path: MP4文件在本地的路径
:param gid: 礼物ID(作为请求参数)
:return: 返回响应对象,包括服务器的状态码和响应内容
"""
url = "http://httest.yabolive.net/gift/test/upload/mp4Res.html"
headers = {
# 如果需要身份验证或其他自定义头部,请在这里添加
}
files = {
'mp4file': (os.path.basename(file_path), open(file_path, 'rb'), 'video/mp4'),
}
data = {
'gid': str(gid),
}
try:
response = requests.post(url, headers=headers, files=files, data=data)
if response.status_code == 200:
print(f"文件上传成功:{response.text}")
else:
print(f"文件上传失败,状态码:{response.status_code}, 错误消息:{response.text}")
except Exception as e:
print(f"发生异常:{e}")
return None
return response
# 注意:这里假设其他所需函数(如webp2png, convert_to_grayscale, png2mp4, get_fps, merge_videos_side_by_side, delete_path)已经定义。
def convert_png_dir():
......@@ -526,7 +562,7 @@ if __name__ == '__main__':
inputDirOrFile = inputDirOrFile.replace("\"", "")
start_time = time.time()
print(f"pvf={p_vf}")
check_file_or_dir(inputDirOrFile,output_name="-2")
check_file_or_dir(inputDirOrFile, output_name="-2")
print(f"耗时:{time.time() - start_time}秒")
print(f"mp4大小:{mp4_size_total}")
......
......@@ -54,6 +54,7 @@ denoising = sys.argv[7] # 第一个参数
color_enhancement = sys.argv[8] # 第一个参数
webp_path_or_dir = sys.argv[9] # 第一个参数
codec = sys.argv[10] # 第一个参数
convert_webp_2_mp4.p_upload_server_gid = sys.argv[11] # 第一个参数
# if not is_null_or_empty(gift_id):
# webp_path_or_dir = find_first_file("webp_dir", gift_id)
# else:
......
......@@ -54,7 +54,25 @@ def get_motoring_resource(phone, password):
motoring in webp_list]
# 等待所有任务完成
concurrent.futures.wait(futures)
def get_motoring_audio_resource(phone, password):
data = json.loads(request_url("/setting/updateMotoring", phone, password))["data"]
# 创建一个线程池,大小为5
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# mp3_list = [motoring for motoring in data if motoring["audioresource"]!=""]
# # 对于result.data中的每一个gift,提交一个download_file任务到线程池
# futures = [executor.submit(download_file, cdn_host + motoring["audioresource"], f"motoring/motoring_dir_{motoring["mid"]}",
# f"{motoring["mid"]}-{motoring["darkMd5"]}") for
# motoring in mp3_list]
# # 等待所有任务完成
# concurrent.futures.wait(futures)
webp_list = [motoring for motoring in data if motoring["darkRes"] != ""]
# 对于result.data中的每一个gift,提交一个download_file任务到线程池
futures = [executor.submit(download_file, cdn_host + motoring["audioresource"], f"audioresource/audioresource",
f"{motoring["mid"]}") for
motoring in webp_list]
# 等待所有任务完成
concurrent.futures.wait(futures)
def read_token_uid(phone, password):
path = f"{phone}"
db = Path(path)
......@@ -199,6 +217,6 @@ if __name__ == '__main__':
# print("token=" + token)
# load_gift_resource("13560573423", "123456a")
# load_live_gift_list("15889845884", "123456a")
# get_motoring_resource("13560573423", "123456a")
get_motoring_audio_resource("13560573423", "123456a")
# load_gift_resource("13560573423", "123456a")
# load_live_gift_list("")
\ No newline at end of file
from moviepy import VideoFileClip
import os
def remove_audio_from_videos(directory):
# 遍历指定目录下的所有文件
for filename in os.listdir(directory):
if filename.endswith('.mp4'):
video_path = os.path.join(directory, filename)
# 加载视频文件
video_clip = VideoFileClip(video_path)
# 去除音频
video_without_audio = video_clip.without_audio()
# 保存新的无音频视频文件
output_filename = os.path.splitext(filename)[0] + '_noaudio.mp4'
output_path = os.path.join(directory, output_filename)
video_without_audio.write_videofile(output_path, codec='libx264', audio=False)
print(f"已处理: {output_filename}")
import os
import requests
def upload_gift(file_path, gid):
"""
上传指定路径的MP4文件,并附带一个giftid(gid)。
:param file_path: MP4文件在本地的路径
:param gid: 礼物ID(作为请求参数)
:return: 返回响应对象,包括服务器的状态码和响应内容
"""
url = "http://httest.yabolive.net/gift/test/upload/mp4Res.html"
headers = {
# 如果需要身份验证或其他自定义头部,请在这里添加
}
files = {
'mp4file': (os.path.basename(file_path), open(file_path, 'rb'), 'video/mp4'),
}
data = {
'gid': str(gid),
}
try:
response = requests.post(url, headers=headers, files=files, data=data)
if response.status_code == 200:
print(f"文件上传成功:{response.text}")
else:
print(f"文件上传失败,状态码:{response.status_code}, 错误消息:{response.text}")
except Exception as e:
print(f"发生异常:{e}")
return None
return response
res = upload_gift('123.mp4', 21331)
print(res.content)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment