Commit a65bde03 by xiaoqi

Update .gitignore and untrack ignored files

parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Virtual environment files
.venv/
venv/
env/
.eggs/
pip-wheel-metadata
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyCharm files and directories
.idea/
*.iml
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# dotenv file
.env
# Logs and databases
logs/
*.log
*.sql
*.sqlite3
# Local configuration files
local_settings.py
# Testing and coverage
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
\ No newline at end of file
import glob
import math
import os
import re
import shutil
import subprocess
import tempfile
import time
import traceback
from pathlib import Path
import appdirs
import numpy as np
from PIL import Image, ImageEnhance, UnidentifiedImageError
from moviepy import *
from moviepy import VideoFileClip, clips_array
p_codec = "libx264"
# p_preset = "veryslow"
p_preset = "medium"
p_bitrate = "2000k"
p_threads = 24
p_crf = "22"
p_pix_fmt = "yuv444p"
p_tune = "animation"
p_vf = ""
# p_vf = ""
def merge_videos_side_by_side(video_a_path, video_b_path, output_path):
# 加载两段视频
clip_a = VideoFileClip(video_a_path)
clip_b = VideoFileClip(video_b_path)
# 将两段视频按左右排列拼接
final_clip = clips_array([[clip_a, clip_b]])
if p_vf == "":
params = ["-crf", p_crf, "-pix_fmt", p_pix_fmt, "-tune", p_tune]
else:
params = ["-vf", p_vf, "-crf", p_crf, "-pix_fmt", p_pix_fmt, "-tune", p_tune]
# 输出合并后的视频
final_clip.write_videofile(output_path, codec=p_codec, fps=clip_a.fps, bitrate=p_bitrate, preset=p_preset,
threads=p_threads, ffmpeg_params=params)
print(f"合并完成,已保存到:{output_path}")
# 确保所有图像都兼容 RGB/RGBA 格式
def load_image_as_rgb(image_path):
image = Image.open(image_path)
if image.mode == "L": # 如果是灰度图像(单通道)
image = image.convert("RGB") # 转换为 RGB 格式
elif image.mode == "RGBA": # 如果是 RGBA 图像(带透明通道)
pass # 保持原样
else:
image = image.convert("RGB") # 其他模式也转为 RGB
return np.array(image)
def generate_vf_params(convert_fps_24=False, convert_to_hdr=False, enable_sharpening=False, enable_denoising=False,
enable_color_enhancement=False):
"""
根据参数生成 FFmpeg 的 -vf 滤镜参数字符串。
:param convert_to_hdr: 是否转换为 HDR(True 或 False)
:param enable_sharpening: 是否开启锐化(True 或 False)
:param enable_denoising: 是否开启降噪(True 或 False)
:param enable_color_enhancement: 是否开启色彩增强(True 或 False)
:return: 完整的 -vf 参数字符串
"""
filters = []
# HDR 转换
if convert_to_hdr:
hdr_filter = (
"zscale=matrixin=bt709:matrix=bt709:t=bt709:m=bt709:r=tv,tonemap=tonemap=hable:desat=0"
)
filters.append(hdr_filter)
if convert_fps_24:
to24 = (
"minterpolate=mi_mode=mci:mc_mode=aobmc:vsbmc=1:me_mode=bidir:me=epzs:fps=24"
)
filters.append(to24)
# 降噪
if enable_denoising:
denoise_filter = "hqdn3d=4:3:6:4" # 去噪参数
filters.append(denoise_filter)
# 锐化
if enable_sharpening:
sharpen_filter = "unsharp=lx=5:ly=5:la=1.5" # 锐化参数
filters.append(sharpen_filter)
# 色彩增强
if enable_color_enhancement:
color_enhance_filter = "eq=contrast=1.1:saturation=1.1" # 色彩增强参数
filters.append(color_enhance_filter)
# 如果没有启用任何滤镜,返回空字符串
if not filters:
return ""
# 将所有滤镜用逗号连接成一个完整的 -vf 参数字符串
vf_params = ",".join(filters)
return vf_params
def png2mp4(output_video, fps2, png_dir, target_fps: int = 1000):
"""
将 PNG 序列转换为 MP4 视频文件。
参数:
output_video (str): 输出视频文件路径。
fps2 (int): 输入帧率。
png_dir (str): 包含 PNG 文件的目录路径。
target_fps (int): 目标帧率。如果 fps2 > target_fps,则抽帧到目标帧率;否则保持原帧率。
"""
# 确保帧率不低于 12
fps = max(fps2, 12)
# 如果 fps 大于目标帧率,则设置为目标帧率
if fps > target_fps:
fps = target_fps
# 获取所有 PNG 文件并按名称排序
png_files = sorted(glob.glob(f"{png_dir}/*.png"))
if not png_files:
raise ValueError("未找到任何 PNG 文件,请检查 png_dir 路径是否正确。")
# 加载 PNG 序列帧并确保兼容性
image_arrays = [load_image_as_rgb(png_file) for png_file in png_files]
# 计算总帧数和期望帧数
total_frames = len(image_arrays)
print(f"抽帧前帧数:{total_frames}")
expected_frames = int(total_frames * (target_fps / fps2))
# 如果需要抽帧,创建新的图像序列
if fps < fps2:
step = total_frames / expected_frames # 计算精确的抽帧步长
image_arrays = [image_arrays[int(i * step)] for i in range(expected_frames)]
final_frames = len(image_arrays)
print(f"抽帧后帧数:{final_frames}")
# 创建图像序列剪辑
image_clip = ImageSequenceClip(image_arrays, fps=fps)
# 创建纯色背景
background_color = (230, 230, 230) # 背景颜色(浅灰色)
background = ColorClip(
size=image_clip.size, # 背景尺寸与图像帧一致
color=background_color, # 背景颜色
duration=image_clip.duration # 持续时间与图像帧一致
)
# 将 PNG 帧叠加到背景上(透明区域会被背景填充)
final_clip = CompositeVideoClip(
[background, image_clip.with_position("center")] # 背景在下,PNG 帧居中
)
if p_vf == "":
params = ["-crf", p_crf, "-pix_fmt", p_pix_fmt, "-tune", p_tune]
else:
params = ["-vf", p_vf, "-crf", p_crf, "-pix_fmt", p_pix_fmt, "-tune", p_tune]
print(f"开始转码,帧率: {fps}")
final_clip.write_videofile(
output_video,
codec=p_codec, # 使用 libx264rgb 编解码器尝试无损编码
fps=fps,
preset=p_preset, # 使用较快的预置设置
threads=p_threads,
bitrate=p_bitrate,
ffmpeg_params=params
)
def get_webp_duration(file_path):
try:
# 执行webpinfo命令并捕获输出
result = subprocess.run(
['webpinfo', '-animation', file_path],
capture_output=True,
text=True,
check=True
)
# 正则匹配Duration字段
return parse_anmf_data(result.stdout)
except subprocess.CalledProcessError as e:
print(f"解析失败:{e.stderr}")
return []
# 示例调用
def parse_anmf_data(webp_info):
anmf_pattern = re.compile(
r"Chunk ANMF.*?Width:\s+(\d+)\s+Height:\s+(\d+)\s+Duration:\s+(\d+)",
re.DOTALL
)
# 查找所有匹配的 ANMF 块
matches = anmf_pattern.findall(webp_info)
# 将匹配结果转换为字典并存储在列表中
result = []
for match in matches:
width, height, duration = match
result.append({
"width": int(width),
"height": int(height),
"duration": int(duration)
})
return result
# 打开 WebP 动画文件并解析为 PNG 序列帧
def webp2png(webp_file, dir_file):
# 确保目标目录存在
if Path(dir_file).exists(): # 不重新生成png,只生成一次
return True
os.makedirs(dir_file, exist_ok=True)
# 打开 WebP 文件
img = Image.open(webp_file)
# 检查是否是动画
if (not getattr(img, "is_animated", False)) | (img.n_frames <= 1):
print("这不是一个动画 WebP 文件")
return False
else:
# 遍历每一帧
for i in range(img.n_frames):
img.seek(i) # 移动到第 i 帧
frame = img.copy() # 复制当前帧
width, height = frame.size
# 检查宽度和高度是否为偶数,如果不是,则调整大小
if width % 2 != 0:
width -= 1
if height % 2 != 0:
height -= 1
# 调整图片大小
if width < img.size[0] or height < img.size[1]:
frame = frame.crop((0, 0, width, height))
# 构造保存路径
frame_path = os.path.join(dir_file, f"frame_{i:03d}.png")
# 保存为 PNG 文件
frame.save(frame_path, "PNG")
# enhance_png(frame_path)
return True
def enhance_png(png_file_path):
# 打开原始图片
image = Image.open(png_file_path)
# 创建一个ImageEnhance对象,这里选择增强饱和度
enhancer = ImageEnhance.Color(image)
# 提高饱和度(增强因子大于1.0会增加饱和度)
factor = 1.5 # 设定增强因子,可以根据需要调整
enhanced_image = enhancer.enhance(factor)
# 保存调整后的图片
enhanced_image.save(png_file_path)
def get_fps(webp_file):
webp_infos = get_webp_duration(webp_file)
duration = 0
for webp_info in webp_infos:
duration += webp_info['duration']
total_frame_count = len(webp_infos)
print(f"总时长:{duration}")
return math.ceil((total_frame_count / duration * 1000)) + 1
def convert_png_alpha_2_gray(png_file_path, save_file_path):
# 打开PNG图片
image = Image.open(png_file_path)
# 确保图片有alpha通道
if image.mode in ("RGBA", "LA") or (image.mode == "P" and "transparency" in image.info):
# 提取alpha通道
alpha = image.getchannel("A")
# 将alpha通道保存为灰度图片
alpha.save(save_file_path)
else:
print("该图片没有alpha通道")
# 把普通的png转成单通道只包含alpha的png
def convert_to_grayscale(input_folder, output_folder):
# 确保输出文件夹存在
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 遍历输入文件夹中的所有文件
for filename in os.listdir(input_folder):
# 检查文件是否是PNG格式
if filename.lower().endswith(".png"):
file_path = os.path.join(input_folder, filename)
output_path = os.path.join(output_folder, filename)
convert_png_alpha_2_gray(file_path, output_path)
def check_file_or_dir(userinput, output_name: str = None):
# 检查路径是否存在
if not os.path.exists(userinput):
print(f"路径不存在:{userinput}")
return
# 如果是目录,递归处理目录中的所有文件
if os.path.isdir(userinput):
for dirpath, _, filenames in os.walk(userinput):
for filename in filenames:
full_path = os.path.join(dirpath, filename)
process_webp_file(full_path, output_name=output_name)
# 如果是文件,直接处理
elif os.path.isfile(userinput):
process_webp_file(userinput, output_name=output_name)
else:
print(f"路径既不是文件也不是目录:{userinput}")
def process_webp_file(file_path, output_name: str = None):
if is_webp_animator(file_path):
print(f"正在处理 WebP 文件:{file_path}")
webp2mp4core(file_path, output_name=output_name)
else:
print(f"跳过非 WebP 文件:{file_path}")
def is_webp_animator(file_path):
try:
with Image.open(file_path) as img:
if img.format == 'WEBP':
return True
except Exception:
print("e")
return False
# 调用函数
def delete_path(path):
"""
删除指定路径的文件或目录
参数:
path (str): 要删除的文件/目录路径
返回:
bool: 删除成功返回True,失败返回False
"""
try:
if not os.path.exists(path):
print(f"路径不存在: {path}")
return False
if os.path.isfile(path):
os.remove(path)
print(f"已删除文件: {path}")
elif os.path.isdir(path):
shutil.rmtree(path)
print(f"已删除目录: {path}")
else:
print(f"未知路径类型: {path}")
return False
return True
except PermissionError:
print(f"权限不足,无法删除: {path}")
except Exception as e:
print(f"删除过程中发生未知错误: {str(e)}")
return False
# 初始化累计大小变量
webp_size_total = 0
mp4_size_total = 0
def get_total_size(input_webp_path_str):
global webp_size_total, mp4_size_total
# 原始webp文件的路径对象
webp_path = Path(input_webp_path_str)
# 最终合并视频的输出路径保持不变,仍然位于原始webp文件所在目录
output_merge_video_path = webp_path.parent / f"{webp_path.stem}.mp4"
if Path(output_merge_video_path).exists():
# 如果mp4文件已经生成,则获取其大小并累加到总大小中
mp4_size = output_merge_video_path.stat().st_size
mp4_size_total += mp4_size
# 获取webp文件大小并累加到总大小中
webp_size = webp_path.stat().st_size
webp_size_total += webp_size
print(f"计入大小:webp:{webp_path}")
print(f"计入大小:mp4:{output_merge_video_path}")
def webp2mp4core(input_webp_path_str, output_name: str = None):
print(f"开始将{input_webp_path_str}转成成mp4")
# 原始webp文件的路径对象
webp_path = Path(input_webp_path_str)
# 最终合并视频的输出路径保持不变,仍然位于原始webp文件所在目录
if output_name is None:
output_merge_video_path = webp_path.parent / f"{webp_path.stem}.mp4"
else:
output_merge_video_path = webp_path.parent / f"{webp_path.stem}-{output_name}.mp4"
if Path(output_merge_video_path).exists():
print(f"文件{output_merge_video_path}已存在跳过生成")
return
# 在临时目录下创建对应的临时文件夹路径
# 获取系统临时目录
temp_dir = Path(tempfile.gettempdir()) / "convert_webp4"
temp_png_dir = temp_dir / webp_path.stem
temp_png_alpha_dir = temp_dir / f"alpha_{webp_path.stem}"
# 使用更新后的临时文件夹路径进行后续操作
if not webp2png(input_webp_path_str, temp_png_dir):
return
print(f"已经{input_webp_path_str}转成成png帧,保存在{temp_png_dir}")
convert_to_grayscale(temp_png_dir, temp_png_alpha_dir)
print(f"已经{input_webp_path_str}转成成透明灰度帧,保存在{temp_png_alpha_dir}")
# 生成临时输出视频文件路径
temp_output_alpha_video_path = temp_dir / f"{webp_path.stem}-alpha.mp4"
temp_normal_output_video_path = temp_dir / f"{webp_path.stem}-normal.mp4"
# 获取fps并转换视频
fps = get_fps(input_webp_path_str)
# global p_vf
# if fps >= 24:
# p_vf = ""
print(f"获取到当前webp的帧数是{fps}")
png2mp4(temp_normal_output_video_path, fps, temp_png_dir)
print(f"已经将正常帧数据转成视频保存在{temp_normal_output_video_path}")
png2mp4(temp_output_alpha_video_path, fps, temp_png_alpha_dir)
print(f"已经将灰度帧数据转成灰度视频保存在{temp_output_alpha_video_path}")
# 合并视频并清理临时文件
merge_videos_side_by_side(temp_output_alpha_video_path, temp_normal_output_video_path, output_merge_video_path)
print(f"两个视频合并完成,保存在{output_merge_video_path},开始删除临时文件")
# delete_path(temp_output_alpha_video_path)
# delete_path(temp_png_dir)
# delete_path(temp_normal_output_video_path)
# delete_path(temp_png_alpha_dir)
# 注意:这里假设其他所需函数(如webp2png, convert_to_grayscale, png2mp4, get_fps, merge_videos_side_by_side, delete_path)已经定义。
def convert_png_dir():
png_dir = input("请输入序列帧所在目录")
"""
检查指定目录是否存在且包含PNG文件。
参数:
png_dir (str): 要检查的目录路径。
"""
if os.path.exists(png_dir) and os.path.isdir(png_dir):
# 获取目录下的所有文件名
files = os.listdir(png_dir)
# 过滤出PNG文件并检查至少有一个PNG文件
png_files = [f for f in files if f.lower().endswith('.png')]
if png_files:
fps = input("请输入转换的fps")
base_dir = os.path.dirname(png_dir)
dir_name = os.path.basename(png_dir)
mp4_file_path = os.path.join(base_dir, dir_name + ".mp4")
png2mp4(output_video=mp4_file_path, fps=float(fps), png_dir=png_dir)
else:
print("目录里不存在序列帧,重新输入")
convert_png_dir()
else:
print("目录不存在")
convert_png_dir()
if __name__ == '__main__':
is_png_dir_convert = input("1.转换png序列帧到MP4 \n2.转换webp文件到MP4")
if is_png_dir_convert == '1':
convert_png_dir()
else:
inputDirOrFile = input("请输入文件或目录的路径:")
inputDirOrFile = inputDirOrFile.replace("\"", "")
# inputDirOrFile = "I:/test/test.webp"
start_time = time.time()
# p_ffmpeg_params = ["-crf", "1", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数1")
# p_ffmpeg_params = ["-crf", "2", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数2")
# p_ffmpeg_params = ["-crf", "3", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数3")
# p_ffmpeg_params = ["-crf", "5", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数5")
# p_ffmpeg_params = ["-crf", "7", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数7")
# p_ffmpeg_params = ["-crf", "9", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数9")
# p_ffmpeg_params = ["-crf", "12", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数12")
# p_ffmpeg_params = ["-crf", "15", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数15")
# p_ffmpeg_params = ["-crf", "18", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数18")
# p_ffmpeg_params = ["-crf", "20", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数20")
p_vf = generate_vf_params(convert_fps_24=False, convert_to_hdr=False, enable_sharpening=False,
enable_denoising=True, enable_color_enhancement=False)
print(f"pvf={p_vf}")
check_file_or_dir(inputDirOrFile, "降噪")
# p_vf = ""
# check_file_or_dir(inputDirOrFile, "不插帧")
# p_codec = "libx264"
# p_preset = "ultrafast"
# check_file_or_dir(inputDirOrFile, "ultrafast")
# p_ffmpeg_params = ["-crf", "24", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数24")
# p_ffmpeg_params = ["-crf", "26", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数26")
# p_ffmpeg_params = ["-crf", "28", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数28")
# p_ffmpeg_params = ["-crf", "30", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数30")
# p_ffmpeg_params = ["-crf", "32", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数32")
# p_ffmpeg_params = ["-crf", "34", "-pix_fmt", "yuv444p", "-tune", "animation"]
# check_file_or_dir(inputDirOrFile, "质量参数34")
print(f"耗时:{time.time() - start_time}秒")
print(f"mp4大小:{mp4_size_total}")
print(f"webp大小:{webp_size_total}")
import os.path
import convert_webp_2_mp4
def rename_webp_file(webp_file_dir):
# 检查输入路径是否为目录
if os.path.isdir(webp_file_dir):
# 遍历目录及其子目录中的所有文件
for dirpath, _, filenames in os.walk(webp_file_dir):
for filename in filenames:
# 获取文件完整路径
full_path = os.path.join(dirpath, filename)
# 检查文件是否是 WebP 动画文件
if convert_webp_2_mp4.is_webp_animator(full_path):
# 获取文件的 FPS 值
fps = int(convert_webp_2_mp4.get_fps(full_path))
# 如果成功获取到 FPS 值
if fps is not None:
# 分离文件名和扩展名
base_name, ext = os.path.splitext(filename)
# 构造新的文件名
new_filename = f"{base_name}_fps_{fps}{ext}"
# 构造新的完整路径
new_full_path = os.path.join(dirpath, new_filename)
# 重命名文件
os.rename(full_path, new_full_path)
print(f"Renamed: {full_path} -> {new_full_path}")
else:
print(f"Failed to get FPS for file: {full_path}")
else:
print(f"The provided path is not a directory: {webp_file_dir}")
if __name__ == '__main__':
inputDirOrFile = input("请输入文件或目录的路径:")
inputDirOrFile = inputDirOrFile.replace("\"", "")
rename_webp_file(inputDirOrFile)
from dataclasses import dataclass, field
from typing import List, Optional
# 假设你已经有了json_data这个变量,它包含了你的JSON字符串
import json
@dataclass
class Gift:
darkMd5: str
darkRes: str
gid: int
md5: str
mp4Res: str
resource: str
weight: int
zipMd5: Optional[str] = ""
zipRes: Optional[str] = ""
@dataclass
class Response:
msg: str
code: int
data: List[Gift]
md5: str
def json_to_entity(json_str: str) -> Response:
# 将JSON字符串解析成字典
parsed_dict = json.loads(json_str)
# 提取"data"部分并转换为Gift实例列表
gifts = [Gift(**gift_data) for gift_data in parsed_dict['data']]
# 创建Response实例
response = Response(
msg=parsed_dict['msg'],
code=parsed_dict['code'],
data=gifts,
md5=parsed_dict['md5']
)
return response
import requests
import json
import os
from urllib.parse import unquote
from pathlib import Path
from urllib.parse import urlparse
import concurrent.futures
from gift_parser import json_to_entity
is_release_api = True
test_host = "https://ceshi.yabolive.tv/apishare"
release_host = "https://ceshi.yabolive.tv/apishare"
cdn_host = "https://zhibocdn.yabolive.net/comm"
def firefly_login(phone, password):
url = f"{get_host()}/user/login.html?cancel=0&prchannel=&localecode=zh-Hans-CN&countrycode=86&advertisingId=ea55a34e-9152-4ec8-b2b5-74cff51b4100&cert=RQsxRXO3pnOHpQswfQpHP9-Kfne5RuRWp7-xfhOTRuel&imsi=&pkg=android.happylive&appname=com.happy.live&driverid=4d5872cb5a783a22760dd6c7f9b90cad6&bootloader=S906U1UEU7EXK6&storeApp=1&model=SM-S906U1&net=wifi&lang=0&brand=samsung&localecountry=CN&os=android&larea=2&appversion=7700&manufactruer=samsung&phone={phone
}&imei=&osversion=14&pwd={password}&device=g0q&cid=fireflygw&debug=sendbox"
response = requests.get(url, )
print(f"Response Code:{response.status_code}")
print(f"Response Text:{response.text}")
if response.status_code == 200:
code = response.json()["code"]
msg = unquote(response.json()["msg"])
print(f"msg:{msg}")
if code == 1: # 登录成功,
save_token(phone, response.text)
uid = response.json()["uid"]
token = response.json()["token"]
return uid, token
return None, None
def read_token_uid(phone, password):
path = f"{phone}"
db = Path(path)
if db.exists():
# 文件存在,尝试读取token和uid
with open(path, 'r', encoding='utf-8') as file:
try:
data = json.load(file)
uid = data.get('uid')
token = data.get('token')
if uid and token:
print("Token and UID loaded from local file.")
return uid, token
except json.JSONDecodeError:
print("Failed to decode JSON from local file.")
# 如果没有找到有效的本地文件,或者文件中的数据无效,则登录并获取新token和uid
print("No valid local token/UID found, logging in...")
uid, token = firefly_login(phone, password)
return uid, token
def save_token(phone, json):
path = f"{phone}"
db = Path(path)
if db.exists():
db.unlink(missing_ok=True)
with open(path, 'w', encoding='utf-8') as file:
file.write(json)
def get_host():
if is_release_api:
return release_host
else:
return test_host
def request_url(api, phone, password):
url = get_host() + api + base_param(phone, password)
response = requests.get(url, )
print(f"Response Code:{response.status_code}")
# print(f"Response Text:{response.text}")
return response.text
def base_param(phone, password):
uid, token = read_token_uid(phone, password)
return f"?cancel=0&prchannel=&localecode=zh-Hans-CN&countrycode=86&advertisingId=ea55a34e-9152-4ec8-b2b5-74cff51b4100&cert=RQsxRXO3pnOHpQswfQpHP9-Kfne5RuRWp7-xfhOTRuel&imsi=&pkg=android.happylive&appname=com.happy.live&driverid=4d5872cb5a783a22760dd6c7f9b90cad6&bootloader=S906U1UEU7EXK6&storeApp=1&model=SM-S906U1&net=wifi&lang=0&brand=samsung&localecountry=CN&os=android&larea=2&appversion=7700&manufactruer=samsung&uid={uid
}&imei=&osversion=14&token={token}&device=g0q&cid=fireflygw&debug=sendbox"
def load_live_gift_list(phone, password):
live_gift = request_url(api="/setting/updatelivegift.html", phone=phone, password=password)
jsonobject = json.loads(live_gift)
live_gift_list = jsonobject["data"]
shechigifts = [shechigift for shechigift in live_gift_list if shechigift.get("category") == 1001]
# print(shechigifts)
json_text = request_url(api="/setting/updatespecialresource", phone=phone, password=password)
result = json_to_entity(json_text)
matching_elements = [shiechigift for shiechigift in shechigifts for resource in result.data if shiechigift.get("darkMd5") == resource.darkMd5]
# matching_elements = [resource for resource in result.data for shechigift in shechigifts if resource.darkMd5 == shechigift.get("darkMd5")]
print(matching_elements)
# 创建一个线程池,大小为5
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# # 对于result.data中的每一个gift,提交一个download_file任务到线程池
# futures = [executor.submit(download_file, cdn_host + gift.darkRes, "shechi_webp_dir", f"{gift.gid}-{gift.darkMd5}.webp") for
# gift in matching_elements]
#
# # 等待所有任务完成
# concurrent.futures.wait(futures)
#
# print("所有文件下载完成")
def load_gift_resource(phone, password):
json_text = request_url(api="/setting/updatespecialresource", phone=phone, password=password)
result = json_to_entity(json_text)
# 创建一个线程池,大小为5
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 对于result.data中的每一个gift,提交一个download_file任务到线程池
futures = [executor.submit(download_file, cdn_host + gift.darkRes, "webp_dir", f"{gift.gid}-{gift.darkMd5}") for
gift in result.data]
# 等待所有任务完成
concurrent.futures.wait(futures)
print("所有文件下载完成")
def download_file(url, directory, custom_filename=None):
print("开始下载" + url)
# 如果没有提供自定义文件名,则从URL中解析
if custom_filename is None:
a = urlparse(url)
filename = os.path.basename(a.path)
else:
# 使用提供的自定义文件名
filename = custom_filename
# 确保目录存在
Path(directory).mkdir(parents=True, exist_ok=True)
local_filename = os.path.join(directory, filename)
# 发送GET请求
response = requests.get(url, stream=True)
# 确保请求成功
response.raise_for_status()
# 写入文件到本地路径
with open(local_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk: # 过滤掉保持活动连接的空字节
f.write(chunk)
print("下载完成")
return local_filename
if __name__ == '__main__':
# uid, token = read_token_uid("13560573423", "123456a")
# print("token=" + token)
# load_gift_resource("13560573423", "123456a")
load_live_gift_list("15889845884", "123456a")
File added
class ObfuseTableBase64:
MAX_LENGTH = 5
MAX_CODE_TIME = 3
placeHolder = ord('l') # 'l'的ASCII码
# 初始化编码表和解码表
@classmethod
def __init__(cls):
# 初始化临时数组
tmp0 = [ord(c) for c in '09_-18634275#']
tmp1 = [0x41 + i for i in range(len(tmp0))]
tmp2 = [tmp1[-1] + i + 1 for i in range(len(tmp1))]
tmp3 = [0x61 + i for i in range(len(tmp1))]
tmp4 = [tmp3[-1] + i + 1 for i in range(len(tmp3))]
data2Arr = [
tmp3, # 0
tmp0, # 1
tmp1, # 2
tmp4, # 3
tmp2 # 4
]
# 生成65字符的中间表
data65 = []
for i in range(5 * len(tmp0)):
quotient, remainder = divmod(i, 5)
data65.append(data2Arr[remainder][quotient])
# 生成64字符的编码表
idx = data65.index(cls.placeHolder) if cls.placeHolder in data65 else 0
cls.encodingTable = data65[:idx] + data65[idx + 1:] if idx < len(data65) else data65[:64]
# 生成解码表
cls.decodingTable = [-1] * 256
for i, char in enumerate(cls.encodingTable):
cls.decodingTable[char] = i
@classmethod
def encode(cls, data):
if isinstance(data, str):
data = data.encode('utf-8')
modulus = len(data) % 3
output_length = (len(data) // 3) * 4 + (4 if modulus else 0)
encoded = bytearray(output_length)
for i in range(0, len(data) - modulus, 3):
byte1, byte2, byte3 = data[i], data[i + 1], data[i + 2]
encoded[i // 3 * 4] = cls.encodingTable[(byte1 >> 2) & 0x3F]
encoded[i // 3 * 4 + 1] = cls.encodingTable[((byte1 << 4) | (byte2 >> 4)) & 0x3F]
encoded[i // 3 * 4 + 2] = cls.encodingTable[((byte2 << 2) | (byte3 >> 6)) & 0x3F]
encoded[i // 3 * 4 + 3] = cls.encodingTable[byte3 & 0x3F]
# 处理余数情况
if modulus == 1:
byte = data[-1]
encoded[-4] = cls.encodingTable[(byte >> 2) & 0x3F]
encoded[-3] = cls.encodingTable[((byte << 4) & 0x3F)]
encoded[-2:] = [cls.placeHolder] * 2
elif modulus == 2:
byte1, byte2 = data[-2], data[-1]
encoded[-4] = cls.encodingTable[(byte1 >> 2) & 0x3F]
encoded[-3] = cls.encodingTable[((byte1 << 4) | (byte2 >> 4)) & 0x3F]
encoded[-2] = cls.encodingTable[((byte2 << 2) & 0x3F)]
encoded[-1] = cls.placeHolder
return encoded.decode('utf-8')
@classmethod
def decode(cls, data):
if isinstance(data, str):
data = data.encode('utf-8')
data = cls.discard_non_base64_bytes(data)
modulus = 0
if data[-2] == cls.placeHolder:
modulus = 1
elif data[-1] == cls.placeHolder:
modulus = 2
output_length = ((len(data) // 4) - 1) * 3 + (4 - modulus)
decoded = bytearray(output_length)
for i in range(0, len(data) - 4, 4):
b1 = cls.decodingTable[data[i]]
b2 = cls.decodingTable[data[i + 1]]
b3 = cls.decodingTable[data[i + 2]]
b4 = cls.decodingTable[data[i + 3]]
decoded[i // 4 * 3] = (b1 << 2) | (b2 >> 4)
decoded[i // 4 * 3 + 1] = (b2 << 4) | (b3 >> 2)
decoded[i // 4 * 3 + 2] = (b3 << 6) | b4
# 处理末尾
if modulus == 1:
b1 = cls.decodingTable[data[-4]]
b2 = cls.decodingTable[data[-3]]
decoded[-1] = (b1 << 2) | (b2 >> 4)
elif modulus == 2:
b1 = cls.decodingTable[data[-4]]
b2 = cls.decodingTable[data[-3]]
b3 = cls.decodingTable[data[-2]]
decoded[-2] = (b1 << 2) | (b2 >> 4)
decoded[-1] = (b2 << 4) | (b3 >> 2)
return decoded.decode('utf-8')
@classmethod
def discard_non_base64_bytes(cls, data):
return bytearray([b for b in data if cls.is_valid_base64_byte(b)])
@classmethod
def is_valid_base64_byte(cls, byte):
if byte == cls.placeHolder:
return True
if byte < 0 or byte >= 128:
return False
return cls.decodingTable[byte] != -1
@classmethod
def obfuse_base64(cls, decode_str):
if not (decode_str.startswith('{{') and decode_str.endswith('}}')):
if decode_str.startswith('"') and decode_str.endswith('"'):
decode_str = decode_str[1:-1]
try:
decode_str = cls.decode(decode_str)
except Exception as e:
print(f"解码错误: {e}")
return decode_str
\ No newline at end of file
import av
import numpy as np
from PIL import Image
import os
def png_to_webm_with_alpha(input_folder, output_file, fps=12):
# 获取所有PNG文件并按名称排序
png_files = sorted([f for f in os.listdir(input_folder) if f.endswith('.png')])
if not png_files:
raise ValueError("No PNG files found in the input folder")
first_image = Image.open(os.path.join(input_folder, png_files[0]))
width, height = first_image.size
# 创建输出容器和视频流
container = av.open(output_file, mode='w')
stream = container.add_stream('libvpx-vp9', rate=fps)
stream.pix_fmt = 'yuva420p'
stream.width = width
stream.height = height
# 设置编码器参数
stream.options = {
'quality': 'good',
'crf': '35',
'auto-alt-ref': '0',
}
for png_file in png_files:
image_path = os.path.join(input_folder, png_file)
image = Image.open(image_path).convert('RGBA')
frame_data = np.array(image)
# 创建AVFrame并填充数据
frame = av.VideoFrame.from_ndarray(frame_data, format='rgba')
# 编码并写入
for packet in stream.encode(frame):
container.mux(packet)
# 刷新编码器(修复此处:传入 None 表示结束)
for packet in stream.encode(None): # <-- 更安全的刷新方式
container.mux(packet)
container.close()
if __name__ == "__main__":
input_folder = "pngs"
output_file = "output.webm"
png_to_webm_with_alpha(input_folder, output_file, fps=12)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment