http_request.py
8.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import requests
import json
import os
from urllib.parse import unquote
from pathlib import Path
from urllib.parse import urlparse
import concurrent.futures
from numpy.matlib import empty
from gift_parser import json_to_entity
# Selects which API host get_host() returns: True -> release_host, False -> test_host.
is_release_api = True
# Base URL of the test API host.
test_host = "https://ceshi.yabolive.tv/apishare"
# Base URL of the release API host.
release_host = "https://zhibo.yazhaiyabo.com"
# Base URL prepended to resource paths when files are downloaded.
cdn_host = "https://big.bgp.ourpow.com/comm"
def firefly_login(phone, password):
    """Log in with a phone number and password and cache the server response.

    On success (HTTP 200 and ``code == 1`` in the JSON body) the raw response
    text is stored through ``save_token`` so later calls can reuse it.

    :param phone: account phone number; also used as the cache file name.
    :param password: account password.
    :return: ``(uid, token)`` on success, ``(None, None)`` otherwise.
    :raises requests.RequestException: on network failure or timeout.
    """
    # Imported locally so the function is self-contained.
    from urllib.parse import quote

    # Percent-encode the caller-supplied values: characters such as '&', '#'
    # or '+' would otherwise corrupt the query string. Plain digits/letters
    # are left unchanged, so existing callers send the same request.
    query = (
        "?cancel=0&prchannel=&localecode=zh-Hans-CN&countrycode=86"
        "&advertisingId=ea55a34e-9152-4ec8-b2b5-74cff51b4100"
        "&cert=RQsxRXO3pnOHpQswfQpHP9-Kfne5RuRWp7-xfhOTRuel"
        "&imsi=&pkg=android.happylive&appname=com.happy.live"
        "&driverid=4d5872cb5a783a22760dd6c7f9b90cad6"
        "&bootloader=S906U1UEU7EXK6&storeApp=1&model=SM-S906U1"
        "&net=wifi&lang=0&brand=samsung&localecountry=CN&os=android"
        "&larea=2&appversion=7700&manufactruer=samsung"
        f"&phone={quote(str(phone), safe='')}&imei=&osversion=14"
        f"&pwd={quote(str(password), safe='')}"
        "&device=g0q&cid=fireflygw&debug=sendbox"
    )
    url = f"{get_host()}/user/login.html{query}"

    # Without a timeout a stalled server would block the caller forever.
    response = requests.get(url, timeout=30)
    print(f"Response Code:{response.status_code}")
    print(f"Response Text:{response.text}")
    if response.status_code != 200:
        return None, None

    # Parse the body once instead of re-parsing it for every field.
    try:
        payload = response.json()
    except ValueError:
        print("Login response is not valid JSON.")
        return None, None
    if not isinstance(payload, dict):
        print("Login response is not a JSON object.")
        return None, None

    msg = unquote(str(payload.get("msg", "")))
    print(f"msg:{msg}")
    if payload.get("code") != 1:
        return None, None

    # Login succeeded: persist the raw response for read_token_uid().
    save_token(phone, response.text)
    return payload["uid"], payload["token"]
def get_motoring_resource(phone, password):
    """Download the ``darkRes`` file of every motoring entry from the CDN.

    Requests ``/setting/updateMotoring``, keeps the entries whose ``darkRes``
    is non-empty and downloads each one into ``motoring/motoring`` under the
    name ``<mid>-<darkMd5>`` (``download_file`` appends the URL's extension),
    using a pool of five worker threads.

    :param phone: account phone number used to obtain the uid/token.
    :param password: account password used if a fresh login is required.
    """
    data = json.loads(request_url("/setting/updateMotoring", phone, password))["data"]
    # NOTE: an earlier variant also downloaded motoring["audioresource"] into
    # motoring/motoring_dir_<mid>; that step is currently disabled.
    webp_list = [motoring for motoring in data if motoring["darkRes"] != ""]

    # Thread pool of size 5: the work is network-bound.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Map each future back to its entry so failures can be attributed.
        futures = {
            executor.submit(
                download_file,
                cdn_host + motoring["darkRes"],
                "motoring/motoring",
                f"{motoring['mid']}-{motoring['darkMd5']}",
            ): motoring
            for motoring in webp_list
        }
        # wait() alone would discard worker exceptions; surface them instead.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                print(f"Download failed for motoring {futures[future]['mid']}: {error}")
def read_token_uid(phone, password):
    """Return ``(uid, token)`` for ``phone``, preferring the local cache file.

    The cache is a JSON file whose name is the phone number itself. When the
    file is missing, is not valid JSON, or lacks a truthy ``uid``/``token``,
    a fresh login is performed through ``firefly_login``.

    :param phone: account phone number (also the cache file name).
    :param password: account password, only used when logging in.
    :return: the ``(uid, token)`` pair.
    """
    cache_name = f"{phone}"
    if Path(cache_name).exists():
        # A cache file is present: try to read uid and token from it.
        with open(cache_name, 'r', encoding='utf-8') as cache_file:
            try:
                cached = json.load(cache_file)
                cached_uid = cached.get('uid')
                cached_token = cached.get('token')
            except json.JSONDecodeError:
                print("Failed to decode JSON from local file.")
            else:
                if cached_uid and cached_token:
                    print("Token and UID loaded from local file.")
                    return cached_uid, cached_token

    # No usable cache entry: log in and return the new credentials.
    print("No valid local token/UID found, logging in...")
    fresh_uid, fresh_token = firefly_login(phone, password)
    return fresh_uid, fresh_token
def save_token(phone, json):
    """Write the login response text to the cache file named after ``phone``.

    Any existing file is removed first, then the text is written as UTF-8.

    :param phone: account phone number; used verbatim as the file name.
    :param json: raw response text to store. The name shadows the ``json``
        module inside this function, which is harmless because the module is
        not used here.
    """
    target = Path(f"{phone}")
    # Drop a stale cache file before writing the new one.
    if target.exists():
        target.unlink(missing_ok=True)
    with target.open('w', encoding='utf-8') as out:
        out.write(json)
def get_host():
    """Return the API base URL selected by the ``is_release_api`` flag."""
    return release_host if is_release_api else test_host
def request_url(api, phone, password, timeout=30):
    """Send an authenticated GET request to ``api`` and return the body text.

    The URL is the current host plus ``api`` plus the common query string
    from ``base_param`` (which carries the uid and token). The status code,
    the URL and the body are printed for debugging; note that the printed URL
    contains the session token.

    :param api: API path starting with ``/``, e.g. ``/setting/updateMotoring``.
    :param phone: account phone number used to obtain the uid/token.
    :param password: account password used if a fresh login is required.
    :param timeout: seconds to wait for the server before giving up; without
        it a stalled connection would block the caller indefinitely.
    :return: the response body as text, whatever the HTTP status.
    :raises requests.RequestException: on network failure or timeout.
    """
    url = get_host() + api + base_param(phone, password)
    response = requests.get(url, timeout=timeout)
    print(f"Response Code:{response.status_code}")
    print(f"请求的url:{url}")
    print(f"Response Text:{response.text}")
    return response.text
def base_param(phone, password):
    """Build the common query string (leading ``?`` included) for API calls.

    The uid and token come from ``read_token_uid``; every other parameter is
    a fixed device/app descriptor.

    :param phone: account phone number used to obtain the uid/token.
    :param password: account password used if a fresh login is required.
    :return: the query string to append to an API path.
    """
    uid, token = read_token_uid(phone, password)
    # Adjacent literals are concatenated at compile time; only the line that
    # carries uid/token needs to be an f-string.
    return (
        "?cancel=0&prchannel=&localecode=zh-Hans-CN&countrycode=86"
        "&advertisingId=ea55a34e-9152-4ec8-b2b5-74cff51b4100"
        "&cert=RQsxRXO3pnOHpQswfQpHP9-Kfne5RuRWp7-xfhOTRuel"
        "&imsi=&pkg=android.happylive&appname=com.happy.live"
        "&driverid=4d5872cb5a783a22760dd6c7f9b90cad6"
        "&bootloader=S906U1UEU7EXK6&storeApp=1&model=SM-S906U1"
        "&net=wifi&lang=0&brand=samsung&localecountry=CN&os=android"
        "&larea=2&appversion=7700&manufactruer=samsung"
        f"&uid={uid}&imei=&osversion=14&token={token}"
        "&device=g0q&cid=fireflygw&debug=sendbox"
    )
def load_live_gift_list(phone, password):
    """Print the category-1001 live gifts that have a matching special resource.

    Fetches the live gift list, keeps the gifts whose ``category`` is 1001,
    then fetches the special resources and collects each kept gift once per
    resource whose ``darkMd5`` equals the gift's ``darkMd5``.

    :param phone: account phone number used to obtain the uid/token.
    :param password: account password used if a fresh login is required.
    """
    gift_payload = json.loads(
        request_url(api="/setting/updatelivegift.html", phone=phone, password=password)
    )
    # NOTE(review): the meaning of category 1001 is not visible here; confirm
    # against the API documentation.
    category_gifts = []
    for gift in gift_payload["data"]:
        if gift.get("category") == 1001:
            category_gifts.append(gift)

    resources = json_to_entity(
        request_url(api="/setting/updatespecialresource", phone=phone, password=password)
    )

    # A gift is appended once for every resource sharing its darkMd5, so a
    # gift can appear more than once.
    matching_elements = []
    for gift in category_gifts:
        for resource in resources.data:
            if gift.get("darkMd5") == resource.darkMd5:
                matching_elements.append(gift)
    print(matching_elements)
    # NOTE: a follow-up step that downloaded cdn_host + darkRes of each match
    # into "shechi_webp_dir" as "<gid>-<darkMd5>.webp" with a five-thread pool
    # is currently disabled.
def load_gift_resource(phone, password):
    """Download the ``darkRes`` file of every special gift resource.

    Requests ``/setting/updatespecialresource``, converts the response with
    ``json_to_entity`` and downloads each entry from the CDN into ``webp_dir``
    under the name ``<gid>-<darkMd5>`` (``download_file`` appends the URL's
    extension), using a pool of five worker threads. Failed downloads are
    reported individually instead of being silently dropped.

    :param phone: account phone number used to obtain the uid/token.
    :param password: account password used if a fresh login is required.
    """
    json_text = request_url(api="/setting/updatespecialresource", phone=phone, password=password)
    result = json_to_entity(json_text)

    failures = []
    # Thread pool of size 5: the work is network-bound.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Map each future back to its gift so failures can be attributed.
        futures = {
            executor.submit(
                download_file,
                cdn_host + gift.darkRes,
                "webp_dir",
                f"{gift.gid}-{gift.darkMd5}",
            ): gift
            for gift in result.data
        }
        # wait() alone would discard worker exceptions; collect them instead.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                failures.append((futures[future], error))

    for gift, error in failures:
        print(f"Download failed for gift {gift.gid}: {error}")
    if failures:
        print(f"{len(failures)} of {len(futures)} downloads failed")
    else:
        print("所有文件下载完成")
def get_filename_from_url(url):
    """
    Extract the file name (extension included) from a URL.

    Only the path component is considered, so query strings and fragments
    are ignored; percent-escapes in the name are decoded.

    :param url: download address of the file
    :return: the decoded last path segment, or an empty string when the path
        is empty or ends with a slash
    """
    # Everything after the final '/' of the path is the file name.
    _, _, last_segment = urlparse(url).path.rpartition('/')
    return unquote(last_segment)
def download_file(url, directory, custom_filename=None, timeout=30):
    """Download ``url`` into ``directory`` and return the local file path.

    :param url: address of the file to download.
    :param directory: target directory; created (with parents) if missing.
    :param custom_filename: optional base name for the saved file. The
        extension found in the URL is appended to it. When omitted, the file
        name is taken from the URL.
    :param timeout: seconds to wait for the server to connect/send data;
        without it a stalled connection would block a worker forever.
    :return: path of the written file.
    :raises requests.HTTPError: when the server answers with an error status.
    :raises requests.RequestException: on network failure or timeout.
    """
    if custom_filename is None:
        # No custom name given: derive it from the URL.
        filename = get_filename_from_url(url)
    else:
        # Keep the caller's base name but reuse the URL's extension.
        extension = os.path.splitext(get_filename_from_url(url))[1]
        filename = f"{custom_filename}{extension}"
    print(f"文件:{filename}开始下载:{url}")

    # Make sure the target directory exists.
    Path(directory).mkdir(parents=True, exist_ok=True)
    local_filename = os.path.join(directory, filename)

    # The context manager releases the streamed connection even when the
    # status check or the disk write raises.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(local_filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)
    print("下载完成")
    return local_filename
if __name__ == '__main__':
    # NOTE(review): account credentials are hard-coded in source. They can now
    # be overridden through the FIREFLY_PHONE / FIREFLY_PASSWORD environment
    # variables; the literals remain only as a backward-compatible fallback
    # and should be removed (and rotated) once callers set the variables.
    phone = os.environ.get("FIREFLY_PHONE", "13560573423")
    password = os.environ.get("FIREFLY_PASSWORD", "123456a")
    uid, token = read_token_uid(phone, password)
    # Manual entry points kept for ad-hoc runs; uncomment the one needed.
    # print("token=" + token)
    # load_gift_resource("13560573423", "123456a")
    # load_live_gift_list("15889845884", "123456a")
    # get_motoring_resource("13560573423", "123456a")
    # load_gift_resource("13560573423", "123456a")
    # load_live_gift_list("")