举报器完成

This commit is contained in:
xhy 2025-03-30 22:49:37 +08:00
parent e2604067fe
commit 06b8db7ecc
23 changed files with 17264 additions and 105 deletions

View File

@ -1,8 +1,15 @@
# baidu-reporter
- crawl_engine:
- 从任务队列中获取待爬取的 URL打开百度搜索收集链接结果存入 surl 表中
- evidence_engine:
- 轮询 surl 表,检查还没有收集证据的 surl开始收集证据
- reporter_engine:
- 轮询 surl 表,找到没有被对应渠道举报过的 url开始去举报
使用方式:
```shell
# 采集模式采集指定关键字的URL列表直接存入数据库
python main.py --crawl www.yunzhiju.net
# 收集模式,收集所有待举报的链接的截图与 Token
python main.py --evidence
# 举报模式,干他丫的
# 可以通过参数决定使用哪个举报渠道
python main.py --report wap,pc,site
```

View File

@ -3,7 +3,7 @@ import sys
import os
import time
from app.engines.reporter import Reporter
from app.engines.report_engine import Reporter
from .config import load_config, AppConfig
from .engines.crawl_engine import CrawlEngine
@ -45,8 +45,9 @@ class MainApp:
# 添加运行模式参数
parser.add_argument(
"-m",
"--mode",
"--report",
const="pc,site,wap",
nargs="?",
help="指定运行模式pc/site/wap不指定则运行所有模式多个模式使用英文逗号分隔",
)
@ -63,15 +64,15 @@ class MainApp:
args = parser.parse_args()
# 处理模式参数
if args.mode:
modes = [m.strip() for m in args.mode.split(",")]
if args.report:
reports = [m.strip() for m in args.report.split(",")]
valid_modes = ["pc", "site", "wap"]
invalid_modes = [m for m in modes if m not in valid_modes]
invalid_modes = [m for m in reports if m not in valid_modes]
if invalid_modes:
parser.error(f'无效的运行模式: {", ".join(invalid_modes)}')
args.mode = modes
args.report = reports
else:
args.mode = ["pc", "site", "wap"]
args.report = ["pc", "site", "wap"]
# 检查输入的文件是否存在
# if not os.path.exists(args.file):
@ -94,6 +95,10 @@ class MainApp:
evidence = EvidenceEngine()
evidence.cli_start()
evidence.stop()
elif self.args.report:
reporter = Reporter(self.args.report)
reporter.cli_start()
reporter.stop()
def start_web(self):
"""开启 Web 模式"""

View File

@ -0,0 +1,65 @@
import threading
from loguru import logger
from .reporters.pc_reporter import PcReporter
from .reporters.site_reporter import SiteReporter
from .reporters.wap_reporter import WapReporter
class Reporter:
    """Dispatches report jobs to the configured channels.

    Three channels exist today (pc / wap / site); to add another, register
    a new reporter instance in ``self.reporters`` — its key doubles as the
    valid value for ``mode``.
    """

    def __init__(self, mode: list[str]):
        # Channel name -> reporter instance.
        self.reporters = {
            "pc": PcReporter(),
            "wap": WapReporter(),
            "site": SiteReporter(),
        }
        self.worker_thread = None
        self.ev = threading.Event()  # wakes the worker early on stop()
        self.status = 1  # 1 = keep running, 0 = stop requested
        self.mode = mode

    def _run_once(self):
        """Run every requested channel a single time.

        Shared by ``cli_start`` and ``worker`` (the original duplicated this
        dispatch in both places); unknown modes are logged and skipped.
        """
        for mode in self.mode:
            reporter = self.reporters.get(mode)
            if reporter is None:
                logger.error(f"参数错误: {mode}")
                continue
            reporter.run()

    def start(self):
        """Start the background worker thread (daemonized)."""
        self.worker_thread = threading.Thread(
            target=self.worker, name="REPORTER", daemon=True
        )
        self.worker_thread.start()

    def wait(self):
        """Block until the worker thread finishes."""
        self.worker_thread.join()

    def cli_start(self):
        """Run all requested channels once in the foreground (CLI mode)."""
        self._run_once()

    def stop(self):
        """Request the worker loop to exit and wake it if it is sleeping."""
        self.status = 0
        self.ev.set()

    def worker(self):
        """Background loop: run all channels, then sleep 60s between rounds."""
        while self.status:
            self._run_once()
            self.ev.wait(60)

View File

@ -1,80 +0,0 @@
from loguru import logger
from sqlalchemy import Engine, select
from sqlmodel import Session
from app.utils.dp import DPEngine
from .reporters.pc_reporter import PcReporter
from .reporters.site_reporter import SiteReporter
from .reporters.wap_reporter import WapReporter
from ..models.report_urls import ReportUrlModel
class Reporter:
    """Legacy reporter (file removed in this commit): reads URLs from a
    file, collects evidence (screenshots / report links) and stores it in
    the database. Currently three channels; extensible with more."""

    def __init__(self, urls_file: str, mode: list[str], db_engine: Engine):
        # Path of the text file containing one search URL per line.
        self.urls_file = urls_file
        # Requested channel names, e.g. ["pc", "site", "wap"].
        self.mode = mode
        self.db_engine = db_engine
        # Initialize the per-channel reporters
        self.reporters = {
            "pc": PcReporter(),
            "wap": WapReporter(),
            "site": SiteReporter(),
        }
        # TODO: initialize shared resources the reporters need
        # (e.g. a headless chrome instance)
        # self.baseDP = DPEngine(is_wap=False, no_img=True)

    def run(self):
        """Entry point: start the reporting workflow."""
        self.get_reports_data()

    def get_reports_data(self):
        """Collect report data (page screenshot, report link, ...) for each
        URL and persist it to the database.

        Flow: skip URLs already present in the database; otherwise open the
        Baidu search page, grab the screenshot / report link; insert on
        success, log on failure. NOTE(review): the screenshot/insert steps
        below are still commented out — only the duplicate check and the
        empty-result check are implemented.
        """
        urls = self.read_urls()
        logger.info(f"从文件 {self.urls_file} 读取到 {len(urls)} 个 URL")
        # Browser instance used for screenshots.
        dp = DPEngine(is_wap=False, no_img=True)
        with Session(self.db_engine) as session:
            for url in urls:
                # Skip if the record already exists.
                stmt = select(ReportUrlModel).where(ReportUrlModel.surl == url)
                report_url = session.exec(stmt).one_or_none()
                logger.debug(f"查询 {url} 的结果: {report_url}")
                if report_url:
                    continue
                # Open the Baidu search URL to collect evidence.
                tab = dp.browser.new_tab(url)
                tab.wait(5)
                # "未找到相关结果" == "no results found" banner on the page.
                if "未找到相关结果" in tab.html:
                    logger.info(f"SRUl {url} 搜索结果空")
                    continue
                # 1. Take a screenshot
                # img_path = f"./imgs/{report_domain}/{md5_hash(report_url)}.png"
                # tab.get_screenshot()
                # 2. Click the report button to obtain the report link
                # 3. Open that URL and take another screenshot
                # 4. Persist everything to the database

    def read_urls(self) -> list[str]:
        """Read the urls file; returns the non-empty, stripped lines."""
        urls: list[str] = []
        with open(self.urls_file, "r") as fp:
            for url in fp:
                url = url.strip()
                if not url:
                    continue
                urls.append(url)
        return urls

View File

@ -1,11 +1,360 @@
import os.path
import random
import time
from urllib.parse import urlparse, parse_qs
import execjs
import requests
from loguru import logger
from sqlmodel import Session, select
from .base import BaseReporter
from ...config.config import AppCtx
from ...models.report_urls import ReportUrlModel
from ...utils.common import get_proxies, get_all_cookies, md5, generate_random_phone_number
from ...utils.ua import random_ua
from ...utils.ydm_verify import YdmVerify
class PcReporter(BaseReporter):
    """Reports search-result URLs through Baidu's PC complaint endpoint
    (jubao.baidu.com), solving the rotate captcha via the YDM service.

    Fixes vs. the original: removed stray ``pass`` statements left over in
    ``__init__``/``run``; ``upload_report_pic`` now uses
    ``self.upload_pic_url`` instead of a duplicated hardcoded URL; the
    captcha HTTP calls are given timeouts so a stalled server cannot hang
    the worker forever.
    """

    def __init__(self):
        self.engine_name = "PC_REPORTER"
        self.database = AppCtx.g_db_engine
        self.upload_pic_url = "http://jubao.baidu.com/jubao/accu/upload"
        self.report_url = "https://jubao.baidu.com/jubao/accu/submit"
        self.proxies = get_proxies()
        # Referer and Cookie are filled in per-row inside run().
        self.headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Origin': 'https://jubao.baidu.com',
            'Pragma': 'no-cache',
            'Referer': "",
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'User-Agent': random_ua(),
            'X-Requested-With': 'XMLHttpRequest',
            'sec-ch-ua_wap': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
            'sec-ch-ua_wap-mobile': '?0',
            'sec-ch-ua_wap-platform': '"Windows"',
            "Cookie": "",
        }

    def run(self):
        """PC-channel reporting: for each row not yet reported on this
        channel, upload both evidence screenshots, solve the captcha
        (up to 3 attempts) and submit the report."""
        with Session(self.database) as session:
            stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_one == False)
            rows: list[ReportUrlModel] = session.exec(stmt).all()
            logger.info(f"[{self.engine_name}] 共计 {len(rows)} 条记录需要举报")
            for row in rows:
                # Pick a random cookie for this submission.
                report_cookie = random.choice(get_all_cookies())
                self.headers["Cookie"] = report_cookie
                logger.debug(f"cookie: {report_cookie}")
                # Per-row data used throughout the submission.
                surl = row.surl
                q = row.q
                token = row.token
                title = row.title
                domain = row.domain
                timestamp_s = int(time.time() * 1000)
                referer = "https://jubao.baidu.com/"
                logger.debug(f"referer: {referer}, type of referer: {type(referer)}")
                self.headers["Referer"] = referer
                # Both screenshots (PC + WAP) must exist before we can report.
                img_path = f"./imgs/{domain}/{md5(surl)}.png"
                wap_img_path = f"./imgs/{domain}/{md5(surl)}-wap.png"
                if not all((os.path.exists(img_path), os.path.exists(wap_img_path))):
                    logger.debug(f"图片{img_path}{wap_img_path} 不存在")
                    continue
                # Upload both screenshots.
                img_filename = self.upload_report_pic(img_path)
                wap_img_filename = self.upload_report_pic(wap_img_path)
                logger.debug(f"{img_filename=}, {wap_img_filename=}")
                if not all((img_filename, wap_img_filename)):
                    logger.debug(f"图片 {img_path}{wap_img_path} 上传失败")
                    continue
                # Submit the report; the captcha may take a few attempts.
                retry = 0
                while retry < 3:
                    verify_result = self.verify_captcha(surl, token, title, q, timestamp_s)
                    # op != 1 means the captcha check failed.
                    if verify_result["op"] != 1:
                        logger.debug("验证码校验失败!")
                        retry += 1
                        continue
                    logger.info("验证码校验成功")
                    ds = verify_result["ds"]
                    tk = verify_result["tk"]
                    if self.do_report(ds, tk, surl, token, title, q, img_filename):
                        # Reported successfully — persist the flag.
                        row.is_report_by_one = True
                        session.add(row)
                        session.commit()
                        break
                    retry += 1

    def do_report(self, ds, tk, surl, token, title, q, upload=''):
        """Submit the actual complaint form.

        :param ds, tk: values returned by the captcha verification.
        :param upload: server-side filename of the uploaded screenshot.
        :return: True on success or duplicate report, False otherwise.
        """
        try:
            phone = generate_random_phone_number()
            data = {
                'problemtype': '11001',
                'keyword': q,
                'title': title,
                'token': token,
                'surl': surl,
                'url': '',
                'isnatural': '1',
                'hasGw': '0',
                'hasV': '0',
                'buzId': '1004',
                # Opaque, server-issued constant captured from a real session.
                'key': 'OoMZm9dghrzLTSLyPl25VcvLBuavUH%2ByvFz%2BPsgntmcXR%2FVwnRoDbJcyjTRtW3sYx0psN8bB%2FY1ZfrCne2tgnYIowsBLFE3mSnrUfZH5L64qYVPFHFKNUZG9Ihzj5pdQIXfNkR7D1qZ%2BH5gwXOSmhlh8BgGVNOWegqhKBcHaMKs%3D%23%23htD3CYLHqsqXvHybBgjJov8Wxel8EYKJdU%2FLY4f8NrclL7Hu0%2Feuv9EKveT1%2FJ1WO53ihO%2FonJmjI9GOD3x%2BVqhQxnTlxGoOjSSOhGW5X%2B3PaNfdzOQv0epktgY1G08HNICi9ftmBMShrFImYz0ihI%2B9PpalvA3QzKROQzF85xw%3D%23%23CO3Pb0NDSZfr7Z2LcWQMhLIEbBDYBcD0tjS5Vcxo0O2e8i7%2FEz9XApnigZn5kMewEb0B1FHumPsFEJjvrJ4HwyxZ5LKkeJrmfr7SOa8v59Y57eCtmlXmQ8mHH6ER6UF%2Fv0V8YXk%2FPkLWfZfopL1DFPEkNDkrBiFgAFKW1hakUDM%3D',
                'description': f"{surl} 存在大量的淫秽色情信息被百度搜索引擎收录。用于给黄色网站引流。",
                'upload': upload,
                'phone': phone,
                'email': f"{phone}@qq.com",
                'mobile_net_type': '4g',
                'tk': tk,
                'ds': ds,
            }
            response = requests.post(
                self.report_url,
                headers=self.headers,
                data=data,
                allow_redirects=False,
                proxies=self.proxies,
                timeout=10
            )
            # status 0 = accepted, 4 = already reported; both count as done.
            if response.json().get('status') == 0:
                logger.success(f"{surl} 举报成功")
                return True
            elif response.json().get("status") == 4:
                logger.warning(f"{surl} 重复举报")
                return True
            else:
                logger.warning(f"{surl} 举报失败,{response.json()=}")
                return False
        except Exception as e:
            logger.error(f'{e}')
            return False

    def upload_report_pic(self, img_path: str):
        """Upload a screenshot; returns the server-side filename or None."""
        try:
            with open(img_path, "rb") as fp:
                files = {
                    "upfile": (f"{int(time.time() * 1000)}.jpg", fp, "image/png")
                }
                data = {
                    "index": 0
                }
                response = requests.post(
                    self.upload_pic_url,
                    files=files,
                    data=data,
                    proxies=self.proxies,
                    headers=self.headers,
                    timeout=10
                )
            # The server answers with a URL whose query carries the filename.
            json_data = response.json()
            data = json_data["data"]
            parsed_url = urlparse(data)
            query_params = parse_qs(parsed_url.query)
            filename = query_params.get("filename", [None])[0]
            return filename
        except Exception as e:
            logger.error(f"[{self.engine_name}] 上传图片 {img_path} 失败,错误: {e}")
            return None

    def verify_captcha(self, surl, token, title, q, timestamp_s):
        """Full captcha round-trip: init -> style -> download -> solve -> log.

        :return: dict with ds/tk/op; op == 1 means success, anything else
                 (including the {'op': 3} fallback on error) is failure.
        """
        try:
            get_as_tk = self.post_init(surl, token, title, q, timestamp_s)
            get_as = get_as_tk['as']
            get_tk = get_as_tk['tk']
            # Fetch the captcha image URL and the backstr challenge.
            get_style_result = self.get_style(get_tk, surl, token, title, q, timestamp_s)
            get_backstr = get_style_result['backstr']
            pic_download_link = get_style_result['captcha']
            # Download the captcha image and solve the rotation angle.
            self.download_captcha(pic_download_link)
            rotate_angle_rate = self.get_rotate_angle_rate()
            # key = self.get_key(get_as)
            get_ds_tk = self.post_log(get_as, get_tk, get_backstr, rotate_angle_rate)
            log_ds = get_ds_tk['ds']
            log_tk = get_ds_tk['tk']
            log_op = get_ds_tk['op']
            result = {
                'ds': log_ds,
                'tk': log_tk,
                'op': log_op
            }
            return result
        except Exception as e:
            logger.error(f'{e}')
            return {'op': 3}

    def post_init(self, surl, token, title, q, timestamp_s):
        """Initialize the captcha session; returns {'as': ..., 'tk': ...}
        or None on failure."""
        try:
            url = "https://passport.baidu.com/cap/init"
            data = {
                "_": int(time.time() * 1000),
                "refer": f"http://jubao.baidu.com/jubao/accu/?surl={surl}&token={token}&title={title}&q={q}&has_gw=0&has_v=0&_t8={timestamp_s}",
                "ak": self.get_ak(),
                "ver": "2",
                "scene": "",
                "ds": "",
                "tk": "",
                "as": "",
                "reinit": 0
            }
            response = requests.post(url, headers=self.headers, data=data, proxies=self.proxies, timeout=10).json()
            # Wrong tk -> op=3; wrong as -> verification never passes.
            result_init = {
                "as": response["data"]["as"],
                "tk": response["data"]["tk"]
            }
            return result_init
        except Exception as e:
            logger.error(f"[post_init]验证码识别失败{e}")
            return None

    def get_style(self, get_tk, surl, token, title, q, timestamp_s):
        """Fetch the captcha image download link and backstr challenge;
        returns {'backstr': ..., 'captcha': ...} or None on failure."""
        # (WAP variant would use https://wappass.baidu.com/cap/style)
        try:
            url = "https://passport.baidu.com/cap/style"
            data = {
                "_": int(time.time() * 1000),
                "refer": f"http://jubao.baidu.com/jubao/accu/?surl={surl}&token={token}&title={title}&q={q}&has_gw=0&has_v=0&_t8={timestamp_s}",
                "ak": self.get_ak(),
                "tk": get_tk,
                "scene": "",
                "isios": "0",
                "type": "spin",
                "ver": "2"
            }
            response = requests.post(url, headers=self.headers, data=data, proxies=self.proxies, timeout=10).json()
            # A wrong backstr makes the server answer with a "security risk" error.
            result = {
                "backstr": response["data"]["backstr"],
                "captcha": response["data"]["captchalist"][0]["source"]["back"]["path"],
                # "spin-0": response["data"]['ext']['p']['q']['spin-0'],
                # "c": response["data"]['ext']['p']['c']
            }
            return result
        except Exception as e:
            logger.error(f"[get_style] 验证码识别失败{e}")
            return None

    def download_captcha(self, img_url):
        """Download the captcha image to ./captcha/captcha.png."""
        try:
            img_download = requests.get(img_url, headers=self.headers, timeout=10)
            with open('./captcha/captcha.png', 'wb') as pic:
                pic.write(img_download.content)
            logger.success('captcha download success!')
        except Exception as e:
            logger.error(f"[download_captcha] 验证码识别失败 {e}")

    @staticmethod
    def get_rotate_angle_rate():
        """Ask the YDM service for the rotation angle of the downloaded
        captcha and return it normalized to a 0..1 fraction of 360°."""
        identify_distance = YdmVerify()
        with open('./captcha/captcha.png', 'rb') as p:
            picture = p.read()
        slide_distance = identify_distance.rotate(image=picture)
        rotate_angle_rate = round(slide_distance / 360, 2)
        return rotate_angle_rate

    @staticmethod
    def get_key(get_as):
        """Derive the AES key from `as` via the bundled JS (currently unused
        by the main flow — post_log calls getFs2 which derives it itself)."""
        try:
            with open('./js/mkd_v2_link_submit.js', 'r', encoding='utf-8') as f:
                ds_js = f.read()
            key = execjs.compile(ds_js).call('getNewKey', get_as)
            return key
        except Exception as e:
            logger.error(f"[get_key]验证码识别失败 {e}")

    def post_log(self, get_as, tk, back_str, rotate_angle_rate):
        """Submit the solved captcha; returns {'ds', 'op', 'tk'} or None."""
        try:
            time_log = str(int(time.time() * 1000))
            with open('./js/mkd_v2_link_submit.js', 'r', encoding='utf-8') as f:
                ds_js = f.read()
            # Build the doubly-encrypted "fs" payload in JS.
            fs = execjs.compile(ds_js).call('getFs2', back_str, rotate_angle_rate, get_as)
            url = "https://passport.baidu.com/cap/log"
            data = {
                "_": time_log,
                "refer": "https://aigc.baidu.com/works",
                "ak": self.get_ak(),
                "as": get_as,
                "scene": "",
                "tk": tk,
                "ver": "2",
                "cv": "submit",
                "typeid": "spin-0",
                # fuid is a browser fingerprint: stable short-term, differs
                # per browser. Captured from a real session (1280 chars; a
                # ~235-char middle section varies between captures).
                "fuid": "FOCoIC3q5fKa8fgJnwzbE67EJ49BGJeplOzf+4l4EOvDuu2RXBRv6R3A1AZMa49I27C0gDDLrJyxcIIeAeEhD8JYsoLTpBiaCXhLqvzbzmvy3SeAW17tKgNq/Xx+RgOdb8TWCFe62MVrDTY6lMf2GrfqL8c87KLF2qFER3obJGnfaJTn/Ne60I9LwR04t6XmGEimjy3MrXEpSuItnI4KD0FJKzTbw1AN69fBnzR2FuvMmmQZ+1zgJ72wdcVU+mcQxiE2ir0+TEYgjPJt1Qa3K1mLi+P4IWJeag2lvxB4yJ/GgLbz7OSojK1zRbqBESR5Pdk2R9IA3lxxOVzA+Iw1TWLSgWjlFVG9Xmh1+20oPSbrzvDjYtVPmZ+9/6evcXmhcO1Y58MgLozKnaQIaLfWRPAn9I0uOqAMff6fuUeWcH1mRYoTw2Nhr4J4agZi377iM/izL6cVCGRy2F8c0VpEvM5FjnYxYstXg/9EfB3EVmKAfzNRIeToJ5YV9twMcgdmlV1Uhhp5FAe6gNJIUptp7EMAaXYKm11G+JVPszQFdp9AJLcm4YSsYUXkaPI2Tl66J246cmjWQDTahAOINR5rXR5r/7VVI1RMZ8gb40q7az7vCK56XLooKT5a+rsFrf5Zu0yyCiiagElhrTEOtNdBJJq8eHwEHuFBni9ahSwpC7lbKkUwaKH69tf0DFV7hJROiLETSFloIVkHdy3+I2JUr1LsplAz0hMkWt/tE4tXVUV7QcTDTZWS/2mCoS/GV3N9awQ6iM6hs/BWjlgnEa1+5iP7WSc7RJ34FaE5PsyGXyoCWdXwNRGSZPSvVtB/Ea6w5FKazXcZ/j40FJv+iLGBn3nkkgHlne61I8I7KhtQgIkmBMJIjPMkS/L051MeqdGScsKYTJuSucgI5c3+79eVH+y2TvbOTuuHv1uGxwXFb2atIU1ZYPbmmXculmizKcKI/s44qf8uM8iBZLGkKeVyL74aPyLkg7Gk359g98BIGN/ZzJR/h+Y6AyFx+HlMoYJnS06dVmqFbvlCtSdGylKQ5f8eWtxPkJGqOFtWjIVteQYMsH/AaSJonqw+WLiZvGjYfm9p0alEyujapoTy77HzDcUoU1wUSXa5xS/Z6hXEr2OnLi0LdPVcGjz8lpLcdVeSfm9p0alEyujapoTy77HzDWf5PERRSTFqLd9BTUHLyY4Ji3EQLGQPaM1aeHxG1bJZH0s1Si/KwzTaTYzu6ziQiqwcr2kaYUiH+fMOxn69/BhNJVMhpQkhprc1KZuJRvXjppq0gKweencPxgS/jd0rjw==",
                "fs": fs
            }
            response = requests.post(url, headers=self.headers, data=data, proxies=self.proxies, timeout=10).json()
            result = {
                "ds": response["data"]["ds"],
                "op": response["data"]["op"],
                "tk": response["data"]["tk"]
            }
            return result
        except Exception as e:
            logger.error(f"[post_log] 验证码识别失败 {e}")

    @staticmethod
    def get_ak():
        """Constant application key used by the captcha endpoints."""
        ak = "76AKmP4xDQjB3vAIPef3KxOlJZWCpw64"
        return ak

View File

@ -1,9 +1,163 @@
import os.path
import random
import re
import time
import requests
from loguru import logger
from sqlmodel import Session, select
from .base import BaseReporter
from ...config.config import AppCtx
from ...models.report_urls import ReportUrlModel
from ...utils.common import get_all_cookies, get_proxies, report_keywords, get_reporter_name, \
generate_random_phone_number, md5
from ...utils.gen_cookie import GenCookie
from ...utils.ua import random_ua
class SiteReporter(BaseReporter):
    """Reports URLs through Baidu's site complaint form (help.baidu.com).

    Fixes vs. the original: removed a stray ``pass`` left in ``__init__``
    and ``run``; corrected the ``run`` docstring (it said "PC"); a failed
    screenshot upload is now skipped instead of submitting ``None`` as the
    picture filename.
    """

    def __init__(self):
        self.engine_name = "SITE_REPORTER"
        self.upload_pic_url = "https://help.baidu.com/api/mpic"
        self.report_url = "https://help.baidu.com/jubaosubmit"
        self.request = requests.session()
        # Cookie is filled in per-row inside run().
        self.headers = {
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": random_ua(),
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6",
            "sec-ch-ua_wap": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
            "sec-ch-ua_wap-mobile": "?0",
            "sec-ch-ua_wap-platform": '"Windows"',
            "Cookie": "",
        }
        self.database = AppCtx.g_db_engine
        self.all_cookies = get_all_cookies()
        self.proxies = get_proxies()
        # Extracts the anti-CSRF token embedded in the complaint form page.
        self.token_pattern = r'name="submit_token" value="(.*?)"'

    def run(self):
        """Site-channel reporting: walk rows not yet reported on this
        channel, upload the screenshot and submit the complaint form."""
        with Session(self.database) as session:
            stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_site == False)
            rows: list[ReportUrlModel] = session.exec(stmt).all()
            logger.info(f"[{self.engine_name}] 共计 {len(rows)} 条需要举报")
            for row in rows:
                # Base data needed for the report.
                surl = row.surl
                q = row.q
                domain = row.domain
                img_path = f"./imgs/{domain}/{md5(surl)}.png"
                if not os.path.exists(img_path):
                    logger.warning(f"{surl} 的截图不存在!")
                    continue
                # Pick a cookie and expand it into a full site cookie.
                cookie = random.choice(self.all_cookies)
                report_site_cookie = GenCookie.run(cookie)
                self.headers["Cookie"] = report_site_cookie
                logger.debug(f"设置 cookie 为:{report_site_cookie}")
                # Upload the screenshot first.
                pic_filename = self.upload_pic(img_path)
                if not pic_filename:
                    # Upload failed — don't submit the form without a picture.
                    continue
                # Then file the report.
                result = self.do_report(domain, q, pic_filename, surl)
                if result:
                    row.is_report_by_site = True
                    session.add(row)
                    session.commit()
                # Wait 5 seconds before the next report.
                time.sleep(5)

    def upload_pic(self, img_path: str):
        """Upload the screenshot; returns the server-side filename or None."""
        try:
            with open(img_path, "rb") as fp:
                files = {"pic": fp}
                response = self.request.post(
                    self.upload_pic_url,
                    headers=self.headers,
                    files=files,
                    proxies=self.proxies,
                    allow_redirects=False,
                    timeout=5
                )
            pic_filename = response.json()["filename"]
            return pic_filename
        except Exception as e:
            logger.error(f"上传举报图片失败,错误: {e}")
            return None

    def do_report(self, domain: str = "", keyword: str = "", filename: str = "", link: str = ""):
        """Fetch a fresh submit_token from the complaint form, then post the
        report. Returns True on success or duplicate, None/False on failure."""
        try:
            # First fetch the token required to submit the form.
            response = self.request.get(
                "https://help.baidu.com/jubao",
                headers=self.headers,
                proxies=self.proxies,
                allow_redirects=False,
                timeout=5,
            )
            match = re.search(self.token_pattern, response.text)
            if match:
                token_value = match.group(1)
                logger.debug(f"成功获取 submit_token: {token_value}")
            else:
                logger.error("未找到匹配的 submit_token")
                return None
            # Assemble the submission payload; overly long queries are
            # replaced by a synthetic "site <domain> <keyword>" query.
            if len(keyword) > 50:
                keyword = f"site {domain} {random.choice(report_keywords())}"
            data = {
                "submit_token": token_value,
                "os_info": "Macintosh",
                "browser_info": f"chrome 1{random.randint(20, 34)}",
                "pid": "1",
                "type": "3",
                "realname": get_reporter_name(),
                "mobile": generate_random_phone_number(),
                "category_id": "3",
                "query": keyword,
                "title": domain,
                "links": f"{link}",
                "content": f"{link} 存在大量淫秽色情的收录搜索结果!!!",
                "pic[]": filename
            }
            # Submit the form.
            response = self.request.post(
                self.report_url,
                data=data,
                headers=self.headers,
                proxies=self.proxies,
                allow_redirects=False
            )
            json_data = response.json()
            # errno 0 = accepted, 11 = already reported; both count as done.
            if json_data["errno"] == 0:
                logger.success(f"[{self.engine_name}]{link} 举报成功")
                return True
            if json_data["errno"] == 11:
                logger.success(f"[{self.engine_name}]{link} 已经举报过")
                return True
            logger.debug(f"{json_data=}")
            return False
        except Exception as e:
            logger.error(f"[{self.engine_name}] 举报错误:{e}")
            return False

View File

@ -1,11 +1,145 @@
import base64
import json
import os.path
import random
import time
import requests
from loguru import logger
from sqlmodel import Session, select
from .base import BaseReporter
from ...config.config import AppCtx
from ...models.report_urls import ReportUrlModel
from ...utils.common import get_proxies, get_all_cookies, md5
from ...utils.ua import random_ua
class WapReporter(BaseReporter):
    """Reports URLs through Baidu's WAP feedback channel (ufosdk.baidu.com).

    Fixes vs. the original: removed a stray ``pass`` left in ``run``;
    ``run`` now uses the cookie list loaded in ``__init__`` (it re-read the
    cookie file every row); ``do_report`` is wrapped in try/except so a
    single network failure no longer aborts the whole run loop — matching
    the error handling of the PC and site reporters.
    """

    def __init__(self):
        self.engine_name = "WAP_REPORTER"
        self.report_url = "https://ufosdk.baidu.com/api?m=Client&a=postMsg"
        self.request = requests.session()
        self.proxies = get_proxies()
        # Cookie is filled in per-row inside run().
        self.headers = {
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'User-Agent': random_ua(is_wap=True),
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6',
            'sec-ch-ua_wap': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
            'sec-ch-ua_wap-mobile': '?0',
            'sec-ch-ua_wap-platform': '"Windows"',
            "Cookie": "",
        }
        self.database = AppCtx.g_db_engine
        self.all_cookies = get_all_cookies()

    def run(self):
        """WAP-channel reporting: walk rows not yet reported on this
        channel, fetch the user info for the cookie and submit the report."""
        with Session(self.database) as session:
            stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_wap == False)
            rows: list[ReportUrlModel] = session.exec(stmt).all()
            logger.debug(f"[{self.engine_name}] 共找到 {len(rows)} 条待举报记录")
            for row in rows:
                # Pick a random cookie for this submission.
                report_cookie = random.choice(self.all_cookies)
                self.headers["Cookie"] = report_cookie
                logger.debug(f"{report_cookie=}")
                # The feedback API needs the uid/un bound to the cookie.
                userinfo = self.get_user_info()
                if not userinfo:
                    logger.warning(f"[{self.engine_name}] 跳过 {row.surl} 的举报userinfo 获取失败")
                    continue
                # The evidence screenshot is attached to the report.
                img_path = f"./imgs/{row.domain}/{md5(row.surl)}.png"
                if not os.path.exists(img_path):
                    logger.error(f"截图文件 {img_path} 不存在")
                    continue
                result = self.do_report(userinfo, img_path, row.surl, row.q)
                if result:
                    row.is_report_by_wap = True
                    session.add(row)
                    session.commit()
                time.sleep(5)

    def get_user_info(self):
        """Fetch the uid/un of the currently set cookie; None on failure."""
        try:
            userinfo = {}
            response = self.request.get(
                "https://ufosdk.baidu.com/api?m=Web&a=getUserInfo&appid=293852",
                headers=self.headers, proxies=self.proxies, allow_redirects=False, timeout=5
            )
            json_data = response.json()
            uid = json_data['result']['uid']
            un = json_data['result']['un']
            userinfo["uid"] = uid
            userinfo["un"] = un
            return userinfo
        except Exception as e:
            logger.error(f"[{self.engine_name}]获取用户信息错误: {e}")
            return None

    def do_report(self, userinfo: dict, img_path: str, fb_url: str, extend_query: str):
        """Submit one WAP feedback report with the screenshot inlined as a
        base64 data URL. Returns True on success, False otherwise."""
        try:
            # Inline the screenshot as base64.
            with open(img_path, "rb") as fp:
                img_data = fp.read()
            img_data = base64.b64encode(img_data).decode("utf-8")
            # Channel-specific metadata carried in the "extrastring" field.
            extra_string_data = {
                "ufo_app_version": "3.0",
                "feedback_position": "0",
                "extend_query": extend_query,
                "resource_id": "1599",
                "feedback_source": "其他",
                "feedback_source_text": "其他",
                "extend_url": f"https://wap.baidu.com/s?word=url:{fb_url}",
                "fb_url": fb_url,
                "extend_feedback_channel": "36923",
                "relation_words": "",
                "industry_one": "30",
                "industry_two": "197",
                "user": userinfo["un"],
                "baiducuid": ""
            }
            post_data = {
                "appid": "293852",
                "content": f"{fb_url} 存在大量色情淫秽信息",
                "uid": userinfo["uid"],
                "uname": userinfo["un"],
                "ajax": "1",
                "submit_type": "1",
                "extend_feedback_channel": "0",
                "baiducuid": "",
                "extend_url": "",
                "extrastring": json.dumps(extra_string_data, ensure_ascii=False),
                "screenshot[]": f"data:image/png;base64,{img_data}"
            }
            response = self.request.post(
                self.report_url,
                data=post_data,
                headers=self.headers,
                proxies=self.proxies,
                allow_redirects=False,
                timeout=5
            )
            logger.debug(response.json())
            if response.json()['errno'] == 0:
                logger.success(f"[{self.engine_name}] {fb_url} 举报成功")
                return True
            return False
        except Exception as e:
            logger.error(f"[{self.engine_name}] 举报错误:{e}")
            return False

View File

@ -32,7 +32,7 @@ def connect_db(config: AppConfig):
# 导入所有模型,为了自动创建数据表
dsn = f"mysql+pymysql://{config.database.user}:{config.database.password}@{config.database.host}:{config.database.port}/{config.database.database}"
engine = create_engine(dsn, echo=config.debug)
engine = create_engine(dsn, echo=False)
SQLModel.metadata.create_all(engine)
AppCtx.g_db_engine = engine
@ -45,7 +45,7 @@ def create_database(config: AppConfig):
# 先创建一个没有指定数据库的连接
dsn = f"mysql+pymysql://{config.database.user}:{config.database.password}@{config.database.host}:{config.database.port}"
engine = create_engine(dsn, echo=config.debug)
engine = create_engine(dsn, echo=False)
with engine.connect() as conn:
conn.execute(

View File

@ -1,7 +1,87 @@
import hashlib
import random
def md5(s: str) -> str:
    """Return the hex MD5 digest of *s* (UTF-8 encoded)."""
    return hashlib.md5(s.encode('utf-8')).hexdigest()
def get_proxies():
    """Return the requests-style proxy mapping for the paid tunnel proxy."""
    username = "t14131310374591"
    password = "qg6xwmrq"
    tunnel = "d432.kdltps.com:15818"
    # Both plain and TLS traffic go through the same authenticated tunnel.
    proxy_url = f"http://{username}:{password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}
def get_all_cookies():
    """Return the non-empty, stripped lines of ./data/live_cookies.txt.

    The file is read once and memoized on the function object; later calls
    return the cached list. (The original closure-based cache was rebuilt
    on every call, so the file was re-read each time, and the file handle
    was never closed.)
    """
    cache = getattr(get_all_cookies, "_cache", None)
    if cache is None:
        with open('./data/live_cookies.txt', 'r') as fp:
            cache = [line.strip() for line in fp if line.strip()]
        get_all_cookies._cache = cache
    return cache
def report_keywords():
    """Keywords sprinkled into generated report queries."""
    keywords = [
        "成人", "av", "亚洲", "日韩", "欧美", "国产",
        "无码", "黄色", "丝袜", "少妇", "人妻",
    ]
    return keywords
def get_reporter_name():
    """Generate a random Chinese-style reporter name: one surname plus one
    or two given-name characters (50/50 chance of each length).

    NOTE(review): every literal in both character lists below is an empty
    string — the original Chinese characters appear to have been lost in
    this diff rendering. As written the function always returns "".
    Confirm against the original source file.
    """
    # Common surnames (characters lost in rendering — see note above).
    surnames = [
        "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
        "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
        "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""
    ]
    # Common given-name characters (also lost in rendering).
    name_characters = [
        "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
        "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
        "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""
    ]
    xing = random.choice(surnames)
    ming = random.choice(name_characters)
    ming2 = random.choice(name_characters)
    # Randomly choose a two- or three-character name.
    if random.choice([True, False]):
        return f"{xing}{ming}"
    else:
        return f"{xing}{ming}{ming2}"
def generate_random_phone_number():
    """Generate a random 11-digit mainland-China mobile number."""
    # Common carrier prefixes: China Mobile / Unicom / Telecom plus the
    # newer virtual-operator ranges.
    prefixes = [
        "134", "135", "136", "137", "138", "139",  # China Mobile
        "150", "151", "152", "157", "158", "159",  # China Mobile
        "130", "131", "132", "155", "156",         # China Unicom
        "133", "153", "180", "181", "189",         # China Telecom
        "177", "173", "175",                       # other ranges
        "199", "198", "166",                       # newer ranges
        "186", "187", "188", "139",                # common ranges
    ]
    head = random.choice(prefixes)
    tail = ''.join(random.choices("0123456789", k=8))
    return head + tail

View File

@ -151,4 +151,4 @@ class CustomBase64:
def encode(text):
std_encoded = base64.b64encode(text.encode()).decode()
custom_encoded = "".join(CustomBase64.mapping.get(c, c) for c in std_encoded)
return custom_encoded.replace("=", "")
return custom_encoded.replace("=", "")

View File

@ -10,7 +10,7 @@ class DPEngine:
def __init__(self, is_wap: bool = False, no_img: bool = True):
chrome_opts = ChromiumOptions()
chrome_opts.mute(True) # 静音
# chrome_opts.headless(True) # 无头模式
chrome_opts.headless(True) # 无头模式
chrome_opts.no_imgs(no_img) # 不加载图片
chrome_opts.set_argument("--disable-gpu") # 禁用GPU
chrome_opts.set_argument('--ignore-certificate-errors') # 忽略证书错误

132
app/utils/gen_cookie.py Normal file

File diff suppressed because one or more lines are too long

20
app/utils/ydm_verify.py Normal file
View File

@ -0,0 +1,20 @@
import base64
import json
import requests
class YdmVerify(object):
    """Thin client for the jfbym.com captcha-solving API."""

    _custom_url = "https://www.jfbym.com/api/YmServer/customApi"
    _token = "HhUGwpI6AtQGoux36i1ZpsDv7hwGSbr1hQ0RX-HXSZE"
    _headers = {
        'Content-Type': 'application/json'
    }

    def rotate(self, image):
        """Solve a rotate captcha.

        :param image: raw image bytes of the captcha.
        :return: the rotation value reported by the service
                 (``data.data`` field of the JSON response).
        """
        payload = {
            "image": base64.b64encode(image).decode(),
            "token": self._token,
            "type": "90009"  # service code for rotate captchas
        }
        # Timeout added so a stalled solver cannot hang the reporter forever.
        resp = requests.post(self._custom_url, headers=self._headers, data=json.dumps(payload), timeout=60)
        return resp.json()['data']['data']

6348
data/all_cookies.txt Normal file

File diff suppressed because it is too large Load Diff

1513
data/cookies_2025-03-24 Normal file

File diff suppressed because it is too large Load Diff

1513
data/live_cookies.txt Normal file

File diff suppressed because it is too large Load Diff

0
imgs/.gitkeep Normal file
View File

6657
js/crypto-js.js Normal file

File diff suppressed because it is too large Load Diff

155
js/mkd_v2_link_submit.js Normal file
View File

@ -0,0 +1,155 @@
var CryptoJS = require('./js/crypto-js.js');
// Derive a 16-char AES key from `as`: hash `as + "appsapi2"` with an
// algorithm selected by the LAST character of `as`, then take the first
// 16 hex characters of the digest.
//
//   e.g. encryptedStr(SHA3-256) ---> f25f1614appsapi2
//        digest                 ---> 49d3a9685870cc30f63330b8136c7adf...
//        key = first 16 chars   ---> 49d3a9685870cc30
//
// Fixes vs. the original: `encryptedValue` was an implicit global (no
// `var`); the `default:` branch referenced an undefined variable `e`,
// which would throw a ReferenceError — it now fails with a clear Error
// (the branch is unreachable for alphanumeric-terminated `as` values).
function getNewKey(as){
    var encryptedStr = as + "appsapi2";
    var r = as.substr(as.length - 1, 1);
    var encryptedValue;
    switch (true) {
        case ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'a', 'b', 'c', 'd', 'e', 'f', 'g'].includes(r):
            encryptedValue = CryptoJS.MD5(encryptedStr).toString();
            break;
        case ['H', 'I', 'J', 'K', 'L', 'M', 'N', 'h', 'i', 'j', 'k', 'l', 'm', 'n'].includes(r):
            encryptedValue = CryptoJS.SHA1(encryptedStr).toString(CryptoJS.enc.Hex);
            break;
        case ['O', 'P', 'Q', 'R', 'S', 'T', 'o', 'p', 'q', 'r', 's', 't'].includes(r):
            encryptedValue = CryptoJS.SHA256(encryptedStr).toString(CryptoJS.enc.Hex);
            break;
        case ['U', 'V', 'W', 'X', 'Y', 'Z', 'u', 'v', 'w', 'x', 'y', 'z'].includes(r):
            encryptedValue = CryptoJS.SHA512(encryptedStr).toString(CryptoJS.enc.Hex);
            break;
        case ['0', '1', '2', '3', '4'].includes(r):
            encryptedValue = CryptoJS.SHA3(encryptedStr, { outputLength: 256 }).toString(CryptoJS.enc.Hex);
            break;
        case ['5', '6', '7', '8', '9'].includes(r):
            encryptedValue = CryptoJS.SHA3(encryptedStr, { outputLength: 512 }).toString(CryptoJS.enc.Hex);
            break;
        default:
            throw new Error("getNewKey: unexpected trailing character: " + r);
    }
    var key = encryptedValue.slice(0, 16);
    return key;
}
function aesEncrypt(t, key) {
    // AES-ECB encrypt plaintext `t` with `key` (UTF-8 string),
    // zero-padded, returned as a Base64 ciphertext string.
    var keyWords = CryptoJS.enc.Utf8.parse(key);
    var plainWords = CryptoJS.enc.Utf8.parse(t);
    var cipher = CryptoJS.AES.encrypt(plainWords, keyWords, {
        mode: CryptoJS.mode.ECB,
        padding: CryptoJS.pad.ZeroPadding
    });
    return cipher.toString();
}
// Build the "rzData" browser-fingerprint payload expected by the Baidu
// spin-captcha verifier. Everything except `backstr` (the server-issued
// challenge string) and `ac_c` (the computed slider/rotation answer) is a
// canned, replayed fingerprint: fixed screen metrics, two mouse-move
// samples and one click with hard-coded timestamps.
// NOTE(review): getFs1 passes a third `track_p` argument that this
// function does not declare and silently ignores — presumably the live
// mouse track was meant to replace the canned `mv`/`cl` arrays; confirm.
function getRzData(backstr, ac_c){
    var rzData = {
        "common": {
            "cl": [],
            "mv": [],
            "sc": [],
            "kb": [],
            "sb": [],
            "sd": [],
            "sm": [],
            // Canned viewport/screen geometry (1920x1080 desktop).
            "cr": {
                "screenTop": 0,
                "screenLeft": 0,
                "clientWidth": 1903,
                "clientHeight": 395,
                "screenWidth": 1920,
                "screenHeight": 1080,
                "availWidth": 1920,
                "availHeight": 1032,
                "outerWidth": 1920,
                "outerHeight": 1032,
                "scrollWidth": 1903,
                "scrollHeight": 1903
            },
            "simu": 0
        },
        "backstr": backstr,
        "captchalist": {
            // One spin-captcha entry; `ac_c` carries the answer value.
            "spin-0": {
                // Two replayed mouse-move events (fixed epoch-ms timestamps).
                "mv": [
                    {
                        "t": 1691824376147,
                        "fx": 1001,
                        "fy": 295
                    },
                    {
                        "t": 1691824376435,
                        "fx": 1002,
                        "fy": 295
                    }
                ],
                "ac_c": ac_c,
                // One replayed click event.
                "cl": [
                    {
                        "t": 1691824376212,
                        "x": 1002,
                        "y": 295
                    }
                ],
                "p": {},
                // Captcha widget bounding box.
                "cr": {
                    "left": 806,
                    "top": 40,
                    "width": 290,
                    "height": 280
                },
                // Rotatable inner-image bounding box.
                "back": {
                    "left": 875,
                    "top": 84,
                    "width": 152,
                    "height": 152
                }
            }
        }
    }
    return rzData;
}
function getTrackAn(an_spin_0, c_value){
    // Proof-of-work: find the smallest counter whose SHA1(an_spin_0 + counter)
    // starts with `c_value` zero hex digits. Returns the elapsed time `t`
    // (ms) and the winning counter `an`.
    const started = Date.now();
    const target = "0".repeat(c_value);
    let counter = 0;
    let prefix = "0";
    while (prefix !== target) {
        prefix = CryptoJS.SHA1(an_spin_0 + counter).toString().substring(0, c_value);
        counter++;
    }
    return {
        t: Date.now() - started,
        an: counter - 1
    }
}
function getFs1(backstr, ac_c, as, track_p){
    // Inner envelope: AES-encrypt the JSON fingerprint payload with the
    // key derived from `as`.
    var payload = getRzData(backstr, ac_c, track_p);
    var key = getNewKey(as);
    return aesEncrypt(JSON.stringify(payload), key);
}
function getFs2(backstr, ac_c, as){
    // Outer envelope: wrap the inner ciphertext together with the raw
    // challenge string and AES-encrypt the pair with the same derived key.
    //var track_p = getTrackAn(an_spin_0, c_value);
    var envelope = JSON.stringify({
        common_en: getFs1(backstr, ac_c, as),
        backstr: backstr
    });
    return aesEncrypt(envelope, getNewKey(as));
}
//var backstr = "2333-kNbq8pYj9qwXDEPyB/K7+sEOCG56rn6WxPRL5MVFf9wjuFjFY+v4eiP21GmXN0/ykYmC7H0irXZOYxsTwIARXTZHjZWVymoqqVA2nNL6/s7aAbbr0WPYCn40XHTCNOBMWONN760VzD2oj8NlJ4SpJz7TOKPeEguv0uvGa4i0v4vgYTSrdNZ2+wz5cZREo6Hf+Mk9zDsy20SEeqJ/9PAwDmH25TvAMQX7Pnb5aMxfHT7GX5nwMPFHlIJHJQ2qI7AwmBRiY0Ei3PoEj8auHGxd9UULDF39Zet+o3AGquoMPrsqD1wrS+Kdn0dp4Q6oxiy10pmVc3uKe+v0M2Nd0LRhbhaukkeTL9bNR4bHz+rpuia8Ki9u02RWPm6NySco6lJ9n/s0lZGDb0WcD6+yRpsdSbwTWCAmxsDA6aaYOsPa/T5tDcqm01XWgl/oir9fhiYNJ5Ys9AW4jKCp7U2r63CvftFhg7hR2gdzYgFwtVQvaSSp5cw5b5+T22k+1nPyvszKIQXgbsdf+pxhV4oy/3cKZgCWnOV16LxSd0QXJmZzGYxpHjJaJ6HHAmh8UhspFeWG8Hwu8WPwEfk/hyMYx8xygxknoYKlHny6FejCSqcfVyLPlmmgzsm48snssFwN/OEJnGODd23gPWReFX+VY4ceQN/zSC0VvPiEEhWEl0jAQ1lGsLm7M8eSALbFxp0IcD7tXBsieG0P8oYHm2BorLAe/Q==";
//var ac_c = 0.55;
//var as = "f25f1614";
//get_fs_2(as);

44
tests/test_cookie.py Normal file
View File

@ -0,0 +1,44 @@
import os
import random
from datetime import datetime
import requests
from loguru import logger
from app.utils.common import get_all_cookies, get_proxies
from app.utils.ua import random_ua
def main():
    """Probe every stored cookie against Baidu's help-count endpoint.

    Reads one cookie per line from ../data/live_cookies.txt, issues a GET
    with it, and logs the line via logger.success when the endpoint answers
    errno == 0 (cookie still alive). Any request/parse failure is logged
    and the loop continues.
    """
    probe_headers = {
        'User-Agent': random_ua(),
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-User": "?1",
        "Sec-Fetch-Dest": "document",
    }
    with open("../data/live_cookies.txt", "r") as fp:
        for cookie in fp:
            probe_headers["Cookie"] = cookie.strip()
            try:
                response = requests.get(
                    'https://help.baidu.com/api/count',
                    headers=probe_headers,
                    proxies=get_proxies(),
                    timeout=10,
                )
                if response.json()['errno'] == 0:
                    logger.success(cookie)
            except Exception as e:
                logger.error(f"{e=}")


if __name__ == '__main__':
    main()

51
tests/test_init_post.py Normal file
View File

@ -0,0 +1,51 @@
import time
import requests
from loguru import logger
from app.utils.common import get_proxies
from app.utils.ua import random_ua
# Manual probe of the captcha bootstrap endpoint (passport.baidu.com/cap/init)
# used by the Baidu report flow. Runs at import time and routes the request
# through a local intercepting proxy (localhost:8080, TLS verification off)
# so the exchange can be inspected.
headers = {
    'Accept': 'application/json, text/javascript, */*; q=0.01',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Origin': 'https://jubao.baidu.com',
    'Pragma': 'no-cache',
    'Referer': "",
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-origin',
    'User-Agent': random_ua(),
    'X-Requested-With': 'XMLHttpRequest',
    'sec-ch-ua_wap': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
    'sec-ch-ua_wap-mobile': '?0',
    'sec-ch-ua_wap-platform': '"Windows"',
    # Captured session cookie from a logged-in browser; presumably expires —
    # NOTE(review): re-capture if the endpoint starts rejecting the request.
    "Cookie": "BIDUPSID=6E6F5ACEDDAE59C1C7F8BEDB76EE3472; PSTM=1743162000; BAIDUID=6E6F5ACEDDAE59C1C7F8BEDB76EE3472:FG=1; H_PS_PSSID=61027_61684_62227_62341_62371_62484_62327_62675_62687_62701_62618_62328_62693_62793_62759; Hm_lvt_90056b3f84f90da57dc0f40150f005d5=1743221214; HOSUPPORT=1; HOSUPPORT_BFESS=1; BDORZ=B490B5EBF6F3CD402E515D22BCDA1598; BAIDUID_BFESS=6E6F5ACEDDAE59C1C7F8BEDB76EE3472:FG=1; BA_HECTOR=800l2k8g8404ak202180012h28e09b1juhubo23; ZFY=u7PzH9OEifErag4uR150Iqd3poJ4TvSXCPoa34vrghQ:C; pplogid=9018L%2BUrQhkdYh9xu5DGISHMKzFCdTvnDRpFspVuDw2cVdu0kPiicN3F%2BLIjIrS24Mnh97bf1rZ4jrYc4bfJjvviHKq3%2BcdD3k3cbzIyP7zjs%2FNbkRUI6ZXN2N6vpsW0vqz4; pplogid_BFESS=9018L%2BUrQhkdYh9xu5DGISHMKzFCdTvnDRpFspVuDw2cVdu0kPiicN3F%2BLIjIrS24Mnh97bf1rZ4jrYc4bfJjvviHKq3%2BcdD3k3cbzIyP7zjs%2FNbkRUI6ZXN2N6vpsW0vqz4; ppfuid=FOCoIC3q5fKa8fgJnwzbE67EJ49BGJeplOzf+4l4EOvDuu2RXBRv6R3A1AZMa49I27C0gDDLrJyxcIIeAeEhD8JYsoLTpBiaCXhLqvzbzmvy3SeAW17tKgNq/Xx+RgOdb8TWCFe62MVrDTY6lMf2GrfqL8c87KLF2qFER3obJGkjS1Q+e/k7Rs6uiFpI37bSGEimjy3MrXEpSuItnI4KD8g41DpSynLlZIw1fiOsIP5cXmpChQyV1e1EPWcXT7Obd3LJjL1Yn6XicXrG4uxcjhJsVwXkGdF24AsEQ3K5XBbh9EHAWDOg2T1ejpq0s2eFy9ar/j566XqWDobGoNNfmfpaEhZpob9le2b5QIEdiQcF+6iOKqU/r67N8lf+wxW6FCMUN0p4SXVVUMsKNJv2TwEq3+MvKTlPBjfdM81CMPq4LkPV+7TROLMG0V6r0A++zkWOdjFiy1eD/0R8HcRWYsUPXjDqADgs+Xs31pnSHeup+HBavJhpxl858h16cMtKQmxzisHOxsE/KMoDNYYE7ucLE22Bi0Ojbor7y6SXfVj7+B4iuZO+f7FUDWABtt/WWQqHKVfXMaw5WUmKnfSR5wwQa+N01amx6X+p+x97kkGmoNOSwxWgGvuezNFuiJQdt51yrWaL9Re9fZveXFsIu/gzGjL50VLcWv2NICayyI8BE9m62pdBPySuv4pVqQ9Sl1uTC//wIcO7QL9nm+0N6JgtCkSAWOZCh7Lr0XP6QztjlyD3bkwYJ4FTiNanaDaDGJvQSBXzsg+IRRhyCDByoQGQeeW2tuLWLdT/wCCYRVapxmYCJZNrnRUkSIDHL7jJSGk66HDxtjKMU4HPNa0dthF7UsHf7NW9eE+gwuTQSa7GLWfOy9+ap4iFBQsmjpefgOF89jAHLbnVUejtrqqvdeh0/gTYfZ0rcnDTfBJCgA4txFw2W/gbxV6NVjlHtRIsACLbj6xl/yMHQzkZkVpRsJWlfRxTg0fATL8GqvWQ4/UDHI1keFBVnYanS4vsc0Vv4r6Ox6qCSA2GVhbcby9TZ0ZP/w8N2tU35oxlseq1LWgPu6XttGgSDD3qzeT+ioGRlVhowqdfR1aHkTtWeGpFIinbqcXU/T4f2hLII9D3g8/VJFpqtKhXSD3+Zxe9dMoovaJ+8O2a8FoOA61ANQdaSJ1gag0ZQYe+hQYR
V64Y295VRIIti02vwfwuR8+zl4gzEd/6cK75U6uYcAhJpQbMAmzkzkW7nI8GFajWDjxAW3XV3NICTNZdymb5mwlfccpsD2YQGte0wan2iODRDe4PUg==; logTraceID=5654693b3d03476811e652cf58d1a0dfa472ce17028f7bafa0; UBI=fi_PncwhpxZ%7ETaKAchm6wmV2coOGs4YUtjch-4Zrot8sT1YSnNg1ybFt26pXcYEE5C284P8mos7AXpgAG4rABOEzOzPPcZDAjHxquBSVWmDg5el826AcbI7hmwaUmIF9sV05RVKkkUun2i%7En4ggkoXeqU8lxA__; UBI_BFESS=fi_PncwhpxZ%7ETaKAchm6wmV2coOGs4YUtjch-4Zrot8sT1YSnNg1ybFt26pXcYEE5C284P8mos7AXpgAG4rABOEzOzPPcZDAjHxquBSVWmDg5el826AcbI7hmwaUmIF9sV05RVKkkUun2i%7En4ggkoXeqU8lxA__; STOKEN=c6050c756de921fcb16f7dffc43dd50f2086995f8a899a6e00e920d811f567d2; BDUSS=V0VURQRFBRWGMyY05TdXB5cnpKaTZUUnNVUGllfk92MmQtc0FzSVU0eEQxUkJvSUFBQUFBJCQAAAAAAQAAAAEAAAASuE-WeGh5MjAyNQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAENI6WdDSOlnZ2; PTOKEN=071414a8697fb7580d09c409b29c063c; BDUSS_BFESS=V0VURQRFBRWGMyY05TdXB5cnpKaTZUUnNVUGllfk92MmQtc0FzSVU0eEQxUkJvSUFBQUFBJCQAAAAAAQAAAAEAAAASuE-WeGh5MjAyNQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAENI6WdDSOlnZ2; STOKEN_BFESS=c6050c756de921fcb16f7dffc43dd50f2086995f8a899a6e00e920d811f567d2; PTOKEN_BFESS=071414a8697fb7580d09c409b29c063c; BDRCVFR[feWj1Vr5u3D]=I67x6TjHwwYf0; delPer=0; PSINO=5",
}
# Target page under report and the evidence token/title captured for it.
surl = "https://www.yunzhiju.net/jxfsxf/"
token = "0DE59186A7A5F2F6F8D41B95ED9E26D0"
title = "深入解析游戏玩法-轻松上手,提升游戏技巧-云之居软件园"
q = "www.yunzhiju.net/jxfsxf/"
timestamp_s = int(time.time() * 1000)  # epoch milliseconds despite the _s name
url = "https://passport.baidu.com/cap/init"
data = {
    "_": int(time.time() * 1000),  # cache-buster
    # The refer field reconstructs the jubao report-page URL the captcha
    # init call would normally originate from.
    "refer": f"http://jubao.baidu.com/jubao/accu/?surl={surl}&token={token}&title={title}&q={q}&has_gw=0&has_v=0&_t8={timestamp_s}",
    "ak": "76AKmP4xDQjB3vAIPef3KxOlJZWCpw64",  # app key of the jubao captcha scene
    "ver": "2",
    "scene": "",
    "ds": "",
    "tk": "",
    "as": "",
    "reinit": 0
}
logger.debug(f"{headers=}")
logger.debug(f"{data=}")
# Sent through a local debugging proxy with TLS verification disabled —
# for manual inspection only; do not reuse verify=False in production code.
response = requests.post(url, headers=headers, data=data, proxies={
    "http": "http://localhost:8080/",
    "https": "http://localhost:8080/"
}, verify=False).json()
logger.success(response)

12
tests/test_proxy.py Normal file
View File

@ -0,0 +1,12 @@
import requests
from app.utils.common import get_proxies


def main():
    """Fetch the current egress IP through the configured proxy and print it."""
    response = requests.get(
        "https://ip.im",
        headers={"User-Agent": "curl"},
        proxies=get_proxies(),
        timeout=5,
    )
    print(response.text)


if __name__ == '__main__':
    main()