From e2604067fe0ff3b21627b743b0f7c9f19659259a Mon Sep 17 00:00:00 2001
From: xhy
Date: Sun, 30 Mar 2025 16:04:34 +0800
Subject: [PATCH] Evidence collector complete
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 app/app.py                     |  33 ++---
 app/engines/crawl_engine.py    |  24 +++-
 app/engines/evidence.py        |   7 --
 app/engines/evidence_engine.py | 220 +++++++++++++++++++++++++++++++++
 app/utils/common.py            |   7 ++
 app/utils/dp.py                |   2 +-
 main.py                        |   4 +-
 tests/test_db.py               |  22 ++++
 tests/test_evidence.py         |  47 +++++++
 tests/test_report_link.py      |  32 +++++
 10 files changed, 373 insertions(+), 25 deletions(-)
 delete mode 100644 app/engines/evidence.py
 create mode 100644 app/engines/evidence_engine.py
 create mode 100644 app/utils/common.py
 create mode 100644 tests/test_db.py
 create mode 100644 tests/test_evidence.py
 create mode 100644 tests/test_report_link.py

diff --git a/app/app.py b/app/app.py
index 884eaae..aff3d65 100644
--- a/app/app.py
+++ b/app/app.py
@@ -7,6 +7,7 @@ from app.engines.reporter import Reporter
 from .config import load_config, AppConfig
 from .engines.crawl_engine import CrawlEngine
+from .engines.evidence_engine import EvidenceEngine
 from .models.base import connect_db, create_database
 from loguru import logger

@@ -33,12 +34,13 @@ class MainApp:
             help="Path to the config file, defaults to ./config.local.toml",
         )

-        # Input file argument
         parser.add_argument(
-            "-f",
-            "--file",
-            default="./urls.txt",
-            help="Path to the input file, defaults to ./urls.txt",
+            "--crawl", help="Crawl mode: collect SURLs in bulk for the given domain",
+        )
+
+        parser.add_argument(
+            "--evidence", help="Evidence mode: gather evidence for the SURLs stored in the database",
+            action="store_true"
         )

         # Run mode argument
@@ -72,8 +74,8 @@ class MainApp:
             args.mode = ["pc", "site", "wap"]

         # Check that the input file exists
-        if not os.path.exists(args.file):
-            parser.error(f"Input file does not exist: {args.file}")
+        # if not os.path.exists(args.file):
+        #     parser.error(f"Input file does not exist: {args.file}")

         # Check that the config file exists
         if not os.path.exists(args.config):
@@ -83,14 +85,15 @@ class MainApp:

     def start_cli(self):
         """Start CLI mode"""
-        # reporter = Reporter(self.args.file, self.args.mode, self.db_engine)
-        # reporter.run()
-        crawler = CrawlEngine()
-        crawler.start()
-
-        time.sleep(3600)
-        crawler.stop()
-
+        logger.debug(f"args.crawl: {self.args.crawl}")
+        if self.args.crawl:
+            crawl = CrawlEngine()
+            crawl.cli_start(self.args.crawl)
+            crawl.stop()
+        elif self.args.evidence:
+            evidence = EvidenceEngine()
+            evidence.cli_start()
+            evidence.stop()

     def start_web(self):
         """Start Web mode"""
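Reviewer note: a minimal, self-contained sketch (not part of the patch) of how the two new flags parse; the flag names, help text and action are copied from the hunk above, the sample invocations are made up. Because start_cli() checks args.crawl before args.evidence, passing both flags would run only the crawl path.

    # Sketch only: mirrors the argparse setup added to app/app.py by this patch.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--crawl", help="Crawl mode: collect SURLs in bulk for the given domain")
    parser.add_argument("--evidence", action="store_true",
                        help="Evidence mode: gather evidence for the SURLs stored in the database")

    args = parser.parse_args(["--crawl", "example.com"])   # crawl one domain
    assert args.crawl == "example.com" and args.evidence is False

    args = parser.parse_args(["--evidence"])                # evidence pass over stored SURLs
    assert args.crawl is None and args.evidence is True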
tab.ele("#timeRlt").click(True) tab.wait.eles_loaded("@class:time_pop_") self.ev.wait(1) @@ -118,7 +141,6 @@ class CrawlEngine: logger.debug("未找到结果,退出") break - # 获取数据 tab.wait.eles_loaded("@id=content_left") results = tab.ele("@id=content_left").eles("@class:result") diff --git a/app/engines/evidence.py b/app/engines/evidence.py deleted file mode 100644 index a1d2f68..0000000 --- a/app/engines/evidence.py +++ /dev/null @@ -1,7 +0,0 @@ - -class EvidenceHolder: - """固定色站证据,搜索 URL 后截图,并生成举报链接存入数据库""" - - def __init__(self): - pass - diff --git a/app/engines/evidence_engine.py b/app/engines/evidence_engine.py new file mode 100644 index 0000000..b81fd19 --- /dev/null +++ b/app/engines/evidence_engine.py @@ -0,0 +1,220 @@ +import os.path +import threading +import urllib +from urllib.parse import urlparse + +import requests +from DrissionPage._pages.mix_tab import MixTab +from loguru import logger +from sqlmodel import Session, select + +from app.config.config import AppCtx +from app.models.report_urls import ReportUrlModel +from app.utils.common import md5 +from app.utils.dp import DPEngine + + +class EvidenceEngine: + """固定色站证据,搜索 URL 后截图,并生成举报链接存入数据库""" + + def __init__(self): + # 开启一个浏览器窗口 + self.dp_engine = DPEngine() + self.wap_dp_engine = DPEngine(is_wap=True) + + # 控制运行状态的数据 + self.ev = threading.Event() + self.status = 1 + + # 工作线程 + self.worker_thread = None + + # 数据库连接 + self.database = AppCtx.g_db_engine + + def start(self): + """启动线程""" + self.worker_thread = threading.Thread(target=self.worker, name="evidence_engine", daemon=True) + self.worker_thread.start() + + def cli_start(self): + """以CLI模式开启,就是只执行一次,不循环""" + # 从数据库中获取所有待收集证据的 URL 列表 + targets = self.get_surl_from_db() + logger.debug(f"共获取到 {len(targets)} 条待处理数据") + + # 依次处理 + for target in targets: + logger.debug(f"开始获取 {target['surl']} 的举报数据") + self.get_screenshot_and_report_link(target) + + def worker(self): + """工作函数""" + while self.status: + # 从数据库中获取所有待收集证据的 URL 列表 + targets = self.get_surl_from_db() + + # 依次处理 + for target in targets: + logger.debug(f"开始获取 {target['surl']} 的举报数据") + self.get_screenshot_and_report_link(target) + + # 每分钟跑一次 + self.ev.wait(60) + + def stop(self): + """结束线程""" + self.status = 0 + self.ev.set() + self.dp_engine.close() + self.wap_dp_engine.close() + + def get_surl_from_db(self): + """从数据库中获取数据""" + result: list = [] + with Session(self.database) as session: + stmt = select(ReportUrlModel).where(ReportUrlModel.has_evidence == False) + surl = session.exec(stmt).all() + for url in surl: + result.append({"id": url.id, "surl": url.surl, "domain": url.domain}) + + return result + + def get_screenshot_and_report_link(self, target: dict): + """获取证据截图和举报链接""" + try: + surl = target["surl"] + + # Part1 获取证据截图 + logger.debug(f"开始获取 {surl} 在百度搜索中的截图") + img_path, tab = self.get_screenshot(target) + if not img_path: + return None + + # Part2 截一张surl本身的图 + logger.debug(f"开始获取 {surl} 的截图") + img_path, wap_tab = self.get_wap_screenshot(target) + wap_tab.close() + + # Part3 获取举报链接 + logger.debug(f"开始获取 {surl} 的举报链接") + report_link = self.get_report_link(tab) + logger.debug(f"获取到举报链接为: {report_link}") + if not report_link: + return None + + # Part4 获取举报链接的信息 + logger.debug(f"开始获取举报链接的参数信息") + params = self.resolve_report_link(report_link) + if not params: + logger.error(f"解析举报链接失败,surl: {surl}") + return None + + token = params["token"][0] + title = params["title"][0] + q = params["q"][0] + surl = params["surl"][0] + + # 更新数据库 + with Session(self.database) as session: + stmt = 
diff --git a/app/engines/evidence_engine.py b/app/engines/evidence_engine.py
new file mode 100644
index 0000000..b81fd19
--- /dev/null
+++ b/app/engines/evidence_engine.py
@@ -0,0 +1,220 @@
+import os.path
+import threading
+import urllib
+from urllib.parse import urlparse
+
+import requests
+from DrissionPage._pages.mix_tab import MixTab
+from loguru import logger
+from sqlmodel import Session, select
+
+from app.config.config import AppCtx
+from app.models.report_urls import ReportUrlModel
+from app.utils.common import md5
+from app.utils.dp import DPEngine
+
+
+class EvidenceEngine:
+    """Preserve evidence for porn sites: search the URL, take a screenshot, then generate a report link and store it in the database"""
+
+    def __init__(self):
+        # Open a browser window
+        self.dp_engine = DPEngine()
+        self.wap_dp_engine = DPEngine(is_wap=True)
+
+        # Run-state control data
+        self.ev = threading.Event()
+        self.status = 1
+
+        # Worker thread
+        self.worker_thread = None
+
+        # Database connection
+        self.database = AppCtx.g_db_engine
+
+    def start(self):
+        """Start the thread"""
+        self.worker_thread = threading.Thread(target=self.worker, name="evidence_engine", daemon=True)
+        self.worker_thread.start()
+
+    def cli_start(self):
+        """Start in CLI mode: run once instead of looping"""
+        # Fetch every URL still awaiting evidence from the database
+        targets = self.get_surl_from_db()
+        logger.debug(f"Fetched {len(targets)} pending records")
+
+        # Process them one by one
+        for target in targets:
+            logger.debug(f"Collecting report data for {target['surl']}")
+            self.get_screenshot_and_report_link(target)
+
+    def worker(self):
+        """Worker loop"""
+        while self.status:
+            # Fetch every URL still awaiting evidence from the database
+            targets = self.get_surl_from_db()
+
+            # Process them one by one
+            for target in targets:
+                logger.debug(f"Collecting report data for {target['surl']}")
+                self.get_screenshot_and_report_link(target)
+
+            # Run once per minute
+            self.ev.wait(60)
+
+    def stop(self):
+        """Stop the thread"""
+        self.status = 0
+        self.ev.set()
+        self.dp_engine.close()
+        self.wap_dp_engine.close()
+
+    def get_surl_from_db(self):
+        """Fetch pending records from the database"""
+        result: list = []
+        with Session(self.database) as session:
+            stmt = select(ReportUrlModel).where(ReportUrlModel.has_evidence == False)
+            surl = session.exec(stmt).all()
+            for url in surl:
+                result.append({"id": url.id, "surl": url.surl, "domain": url.domain})
+
+        return result
+
+    def get_screenshot_and_report_link(self, target: dict):
+        """Take the evidence screenshots and fetch the report link"""
+        try:
+            surl = target["surl"]
+
+            # Part 1: screenshot of the Baidu search results
+            logger.debug(f"Taking a screenshot of the Baidu search results for {surl}")
+            img_path, tab = self.get_screenshot(target)
+            if not img_path:
+                return None
+
+            # Part 2: screenshot of the SURL itself
+            logger.debug(f"Taking a screenshot of {surl}")
+            img_path, wap_tab = self.get_wap_screenshot(target)
+            wap_tab.close()
+
+            # Part 3: fetch the report link
+            logger.debug(f"Fetching the report link for {surl}")
+            report_link = self.get_report_link(tab)
+            logger.debug(f"Report link: {report_link}")
+            if not report_link:
+                return None
+
+            # Part 4: extract the report link's parameters
+            logger.debug("Resolving the report link's parameters")
+            params = self.resolve_report_link(report_link)
+            if not params:
+                logger.error(f"Failed to resolve the report link, surl: {surl}")
+                return None
+
+            token = params["token"][0]
+            title = params["title"][0]
+            q = params["q"][0]
+            surl = params["surl"][0]
+
+            # Update the database
+            with Session(self.database) as session:
+                stmt = select(ReportUrlModel).where(ReportUrlModel.id == target["id"])
+                model: ReportUrlModel = session.exec(stmt).first()
+                if not model:
+                    logger.error(f"Record {target['id']} does not exist, skipping...")
+                    return None
+                # Update the fields
+                model.token = token
+                model.title = title
+                model.q = q
+                model.has_evidence = True
+                session.add(model)
+                session.commit()
+            logger.debug(f"{surl} processed")
+        except Exception as e:
+            logger.error(f"Failed to take the screenshots and fetch the report link: {e}")
+
+    def get_screenshot(self, target: dict) -> tuple[str | None, MixTab]:
+        """Take a screenshot of the search results page and return the img_path"""
+        search_keyword = target["surl"].removeprefix("https://").removeprefix("http://")
+        tab = self.dp_engine.browser.new_tab()
+        tab.get("https://www.baidu.com")
+        tab.ele("#kw").input(f"{search_keyword}\n", clear=True)
+        tab.wait.eles_loaded([".content_none", "#content_left"], any_one=True)
+
+        if "未找到相关结果" in tab.html:
+            logger.info(f"No results for {search_keyword}")
+            return None, tab
+
+        # Where the image is stored
+        # Do not close the tab after the screenshot; it is still needed later
+        img_path = f"./imgs/{target['domain']}/{md5(target['surl'])}.png"
+        return self.do_screenshot(tab, img_path)
+
+    def get_wap_screenshot(self, target: dict) -> tuple[str | None, MixTab]:
+        """Take another screenshot of the SURL itself with the WAP browser"""
+        tab = self.wap_dp_engine.browser.new_tab()
+        tab.get(target["surl"])
+        tab.wait(5)  # Hard wait; the SURL's structure is unknown, so there is no precise condition to wait on
+
+        img_path = f"./imgs/{target['domain']}/{md5(target['surl'])}-wap.png"
+        return self.do_screenshot(tab, img_path)
+
+    @staticmethod
+    def do_screenshot(tab: MixTab, img_path: str, force=False) -> tuple[str | None, MixTab]:
+        """Screenshot helper"""
+        if os.path.exists(img_path):
+            if force:
+                os.remove(img_path)
+            else:
+                logger.debug(f"Screenshot {img_path} already exists, skipping")
+                return img_path, tab
+
+        tab.get_screenshot(path=img_path)
+        logger.debug(f"Screenshot saved: {img_path}")
+
+        return img_path, tab
+
+    @staticmethod
+    def get_report_link(tab: MixTab):
+        """Fetch the report link; the page should still be on the search results at this point"""
+        tools = tab.eles(".:c-tools")
+        tab.wait(0.5)
+        if tools:
+            tool = tools[0]
+            tool.hover(0, 0)
+            tool.click(True)
+
+        tips = tab.eles(".c-tip-menu")
+        if tips:
+            tip = tips[0]
+            report = tip.ele("t:a@@text()=举报")
+            if report:
+                return report.attr("href")
+
+        return None
+
+    @staticmethod
+    def resolve_report_link(report_link):
+        try:
+            proxy_link = AppCtx.g_app_config.chrome.proxy
+            proxies = {
+                "http": proxy_link,
+                "https": proxy_link,
+            }
+            response = requests.get(report_link, proxies=proxies, timeout=5, allow_redirects=False)
+            location = response.headers.get("Location")
+            if not location:
+                logger.warning("No Location header returned for the report link")
+                return None
+
+            parsed_url = urllib.parse.urlparse(location)
+            query_params = urllib.parse.parse_qs(parsed_url.query)
+            decoded_params = {
+                key: [urllib.parse.unquote(value) for value in values] for key, values in query_params.items()
+            }
+            if len(decoded_params) == 0:
+                return None
+            return decoded_params
+        except Exception as e:
+            logger.error(f"Failed to resolve the report link: {e}")
+            return None
diff --git a/app/utils/common.py b/app/utils/common.py
new file mode 100644
index 0000000..0c6ea5d
--- /dev/null
+++ b/app/utils/common.py
@@ -0,0 +1,7 @@
+import hashlib
+
+
+def md5(s: str) -> str:
+    m = hashlib.md5()
+    m.update(s.encode('utf-8'))
+    return m.hexdigest()
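Reviewer note: the md5 helper above is what keys the screenshot files on disk. A quick illustration of the resulting paths; the path scheme is taken from evidence_engine.py, while the sample URL and domain are placeholders.

    from app.utils.common import md5

    surl = "https://example.com/page.html"  # placeholder
    domain = "example.com"                  # placeholder
    print(f"./imgs/{domain}/{md5(surl)}.png")      # screenshot of the Baidu results page
    print(f"./imgs/{domain}/{md5(surl)}-wap.png")  # WAP screenshot of the SURL itself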
diff --git a/app/utils/dp.py b/app/utils/dp.py
index 183e71c..cea94f0 100644
--- a/app/utils/dp.py
+++ b/app/utils/dp.py
@@ -10,7 +10,7 @@ class DPEngine:
     def __init__(self, is_wap: bool = False, no_img: bool = True):
         chrome_opts = ChromiumOptions()
         chrome_opts.mute(True)  # Mute
-        chrome_opts.headless(True)  # Headless mode
+        # chrome_opts.headless(True)  # Headless mode
         chrome_opts.no_imgs(no_img)  # Do not load images
         chrome_opts.set_argument("--disable-gpu")  # Disable the GPU
         chrome_opts.set_argument('--ignore-certificate-errors')  # Ignore certificate errors
diff --git a/main.py b/main.py
index bfc0201..ab59764 100644
--- a/main.py
+++ b/main.py
@@ -1,3 +1,5 @@
+from loguru import logger
+
 from app import MainApp
 import sys

@@ -9,7 +11,7 @@ def main():
    try:
        app = MainApp()
        app.run()
    except Exception as e:
-        print(f"The program failed to run, error {e}, details follow:")
+        logger.error(f"The program failed to run, error {e}, details follow:")
        traceback.print_exc()
        sys.exit(1)
diff --git a/tests/test_db.py b/tests/test_db.py
new file mode 100644
index 0000000..5a30a01
--- /dev/null
+++ b/tests/test_db.py
@@ -0,0 +1,22 @@
+from sqlalchemy import create_engine
+from sqlmodel import Session, select
+
+from app.models.report_urls import ReportUrlModel
+
+dsn = f"mysql+pymysql://root:123456@localhost:3306/baidu_reporter"
+engine = create_engine(dsn, echo=True)
+
+with Session(engine) as session:
+
+    stmt = select(ReportUrlModel).where(ReportUrlModel.surl == "4444")
+    result = session.exec(stmt).first()
+    print(result)
+
+    if not result:
+        example = ReportUrlModel(
+            domain_id=1,
+            domain="111",
+            surl="4444",
+        )
+        session.add(example)
+        session.commit()
diff --git a/tests/test_evidence.py b/tests/test_evidence.py
new file mode 100644
index 0000000..80f7fc3
--- /dev/null
+++ b/tests/test_evidence.py
@@ -0,0 +1,47 @@
+from DrissionPage import ChromiumOptions
+from DrissionPage import Chromium
+
+chrome_opts = ChromiumOptions()
+chrome_opts.mute(True)  # Mute
+chrome_opts.no_imgs(False)
+chrome_opts.set_argument("--disable-gpu")
+chrome_opts.set_argument('--ignore-certificate-errors')
+chrome_opts.set_argument("--proxy-server=http://127.0.0.1:7890")
+chrome_opts.incognito(True)
+chrome_opts.set_browser_path(r"C:\Program Files\Google\Chrome\Application\chrome.exe")
+# chrome_opts.auto_port(True)
+browser = Chromium(addr_or_opts=chrome_opts)
+
+search_keyword = "www.yunzhiju.net/zxysx/11456.html"
+
+tab = browser.new_tab("https://www.baidu.com/")
+tab.ele("#kw").input(f"{search_keyword}\n", clear=True)
+print("before wait")
+tab.wait.eles_loaded([".content_none", "#content_left"], any_one=True)
+print("after wait")
+
+tools = tab.eles('.:c-tools')
+print(tools)
+tab.wait(1)
+
+if tools:
+    tool = tools[0]
+    tool.hover(0, 0)
+    tool.click(True)
+
+tips = tab.eles(".c-tip-menu")
+print("tips:", tips)
+if tips:
+    tip = tips[0]
+    temp = tip.ele("t:a@@text()=举报")
+    print(temp)
+    href = temp.attr("href")
+    print(f"href={href}")
+
+# tools = tab.eles(".c-tools")
+# print(tools)
+# for tool in tools:
+#     tool.hover(1,1)
+#
+# for x in tab.eles("t:a@@text()=举报"):
+#     print(x)
"https://www.baidu.com/tools?url=https%3A%2F%2Fwww.yunzhiju.net%2Fzxysx%2F11456.html&jump=http%3A%2F%2Fjubao.baidu.com%2Fjubao%2Faccu%2F%3Ftitle%3D%2501%25E5%25A6%2582%25E4%25BD%2595%2501%25E9%2580%259A%25E8%25BF%2587%2501%25E7%259B%25B4%25E6%2592%25AD%2501%25E6%2590%25AC%25E5%25AE%25B6%2501app%2501%25E4%25B8%258B%25E8%25BD%25BD%2501%25E5%25AE%2598%25E6%2596%25B9%2501%25E6%25AD%25A3%25E7%2589%2588%2501%25E5%25AE%2589%25E8%25A3%2585%2501%25E5%25B9%25B6%2501%25E4%25BD%2593%25E9%25AA%258C%2501%25E4%25BE%25BF%25E6%258D%25B7%2501%25E7%259A%2584%2501%25E6%2590%25AC%25E5%25AE%25B6%2501%25E6%259C%258D%25E5%258A%25A1%2501%253F%2501-%2501%25E4%25BA%2591%2501%25E4%25B9%258B%2501...%26q%3Dwww.yunzhiju.net%252Fzxysx%252F11456.html%26has_gw%3D0%26has_v%3D0&key=surl" + proxy_link = "http://localhost:7890" + proxies = { + "http": proxy_link, + "https": proxy_link, + } + response = requests.get(report_link, proxies=proxies, timeout=5, allow_redirects=False) + location = response.headers["Location"] + if not location: + logger.warning("没有获取到举报链接的 Location") + return + + parsed_url = urllib.parse.urlparse(response.headers["Location"]) + query_params = urllib.parse.parse_qs(parsed_url.query) + decoded_params = { + key: [urllib.parse.unquote(value) for value in values] for key, values in query_params.items() + } + + print(decoded_params) + + +if __name__ == '__main__': + main()