baidu-reporter/app/engines/evidence_engine.py
2025-03-30 16:04:34 +08:00

221 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os.path
import threading
import urllib
from urllib.parse import urlparse
import requests
from DrissionPage._pages.mix_tab import MixTab
from loguru import logger
from sqlmodel import Session, select
from app.config.config import AppCtx
from app.models.report_urls import ReportUrlModel
from app.utils.common import md5
from app.utils.dp import DPEngine
class EvidenceEngine:
    """Collect evidence for flagged sites: screenshot the Baidu search
    results for each URL, screenshot the URL itself, extract the Baidu
    report link and persist its parameters to the database.
    """

    def __init__(self):
        # Two browser windows: a desktop-UA one for Baidu search and a
        # WAP-UA one for visiting the target URL itself.
        self.dp_engine = DPEngine()
        self.wap_dp_engine = DPEngine(is_wap=True)
        # Event + flag pair used to stop the worker loop promptly.
        self.ev = threading.Event()
        self.status = 1
        # Background worker thread (created lazily in start()).
        self.worker_thread = None
        # Shared SQLModel engine.
        self.database = AppCtx.g_db_engine

    def start(self):
        """Start the background worker thread (daemon, runs worker())."""
        self.worker_thread = threading.Thread(target=self.worker, name="evidence_engine", daemon=True)
        self.worker_thread.start()

    def cli_start(self):
        """CLI mode: process the pending URLs exactly once, no loop."""
        # Fetch every URL still waiting for evidence collection.
        targets = self.get_surl_from_db()
        logger.debug(f"共获取到 {len(targets)} 条待处理数据")
        for target in targets:
            logger.debug(f"开始获取 {target['surl']} 的举报数据")
            self.get_screenshot_and_report_link(target)

    def worker(self):
        """Worker loop: process all pending URLs, then wait 60s; repeats
        until stop() clears ``status`` and sets the event."""
        while self.status:
            targets = self.get_surl_from_db()
            for target in targets:
                logger.debug(f"开始获取 {target['surl']} 的举报数据")
                self.get_screenshot_and_report_link(target)
            # Run once per minute; ev.wait() returns early when stop()
            # sets the event, so shutdown is prompt.
            self.ev.wait(60)

    def stop(self):
        """Stop the worker loop and close both browser windows."""
        self.status = 0
        self.ev.set()
        self.dp_engine.close()
        self.wap_dp_engine.close()

    def get_surl_from_db(self) -> list[dict]:
        """Return all rows that still need evidence, as plain dicts
        with keys ``id``, ``surl`` and ``domain``."""
        result: list = []
        with Session(self.database) as session:
            # NOTE: `== False` (not `is False`) is required here — SQLAlchemy
            # overloads `==` to build the SQL WHERE clause.
            stmt = select(ReportUrlModel).where(ReportUrlModel.has_evidence == False)
            surl = session.exec(stmt).all()
            for url in surl:
                result.append({"id": url.id, "surl": url.surl, "domain": url.domain})
        return result

    def get_screenshot_and_report_link(self, target: dict):
        """Full evidence pipeline for one row: search screenshot, WAP
        screenshot, report link extraction, then DB update.

        Returns None on any failure; errors are logged, never raised.
        """
        tab = None
        try:
            surl = target["surl"]
            # Part 1: screenshot of the Baidu search results page.
            logger.debug(f"开始获取 {surl} 在百度搜索中的截图")
            img_path, tab = self.get_screenshot(target)
            if not img_path:
                return None
            # Part 2: screenshot of the surl itself (WAP browser).
            logger.debug(f"开始获取 {surl} 的截图")
            img_path, wap_tab = self.get_wap_screenshot(target)
            wap_tab.close()
            # Part 3: report link — needs the search tab still open.
            logger.debug(f"开始获取 {surl} 的举报链接")
            report_link = self.get_report_link(tab)
            logger.debug(f"获取到举报链接为: {report_link}")
            if not report_link:
                return None
            # Part 4: decode the report link's query parameters.
            logger.debug(f"开始获取举报链接的参数信息")
            params = self.resolve_report_link(report_link)
            if not params:
                logger.error(f"解析举报链接失败surl: {surl}")
                return None
            token = params["token"][0]
            title = params["title"][0]
            q = params["q"][0]
            surl = params["surl"][0]
            # Persist the extracted parameters and mark the row done.
            with Session(self.database) as session:
                stmt = select(ReportUrlModel).where(ReportUrlModel.id == target["id"])
                model: ReportUrlModel = session.exec(stmt).first()
                if not model:
                    logger.error(f"{target['id']} 记录不存在,跳过...")
                    return None
                model.token = token
                model.title = title
                model.q = q
                model.has_evidence = True
                session.add(model)
                session.commit()
            logger.debug(f"{surl} 处理完成")
        except Exception as e:
            logger.error(f"获取证据截图和举报链接失败: {e}")
        finally:
            # BUGFIX: the search tab was never closed, leaking one browser
            # tab per processed URL in the worker loop.
            if tab is not None:
                tab.close()

    def get_screenshot(self, target: dict) -> tuple[str | None, MixTab]:
        """Search the surl on Baidu and screenshot the results page.

        Returns (img_path, tab); img_path is None when Baidu reports no
        results. The tab is intentionally left open because
        get_report_link() still needs it.
        """
        # BUGFIX: the original used str.lstrip("https://"), which strips
        # *characters* from the set {h,t,p,s,:,/} rather than the prefix,
        # mangling hosts that start with those letters
        # (e.g. "https://shop.x.com" -> "op.x.com"). removeprefix is exact.
        search_keyword = target["surl"].removeprefix("https://").removeprefix("http://")
        tab = self.dp_engine.browser.new_tab()
        tab.get("https://www.baidu.com")
        tab.ele("#kw").input(f"{search_keyword}\n", clear=True)
        # Wait for either the "no results" box or the results column.
        tab.wait.eles_loaded([".content_none", "#content_left"], any_one=True)
        if "未找到相关结果" in tab.html:
            logger.info(f"没有关于 {search_keyword} 的数据")
            return None, tab
        # Image path is keyed by domain + md5 of the surl.
        img_path = f"./imgs/{target['domain']}/{md5(target['surl'])}.png"
        return self.do_screenshot(tab, img_path)

    def get_wap_screenshot(self, target: dict) -> tuple[str | None, MixTab]:
        """Screenshot the surl itself with the WAP browser window."""
        tab = self.wap_dp_engine.browser.new_tab()
        tab.get(target["surl"])
        # Hard wait: the surl's page structure is unknown, so there is no
        # element we could wait for precisely.
        tab.wait(5)
        img_path = f"./imgs/{target['domain']}/{md5(target['surl'])}-wap.png"
        return self.do_screenshot(tab, img_path)

    @staticmethod
    def do_screenshot(tab: MixTab, img_path: str, force=False) -> tuple[str | None, MixTab]:
        """Save a screenshot of *tab* to *img_path*.

        If the file already exists it is kept unless *force* is True.
        Returns (img_path, tab) either way.
        """
        if os.path.exists(img_path):
            if force:
                os.remove(img_path)
            else:
                logger.debug(f"截图路径 {img_path} 已经存在,跳过截图")
                return img_path, tab
        tab.get_screenshot(path=img_path)
        logger.debug(f"截图成功: {img_path}")
        return img_path, tab

    @staticmethod
    def get_report_link(tab: MixTab):
        """Extract the report ("举报") link from the first search result.

        The tab must still be on the search results page. Hovers the
        first tools widget to open its tip menu, then reads the report
        anchor's href. Returns the href or None.
        """
        tools = tab.eles(".:c-tools")
        tab.wait(0.5)
        if tools:
            tool = tools[0]
            tool.hover(0, 0)
            tool.click(True)
            tips = tab.eles(".c-tip-menu")
            if tips:
                tip = tips[0]
                report = tip.ele("t:a@@text()=举报")
                if report:
                    return report.attr("href")
        return None

    @staticmethod
    def resolve_report_link(report_link):
        """Request *report_link* without following the redirect and decode
        the query parameters of its Location header.

        Returns a dict mapping each parameter to a list of URL-decoded
        values, or None on any failure.
        """
        try:
            proxy_link = AppCtx.g_app_config.chrome.proxy
            proxies = {
                "http": proxy_link,
                "https": proxy_link,
            }
            response = requests.get(report_link, proxies=proxies, timeout=5, allow_redirects=False)
            # BUGFIX: headers["Location"] raised KeyError when the header was
            # missing, so the warning branch below was unreachable and the
            # generic except swallowed the real cause. .get() makes it reachable.
            location = response.headers.get("Location")
            if not location:
                logger.warning("没有获取到举报链接的 Location")
                return None
            parsed_url = urllib.parse.urlparse(location)
            query_params = urllib.parse.parse_qs(parsed_url.query)
            decoded_params = {
                key: [urllib.parse.unquote(value) for value in values] for key, values in query_params.items()
            }
            if len(decoded_params) == 0:
                return None
            return decoded_params
        except Exception as e:
            logger.error(f"解析举报链接失败,错误: {e}")
            return None