证据采集器完成

This commit is contained in:
xhy 2025-03-30 16:04:34 +08:00
parent f8b7898d35
commit e2604067fe
10 changed files with 373 additions and 25 deletions

View File

@ -7,6 +7,7 @@ from app.engines.reporter import Reporter
from .config import load_config, AppConfig
from .engines.crawl_engine import CrawlEngine
from .engines.evidence_engine import EvidenceEngine
from .models.base import connect_db, create_database
from loguru import logger
@ -33,12 +34,13 @@ class MainApp:
help="指定配置文件路径,默认为 ./config.local.toml",
)
# 添加输入文件参数
parser.add_argument(
"-f",
"--file",
default="./urls.txt",
help="指定输入文件路径,默认为 ./urls.txt",
"--crawl", help="采集模式,根据域名批量采集 SURL",
)
parser.add_argument(
"--evidence", help="收集证据模式,对数据库内的 SURL 获取证据",
action="store_true"
)
# 添加运行模式参数
@ -72,8 +74,8 @@ class MainApp:
args.mode = ["pc", "site", "wap"]
# 检查输入的文件是否存在
if not os.path.exists(args.file):
parser.error(f"输入的文件不存在: {args.file}")
# if not os.path.exists(args.file):
# parser.error(f"输入的文件不存在: {args.file}")
# 检查配置文件是否存在
if not os.path.exists(args.config):
@ -83,14 +85,15 @@ class MainApp:
def start_cli(self):
"""开启 CLI 模式"""
# reporter = Reporter(self.args.file, self.args.mode, self.db_engine)
# reporter.run()
crawler = CrawlEngine()
crawler.start()
time.sleep(3600)
crawler.stop()
logger.debug(f"args.crawl: {self.args.crawl}")
if self.args.crawl:
crawl = CrawlEngine()
crawl.cli_start(self.args.crawl)
crawl.stop()
elif self.args.evidence:
evidence = EvidenceEngine()
evidence.cli_start()
evidence.stop()
def start_web(self):
"""开启 Web 模式"""

View File

@ -26,11 +26,33 @@ class CrawlEngine:
# 工作线程
self.worker_thread = None
self.database = AppCtx.g_db_engine
def start(self):
"""启动采集器"""
self.worker_thread = threading.Thread(target=self.worker, name="crawl_engine", daemon=True)
self.worker_thread.start()
def cli_start(self, target_domain: str):
"""CLI 模式启动"""
with Session(self.database) as session:
stmt = select(DomainModel).where(DomainModel.domain == target_domain)
result = session.exec(stmt).first()
if not result:
model: DomainModel = DomainModel(
domain=target_domain,
status=1,
crawl_interval=60 * 7 * 24,
latest_crawl_time=0,
)
session.add(model)
session.commit()
result = model
# 直接采集
surl = self.crawl(target_domain)
self.save_surl(session, result, surl)
def stop(self):
"""停止采集器"""
self.ev.set()
@ -90,6 +112,7 @@ class CrawlEngine:
logger.debug("首页加载完成!")
# 设置搜索时间范围
self.ev.wait(1)
tab.ele("#timeRlt").click(True)
tab.wait.eles_loaded("@class:time_pop_")
self.ev.wait(1)
@ -118,7 +141,6 @@ class CrawlEngine:
logger.debug("未找到结果,退出")
break
# 获取数据
tab.wait.eles_loaded("@id=content_left")
results = tab.ele("@id=content_left").eles("@class:result")

View File

@ -1,7 +0,0 @@
class EvidenceHolder:
"""固定色站证据,搜索 URL 后截图,并生成举报链接存入数据库"""
def __init__(self):
pass

View File

@ -0,0 +1,220 @@
import os.path
import threading
import urllib
from urllib.parse import urlparse
import requests
from DrissionPage._pages.mix_tab import MixTab
from loguru import logger
from sqlmodel import Session, select
from app.config.config import AppCtx
from app.models.report_urls import ReportUrlModel
from app.utils.common import md5
from app.utils.dp import DPEngine
class EvidenceEngine:
"""固定色站证据,搜索 URL 后截图,并生成举报链接存入数据库"""
def __init__(self):
# 开启一个浏览器窗口
self.dp_engine = DPEngine()
self.wap_dp_engine = DPEngine(is_wap=True)
# 控制运行状态的数据
self.ev = threading.Event()
self.status = 1
# 工作线程
self.worker_thread = None
# 数据库连接
self.database = AppCtx.g_db_engine
def start(self):
"""启动线程"""
self.worker_thread = threading.Thread(target=self.worker, name="evidence_engine", daemon=True)
self.worker_thread.start()
def cli_start(self):
"""以CLI模式开启就是只执行一次不循环"""
# 从数据库中获取所有待收集证据的 URL 列表
targets = self.get_surl_from_db()
logger.debug(f"共获取到 {len(targets)} 条待处理数据")
# 依次处理
for target in targets:
logger.debug(f"开始获取 {target['surl']} 的举报数据")
self.get_screenshot_and_report_link(target)
def worker(self):
"""工作函数"""
while self.status:
# 从数据库中获取所有待收集证据的 URL 列表
targets = self.get_surl_from_db()
# 依次处理
for target in targets:
logger.debug(f"开始获取 {target['surl']} 的举报数据")
self.get_screenshot_and_report_link(target)
# 每分钟跑一次
self.ev.wait(60)
def stop(self):
"""结束线程"""
self.status = 0
self.ev.set()
self.dp_engine.close()
self.wap_dp_engine.close()
def get_surl_from_db(self):
"""从数据库中获取数据"""
result: list = []
with Session(self.database) as session:
stmt = select(ReportUrlModel).where(ReportUrlModel.has_evidence == False)
surl = session.exec(stmt).all()
for url in surl:
result.append({"id": url.id, "surl": url.surl, "domain": url.domain})
return result
def get_screenshot_and_report_link(self, target: dict):
"""获取证据截图和举报链接"""
try:
surl = target["surl"]
# Part1 获取证据截图
logger.debug(f"开始获取 {surl} 在百度搜索中的截图")
img_path, tab = self.get_screenshot(target)
if not img_path:
return None
# Part2 截一张surl本身的图
logger.debug(f"开始获取 {surl} 的截图")
img_path, wap_tab = self.get_wap_screenshot(target)
wap_tab.close()
# Part3 获取举报链接
logger.debug(f"开始获取 {surl} 的举报链接")
report_link = self.get_report_link(tab)
logger.debug(f"获取到举报链接为: {report_link}")
if not report_link:
return None
# Part4 获取举报链接的信息
logger.debug(f"开始获取举报链接的参数信息")
params = self.resolve_report_link(report_link)
if not params:
logger.error(f"解析举报链接失败surl: {surl}")
return None
token = params["token"][0]
title = params["title"][0]
q = params["q"][0]
surl = params["surl"][0]
# 更新数据库
with Session(self.database) as session:
stmt = select(ReportUrlModel).where(ReportUrlModel.id == target["id"])
model: ReportUrlModel = session.exec(stmt).first()
if not model:
logger.error(f"{target['id']} 记录不存在,跳过...")
return None
# 更新数据
model.token = token
model.title = title
model.q = q
model.has_evidence = True
session.add(model)
session.commit()
logger.debug(f"{surl} 处理完成")
except Exception as e:
logger.error(f"获取证据截图和举报链接失败: {e}")
def get_screenshot(self, target: dict) -> tuple[str | None, MixTab]:
"""获取搜索页面的截图,返回 img_path """
search_keyword = target["surl"].lstrip("https://").lstrip("http://")
tab = self.dp_engine.browser.new_tab()
tab.get("https://www.baidu.com")
tab.ele("#kw").input(f"{search_keyword}\n", clear=True)
tab.wait.eles_loaded([".content_none", "#content_left"], any_one=True)
if "未找到相关结果" in tab.html:
logger.info(f"没有关于 {search_keyword} 的数据")
return None, tab
# 图片的存储路径
# 截完图先不要关闭 tab别的地方还要用
img_path = f"./imgs/{target['domain']}/{md5(target['surl'])}.png"
return self.do_screenshot(tab, img_path)
def get_wap_screenshot(self, target: dict) -> tuple[str | None, MixTab]:
"""用 wap dp 再截一张 surl 本身的图"""
tab = self.wap_dp_engine.browser.new_tab()
tab.get(target["surl"])
tab.wait(5) # 这里只能硬等不知道surl的结构没办法精确判断
img_path = f"./imgs/{target['domain']}/{md5(target['surl'])}-wap.png"
return self.do_screenshot(tab, img_path)
@staticmethod
def do_screenshot(tab: MixTab, img_path: str, force=False) -> tuple[str | None, MixTab]:
"""截图函数"""
if os.path.exists(img_path):
if force:
os.remove(img_path)
else:
logger.debug(f"截图路径 {img_path} 已经存在,跳过截图")
return img_path, tab
tab.get_screenshot(path=img_path)
logger.debug(f"截图成功: {img_path}")
return img_path, tab
@staticmethod
def get_report_link(tab: MixTab):
"""获取举报链接,这个时候页面应该停留在搜索结果页"""
tools = tab.eles(".:c-tools")
tab.wait(0.5)
if tools:
tool = tools[0]
tool.hover(0, 0)
tool.click(True)
tips = tab.eles(".c-tip-menu")
if tips:
tip = tips[0]
report = tip.ele("t:a@@text()=举报")
if report:
return report.attr("href")
return None
@staticmethod
def resolve_report_link(report_link):
try:
proxy_link = AppCtx.g_app_config.chrome.proxy
proxies = {
"http": proxy_link,
"https": proxy_link,
}
response = requests.get(report_link, proxies=proxies, timeout=5, allow_redirects=False)
location = response.headers["Location"]
if not location:
logger.warning("没有获取到举报链接的 Location")
return None
parsed_url = urllib.parse.urlparse(response.headers["Location"])
query_params = urllib.parse.parse_qs(parsed_url.query)
decoded_params = {
key: [urllib.parse.unquote(value) for value in values] for key, values in query_params.items()
}
if len(decoded_params) == 0:
return None
return decoded_params
except Exception as e:
logger.error(f"解析举报链接失败,错误: {e}")
pass

7
app/utils/common.py Normal file
View File

@ -0,0 +1,7 @@
import hashlib
def md5(s: str) -> str:
m = hashlib.md5()
m.update(s.encode('utf-8'))
return m.hexdigest()

View File

@ -10,7 +10,7 @@ class DPEngine:
def __init__(self, is_wap: bool = False, no_img: bool = True):
chrome_opts = ChromiumOptions()
chrome_opts.mute(True) # 静音
chrome_opts.headless(True) # 无头模式
# chrome_opts.headless(True) # 无头模式
chrome_opts.no_imgs(no_img) # 不加载图片
chrome_opts.set_argument("--disable-gpu") # 禁用GPU
chrome_opts.set_argument('--ignore-certificate-errors') # 忽略证书错误

View File

@ -1,3 +1,5 @@
from loguru import logger
from app import MainApp
import sys
@ -9,7 +11,7 @@ def main():
app = MainApp()
app.run()
except Exception as e:
print(f"程序运行失败,错误 {e} 信息如下:")
logger.error(f"程序运行失败,错误 {e} 信息如下:")
traceback.print_exc()
sys.exit(1)

22
tests/test_db.py Normal file
View File

@ -0,0 +1,22 @@
from sqlalchemy import create_engine
from sqlmodel import Session, select
from app.models.report_urls import ReportUrlModel
dsn = f"mysql+pymysql://root:123456@localhost:3306/baidu_reporter"
engine = create_engine(dsn, echo=True)
with Session(engine) as session:
stmt = select(ReportUrlModel).where(ReportUrlModel.surl == "4444")
result = session.exec(stmt).first()
print(result)
if not result:
example = ReportUrlModel(
domain_id=1,
domain="111",
surl="4444",
)
session.add(example)
session.commit()

47
tests/test_evidence.py Normal file
View File

@ -0,0 +1,47 @@
from DrissionPage import ChromiumOptions
from DrissionPage import Chromium
chrome_opts = ChromiumOptions()
chrome_opts.mute(True) # 静音
chrome_opts.no_imgs(False)
chrome_opts.set_argument("--disable-gpu")
chrome_opts.set_argument('--ignore-certificate-errors')
chrome_opts.set_argument("--proxy-server=http://127.0.0.1:7890")
chrome_opts.incognito(True)
chrome_opts.set_browser_path(r"C:\Program Files\Google\Chrome\Application\chrome.exe")
# chrome_opts.auto_port(True)
browser = Chromium(addr_or_opts=chrome_opts)
search_keyword = "www.yunzhiju.net/zxysx/11456.html"
tab = browser.new_tab("https://www.baidu.com/")
tab.ele("#kw").input(f"{search_keyword}\n", clear=True)
print("before wait")
tab.wait.eles_loaded([".content_none", "#content_left"], any_one=True)
print("after wait")
tools = tab.eles('.:c-tools')
print(tools)
tab.wait(1)
if tools:
tool = tools[0]
tool.hover(0, 0)
tool.click(True)
tips = tab.eles(".c-tip-menu")
print("tips:", tips)
if tips:
tip = tips[0]
temp = tip.ele("t:a@@text()=举报")
print(temp)
href = temp.attr("href")
print(f"href={href}")
# tools = tab.eles(".c-tools")
# print(tools)
# for tool in tools:
# tool.hover(1,1)
#
# for x in tab.eles("t:a@@text()=举报"):
# print(x)

32
tests/test_report_link.py Normal file
View File

@ -0,0 +1,32 @@
import sys
import urllib
from urllib.parse import urlparse
import requests
from loguru import logger
def main():
report_link = "https://www.baidu.com/tools?url=https%3A%2F%2Fwww.yunzhiju.net%2Fzxysx%2F11456.html&jump=http%3A%2F%2Fjubao.baidu.com%2Fjubao%2Faccu%2F%3Ftitle%3D%2501%25E5%25A6%2582%25E4%25BD%2595%2501%25E9%2580%259A%25E8%25BF%2587%2501%25E7%259B%25B4%25E6%2592%25AD%2501%25E6%2590%25AC%25E5%25AE%25B6%2501app%2501%25E4%25B8%258B%25E8%25BD%25BD%2501%25E5%25AE%2598%25E6%2596%25B9%2501%25E6%25AD%25A3%25E7%2589%2588%2501%25E5%25AE%2589%25E8%25A3%2585%2501%25E5%25B9%25B6%2501%25E4%25BD%2593%25E9%25AA%258C%2501%25E4%25BE%25BF%25E6%258D%25B7%2501%25E7%259A%2584%2501%25E6%2590%25AC%25E5%25AE%25B6%2501%25E6%259C%258D%25E5%258A%25A1%2501%253F%2501-%2501%25E4%25BA%2591%2501%25E4%25B9%258B%2501...%26q%3Dwww.yunzhiju.net%252Fzxysx%252F11456.html%26has_gw%3D0%26has_v%3D0&key=surl"
proxy_link = "http://localhost:7890"
proxies = {
"http": proxy_link,
"https": proxy_link,
}
response = requests.get(report_link, proxies=proxies, timeout=5, allow_redirects=False)
location = response.headers["Location"]
if not location:
logger.warning("没有获取到举报链接的 Location")
return
parsed_url = urllib.parse.urlparse(response.headers["Location"])
query_params = urllib.parse.parse_qs(parsed_url.query)
decoded_params = {
key: [urllib.parse.unquote(value) for value in values] for key, values in query_params.items()
}
print(decoded_params)
if __name__ == '__main__':
main()