diff --git a/README.md b/README.md
new file mode 100644
index 0000000..9810d9a
--- /dev/null
+++ b/README.md
@@ -0,0 +1,8 @@
+# baidu-reporter
+
+- crawl_engine:
+  - Fetches URLs to crawl from the task queue, opens Baidu search to collect links, and stores the results in the surl table
+- evidence_engine:
+  - Polls the surl table, finds surl rows that have no evidence collected yet, and starts collecting evidence
+- reporter_engine:
+  - Polls the surl table, finds urls that have not yet been reported through the corresponding channel, and starts reporting them
\ No newline at end of file
diff --git a/app/app.py b/app/app.py
index f7b2316..884eaae 100644
--- a/app/app.py
+++ b/app/app.py
@@ -1,10 +1,12 @@
 import argparse
 import sys
 import os
+import time
 
 from app.engines.reporter import Reporter
 from .config import load_config, AppConfig
+from .engines.crawl_engine import CrawlEngine
 from .models.base import connect_db, create_database
 from loguru import logger
@@ -81,8 +83,14 @@ class MainApp:
 
     def start_cli(self):
         """Start CLI mode"""
-        reporter = Reporter(self.args.file, self.args.mode, self.db_engine)
-        reporter.run()
+        # reporter = Reporter(self.args.file, self.args.mode, self.db_engine)
+        # reporter.run()
+        crawler = CrawlEngine()
+        crawler.start()
+
+        time.sleep(3600)
+        crawler.stop()
+
 
     def start_web(self):
         """Start web mode"""
@@ -112,7 +120,7 @@ class MainApp:
            logger.info(f"Connected to database {self.config.database.database}")
        else:
            logger.error(f"Failed to connect to the database, check the config file and whether the database service is running: {e}")
-            sys.exit(1)    
+            sys.exit(1)
 
        # If the --web flag is given, start the web server and ignore the other options
        if self.args.web:
diff --git a/app/config/__init__.py b/app/config/__init__.py
index 9281934..5795fe9 100644
--- a/app/config/__init__.py
+++ b/app/config/__init__.py
@@ -1 +1 @@
-from .config import load_config, AppConfig, DatabaseConfig, ChromeConfig, gAppConfig
+from .config import load_config, AppConfig, DatabaseConfig, ChromeConfig
diff --git a/app/config/config.py b/app/config/config.py
index 812b46d..cb15b62 100644
--- a/app/config/config.py
+++ b/app/config/config.py
@@ -2,6 +2,15 @@ from dataclasses import dataclass
 import toml
 
 
+class AppCtx:
+    # Global application state
+    # Loaded configuration
+    g_app_config = None
+
+    # Database engine
+    g_db_engine = None
+
+
 @dataclass
 class DatabaseConfig:
     """Database configuration"""
@@ -18,6 +27,7 @@ class ChromeConfig:
     """Browser configuration"""
 
     proxy: str
+    browser_path: str
 
 
 @dataclass
@@ -29,10 +39,6 @@ class AppConfig:
     chrome: ChromeConfig
 
 
-# Global variable holding the config; not best practice, but it's ok for now
-gAppConfig = AppConfig()
-
-
 def load_config(config_path: str) -> AppConfig:
     """Load the configuration"""
     with open(config_path, "r", encoding="utf-8") as f:
@@ -41,7 +47,7 @@
     database_config = DatabaseConfig(**config_dict["database"])
     chrome_config = ChromeConfig(**config_dict["chrome"])
 
-    gAppConfig = AppConfig(
+    AppCtx.g_app_config = AppConfig(
         debug=config_dict["debug"], database=database_config, chrome=chrome_config
     )
-    return gAppConfig
+    return AppCtx.g_app_config
diff --git a/app/engines/collector.py b/app/engines/collector.py
deleted file mode 100644
index 388d425..0000000
--- a/app/engines/collector.py
+++ /dev/null
@@ -1,8 +0,0 @@
-
-class Collector:
-    """Porn-site URL collector: pages through Baidu search automatically and saves the results"""
-
-    def __init__(self):
-        pass
-
-
diff --git a/app/engines/crawl_engine.py b/app/engines/crawl_engine.py
new file mode 100644
index 0000000..6fabac9
--- /dev/null
+++ b/app/engines/crawl_engine.py
@@ -0,0 +1,167 @@
+import threading
+import time
+
+from loguru import logger
+from sqlmodel import Session, select
+
+from app.config.config import AppCtx
+from app.models.domain import DomainModel
+from app.models.report_urls import ReportUrlModel
+from app.utils.dp import DPEngine
+
+
+class CrawlEngine:
+    """Porn-site URL collector: pages through Baidu search automatically and saves the results.
+    Corresponds to the getBaiDuIncludeUrls method in the original project.
+    """
+
+    def __init__(self):
+        self.ev = threading.Event()
+        self.status = 1
+
+        # Create a browser instance
+        self.dp_engine = DPEngine()
+
+        # Worker thread
+        self.worker_thread = None
+
+    def start(self):
+        """Start the crawler"""
+        self.worker_thread = threading.Thread(target=self.worker, name="crawl_engine", daemon=True)
+        self.worker_thread.start()
+
+    def stop(self):
+        """Stop the crawler"""
+        self.ev.set()
+        self.status = 0
+        self.dp_engine.browser.quit()
+
+    def worker(self):
+        """The actual worker loop"""
+        logger.info("crawl worker start!")
+        while self.status == 1:
+            # Check the database for domains that are due to be crawled
+            current_timestamp = int(time.time())
+            with Session(AppCtx.g_db_engine) as session:
+
+                stmt = select(DomainModel).where(
+                    DomainModel.latest_crawl_time + DomainModel.crawl_interval <= current_timestamp
+                )
+                domains = session.exec(stmt).all()
+
+                for domain_model in domains:
+                    # Crawl
+                    surl_set = self.crawl(domain_model.domain)
+
+                    # Save
+                    if surl_set:
+                        self.save_surl(session, domain_model, surl_set)
+
+                    domain_model.latest_crawl_time = int(time.time())
+                    session.add(domain_model)
+                    session.commit()
+
+            self.ev.wait(60)
+
+        logger.info("crawl worker stop!")
+
+    def crawl(self, domain: str) -> set[str] | None:
+        """Crawl Baidu search results for one domain"""
+        logger.debug(f"Start crawling: {domain}")
+        tab = self.dp_engine.browser.new_tab()
+        surl_set: set[str] = set()
+
+        try:
+            # Initial data
+            end_time = int(time.time())
+            start_time = end_time - 3600 * 24 * 30  # fetch data from the last month
+
+            # Process page by page
+            max_page = 10  # maximum number of pages, 0 means no limit
+            current_page = 0  # current page number
+
+            # Open the search page first
+            tab.get("https://www.baidu.com/")
+            tab.wait.eles_loaded("#kw")
+            tab.ele("#kw").input(f"site:{domain}\n", clear=True)
+            tab.wait.eles_loaded("#container")
+            tab.wait.eles_loaded("#timeRlt")
+            logger.debug("Home page loaded!")
+
+            # Set the search time range
+            tab.ele("#timeRlt").click(True)
+            tab.wait.eles_loaded("@class:time_pop_")
+            self.ev.wait(1)
+            # logger.debug("Time menu opened!")
+            tab.ele("t:li@@text()= 一月内 ").click(True)
+            tab.wait.eles_loaded(["#container", ".content_none", "#content_left"], any_one=True)
+
+            while True:
+                # Advance the page counter
+                current_page += 1
+                logger.debug(f"Crawling page {current_page} of {domain}")
+                # Visiting the URL directly triggers a captcha
+                # tab.get(
+                #     f"https://www.baidu.com/s?wd=site%3A{domain}&gpc=stf%3D{start_time}%2C{end_time}%7Cstftype%3D1&pn={(current_page - 1) * 10}")
+                # tab.get(f"https://www.baidu.com/s?wd=site%3A{domain}&pn={(current_page - 1) * 10}")
+
+                # Termination condition
+                if current_page > max_page and max_page:
+                    logger.debug("Reached the page limit, exiting")
+                    break
+
+                # logger.debug(f"tab.html: {tab.html}")
+                self.ev.wait(0.3)
+                if "未找到相关结果" in tab.html:
+                    logger.debug("No results found, exiting")
+                    break
+
+                # Collect the results
+                tab.wait.eles_loaded("@id=content_left")
+                results = tab.ele("@id=content_left").eles("@class:result")
+                # temp = [result.attr("mu") for result in results if result.attr("mu") is not None]
+                for result in results:
+                    surl = result.attr("mu")
+                    if not surl:
+                        continue
+                    logger.debug(f"Found URL: {surl}")
+                    surl_set.add(surl)
+
+                # Wait a bit between pages, don't flip too fast
+                self.ev.wait(0.3)
+                # If there is no next page this lookup finds nothing; it has a 10-second timeout
+                next_btn = tab.ele("t:a@@text():下一页")
+                if not next_btn:
+                    logger.debug("No more pages")
+                    break
+                next_btn.click(True)
+
+            return surl_set
+        except Exception as e:
+            logger.error(f"Error while crawling {domain}: {e}")
+            import traceback
+            traceback.print_exc()
+        finally:
+            tab.close()
+
+    @staticmethod
+    def save_surl(session, domain_model, surl_set):
+        """Save the collected URLs"""
+
+        for surl in surl_set:
+            # Check whether it already exists
+            stmt = select(ReportUrlModel).where(ReportUrlModel.surl == surl)
+            exist = session.exec(stmt).first()
+
+            # Insert only if it does not exist yet
+            if not exist:
+                example = ReportUrlModel(
+                    domain_id=domain_model.id,
+                    domain=domain_model.domain,
+                    surl=surl,
+                )
+                session.add(example)
+
+        session.commit()
diff --git a/app/engines/reporter.py b/app/engines/reporter.py
index 79e0d7c..fafa43e 100644
--- a/app/engines/reporter.py
+++ b/app/engines/reporter.py
@@ -1,14 +1,13 @@
+from loguru import logger
 from sqlalchemy import Engine, select
 from sqlmodel import Session
 
 from app.utils.dp import DPEngine
 from .reporters.pc_reporter import PcReporter
-from .reporters.wap_reporter import WapReporter
 from .reporters.site_reporter import SiteReporter
+from .reporters.wap_reporter import WapReporter
 from ..models.report_urls import ReportUrlModel
 
-from loguru import logger
-
 
 class Reporter:
     """Reporter; there are three channels for now, more can be added later"""
@@ -27,14 +26,14 @@ class Reporter:
 
         # TODO: initialize shared data the reporters need, e.g. a headless Chrome instance
         # self.baseDP = DPEngine(is_wap=False, no_img=True)
-        
+
     def run(self):
         """Start reporting the sites"""
         self.get_reports_data()
 
     def get_reports_data(self):
         """Collect report data (page screenshot, report link, etc.) and store it in the database"""
-        
+
         urls = self.read_urls()
         logger.info(f"Read {len(urls)} URLs from file {self.urls_file}")
 
@@ -42,9 +41,11 @@
         # If a URL is not in the database, fetch its page screenshot and report link
         # If fetching succeeds, insert it into the database
         # If it fails, log it
-        dp = DPEngine(is_wap=False, no_img=True) # browser instance used for screenshots
+        dp = DPEngine(is_wap=False, no_img=True)  # browser instance used for screenshots
         with Session(self.db_engine) as session:
             for url in urls:
+
+                # Check whether it already exists; skip it if so
                 stmt = select(ReportUrlModel).where(ReportUrlModel.surl == url)
                 report_url = session.exec(stmt).one_or_none()
                 logger.debug(f"Query result for {url}: {report_url}")
@@ -52,12 +53,20 @@
                     continue
 
                 # Fetch the page screenshot and the report link
-                # 1. Open the Baidu search URL; take a screenshot if there are results, otherwise skip
+                # Open the Baidu search URL
+                tab = dp.browser.new_tab(url)
+                tab.wait(5)
+                if "未找到相关结果" in tab.html:
+                    logger.info(f"SURL {url} returned no search results")
+                    continue
+
+                # Take a screenshot
+                # img_path = f"./imgs/{report_domain}/{md5_hash(report_url)}.png"
+                # tab.get_screenshot()
+
                 # 2. Click the report button and get the report link
                 # 3. Open the URL and take a screenshot
                 # 4. Store it in the database
-                dp.browser.new_tab(url)
-
     def read_urls(self) -> list[str]:
         """Read the urls file"""
@@ -69,4 +78,3 @@
                 continue
             urls.append(url)
         return urls
-
diff --git a/app/models/base.py b/app/models/base.py
index 271f009..6d12482 100644
--- a/app/models/base.py
+++ b/app/models/base.py
@@ -2,11 +2,10 @@ from datetime import datetime
 
 from sqlalchemy import create_engine, event, text
 from sqlalchemy.dialects.mysql import BIGINT
-from sqlmodel import SQLModel, Field, Session
-from sqlalchemy.exc import OperationalError
-from loguru import logger
+from sqlmodel import SQLModel, Field
 
 from ..config import AppConfig
+from ..config.config import AppCtx
 
 
 def get_timestamp():
@@ -17,8 +16,8 @@ class BaseModel(SQLModel):
     """Base model class"""
 
     id: int = Field(default=None, primary_key=True, sa_type=BIGINT(unsigned=True))
-    created_at: int = Field(default=get_timestamp, sa_type=BIGINT(unsigned=True))
-    updated_at: int = Field(default=get_timestamp, sa_type=BIGINT(unsigned=True))
+    created_at: int = Field(default_factory=get_timestamp, sa_type=BIGINT(unsigned=True))
+    updated_at: int = Field(default_factory=get_timestamp, sa_type=BIGINT(unsigned=True))
 
 
 @event.listens_for(BaseModel, "before_update")
@@ -27,27 +26,31 @@ def update_updated_at(mapper, connection, target):
     target.updated_at = get_timestamp()
 
 
-
 def connect_db(config: AppConfig):
     """Connect to the database"""
-    
+
     # Import all models so the tables are created automatically
-    from .domain import DomainModel
-    from .report_urls import ReportUrlModel
-    
+
     dsn = f"mysql+pymysql://{config.database.user}:{config.database.password}@{config.database.host}:{config.database.port}/{config.database.database}"
     engine = create_engine(dsn, echo=config.debug)
     SQLModel.metadata.create_all(engine)
+    AppCtx.g_db_engine = engine
     return engine
 
+
 def create_database(config: AppConfig):
     """Call this to initialize the database if it does not exist yet"""
-    
+
+    # First create a connection that does not select a database
     dsn = f"mysql+pymysql://{config.database.user}:{config.database.password}@{config.database.host}:{config.database.port}"
     engine = create_engine(dsn, echo=config.debug)
-    
+
     with engine.connect() as conn:
-        conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {config.database.database} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"))
+        conn.execute(
+            text(
+                f"CREATE DATABASE IF NOT EXISTS {config.database.database} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
+            )
+        )
         conn.commit()
diff --git a/app/models/domain.py b/app/models/domain.py
index 647d845..1d56c96 100644
--- a/app/models/domain.py
+++ b/app/models/domain.py
@@ -5,7 +5,18 @@
 
 
 class DomainModel(BaseModel, table=True):
-    """Model storing domain names"""
+    """Stores root domains"""
+    __tablename__ = 'domain'
+
+    # Domain name
     domain: str = Field(alias="domain", default="", sa_type=VARCHAR(1024))
+
+    # Crawl status. TODO: unused for now; once task control exists this will track the domain's task state
     status: int = Field(alias="status", default=0)
+
+    # Crawl interval in seconds; defaults to one week
+    crawl_interval: int = Field(alias="crawl_interval", default=3600 * 24 * 7)
+
+    # Timestamp (in seconds) of the most recent crawl
+    latest_crawl_time: int = Field(alias="latest_crawl_time", default=0)
diff --git a/app/models/report_urls.py b/app/models/report_urls.py
index 262bf13..52c5caf 100644
--- a/app/models/report_urls.py
+++ b/app/models/report_urls.py
@@ -1,4 +1,5 @@
 from sqlalchemy import VARCHAR
+from sqlalchemy.dialects.mysql import BIGINT
 from sqlmodel import Field
 
 from .base import BaseModel
@@ -7,11 +8,30 @@
 
 class ReportUrlModel(BaseModel, table=True):
     """Model storing the URLs to report"""
+    __tablename__ = 'report_url'
+
+    # Domain ID
+    domain_id: int = Field(alias='domain_id', default=0, sa_type=BIGINT(unsigned=True))
+
+    # Domain name
     domain: str = Field(alias="domain", default="", sa_type=VARCHAR(1024))
Field(alias="domain", default="", sa_type=VARCHAR(1024)) + + # SURL surl: str = Field(alias="surl", default="", sa_type=VARCHAR(2048)) # 待举报的 URL + + # 举报 token token: str = Field(alias="token", default="", sa_type=VARCHAR(64)) + + # 标题 title: str = Field(alias="title", default="", sa_type=VARCHAR(1024)) + + # 搜索关键词 q: str = Field(alias="q", default="", sa_type=VARCHAR(1024)) # TODO: 这是干啥的? + + # 举报渠道状态 is_report_by_one: bool = Field(alias="is_report_by_one", default=False) is_report_by_site: bool = Field(alias="is_report_by_site", default=False) is_report_by_wap: bool = Field(alias="is_report_by_wap", default=False) + + # 证据状态 + has_evidence: bool = Field(alias="has_evidence", default=False) diff --git a/app/utils/dp.py b/app/utils/dp.py index cad7b21..183e71c 100644 --- a/app/utils/dp.py +++ b/app/utils/dp.py @@ -1,7 +1,8 @@ from DrissionPage import Chromium, ChromiumOptions -from app.config import gAppConfig -from app.utils import random_ua +from .ua import random_ua +from ..config.config import AppCtx + class DPEngine: """DrissionPage 引擎""" @@ -10,14 +11,19 @@ class DPEngine: chrome_opts = ChromiumOptions() chrome_opts.mute(True) # 静音 chrome_opts.headless(True) # 无头模式 - chrome_opts.no_image(no_img) # 不加载图片 + chrome_opts.no_imgs(no_img) # 不加载图片 chrome_opts.set_argument("--disable-gpu") # 禁用GPU chrome_opts.set_argument('--ignore-certificate-errors') # 忽略证书错误 - chrome_opts.set_argument('--start-maximized') # 最大化启动 - chrome_opts.set_argument(f'--user-agent={random_ua(is_wap)}') # 设置 UA + chrome_opts.set_argument('--start-maximized') # 最大化启动 + chrome_opts.set_argument(f'--user-agent={random_ua(is_wap)}') # 设置 UA + chrome_opts.incognito(True) + chrome_opts.auto_port(True) # 设置代理 - if gAppConfig.chrome.proxy: - chrome_opts.set_argument('--proxy-server', gAppConfig.chrome.proxy) - + if AppCtx.g_app_config.chrome.proxy: + chrome_opts.set_argument('--proxy-server', AppCtx.g_app_config.chrome.proxy) + # 创建浏览器对象 self.browser = Chromium(addr_or_opts=chrome_opts) + + def close(self): + self.browser.quit() diff --git a/tests/test_dp.py b/tests/test_dp.py index 1ed0853..ef6734e 100644 --- a/tests/test_dp.py +++ b/tests/test_dp.py @@ -13,7 +13,9 @@ chrome_opts.set_argument('--ignore-certificate-errors') chrome_opts.set_argument("--proxy-server=http://127.0.0.1:7890") chrome_opts.incognito(True) chrome_opts.set_browser_path(r"C:\Program Files\Google\Chrome\Application\chrome.exe") +chrome_opts.auto_port(True) browser = Chromium(addr_or_opts=chrome_opts) +browser2 = Chromium(addr_or_opts=chrome_opts) domain = "www.yunzhiju.net" keyword = "www.yunzhiju.net/zxysx/9850.html" @@ -33,12 +35,18 @@ tab = browser.new_tab(f"https://www.baidu.com/s?wd={keyword}") # week_btn_el.click(by_js=True) # tab.wait(2) -if "未找到相关结果" in tab.html: - print("未找到相关结果") -else: - print("找到相关结果") - img_path = f"./imgs/{domain}/{md5_hash(keyword)}.png" - print(img_path) - tab.get_screenshot(img_path) +print("2222") +# tab.ele(".content_none") +# tab.wait.eles_loaded(["#container", ".content_none", "#content_left"], any_one=True) +print("未找到相关结果" in tab.html) +print("1111") + +# if "未找到相关结果" in tab.html: +# print("未找到相关结果") +# else: +# print("找到相关结果") + # img_path = f"./imgs/{domain}/{md5_hash(keyword)}.png" + # print(img_path) + # tab.get_screenshot(img_path) diff --git a/tests/test_dp2.py b/tests/test_dp2.py new file mode 100644 index 0000000..0e3ba75 --- /dev/null +++ b/tests/test_dp2.py @@ -0,0 +1,50 @@ +import sys +import time + +from DrissionPage import Chromium, ChromiumOptions +from loguru import logger + +chrome_opts = 
+chrome_opts.mute(True)  # mute audio
+chrome_opts.no_imgs(False)
+chrome_opts.set_argument("--disable-gpu")
+chrome_opts.set_argument('--ignore-certificate-errors')
+chrome_opts.set_argument("--proxy-server=http://127.0.0.1:7890")
+# chrome_opts.incognito(True)
+chrome_opts.set_browser_path(r"C:\Program Files\Google\Chrome\Application\chrome.exe")
+browser = Chromium(addr_or_opts=chrome_opts)
+
+tab = browser.new_tab()
+
+domain = "lightless.me"
+end_time = int(time.time())
+start_time = end_time - 3600 * 24 * 30
+# tab.get(f"https://www.baidu.com/s?wd=site%3A{domain}&&gpc=stf%3D{start_time}%2C{end_time}%7Cstftype%3D1")
+tab.get(f"https://www.baidu.com/s?wd=site%3A{domain}")
+
+tab.wait.eles_loaded("@id=container")
+
+# print(tab.html)
+logger.debug("1")
+if "抱歉,未找到相关结果。" in tab.html:
+    print("no result")
+    browser.quit()
+    sys.exit()
+
+
+tab.wait.eles_loaded("@id=content_left")
+logger.debug("2")
+el = tab.ele("@id=content_left")
+
+results = el.eles("@class:result")
+
+# tab.wait.eles_loaded("@class:result c-container xpath-log new-pmd")
+# logger.debug("3")
+# results = tab.eles("@class:result c-container xpath-log new-pmd")
+# logger.debug("4")
+for result in results:
+    logger.debug(result)
+    logger.debug("Found a URL: {}", result.attr("mu"))
+
+
+browser.quit()
\ No newline at end of file
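
A minimal usage sketch of how the pieces above fit together (the config path "config.toml" and the domain "example.com" are placeholder assumptions, not part of this change): load the config, connect the database, seed a domain row, and let the crawl engine poll it.

import time

from sqlmodel import Session

from app.config.config import load_config
from app.engines.crawl_engine import CrawlEngine
from app.models.base import connect_db
from app.models.domain import DomainModel

config = load_config("config.toml")   # fills AppCtx.g_app_config
engine = connect_db(config)           # fills AppCtx.g_db_engine and creates the tables

# Seed a domain; latest_crawl_time defaults to 0, so it is due for crawling immediately.
with Session(engine) as session:
    session.add(DomainModel(domain="example.com"))
    session.commit()

crawler = CrawlEngine()
crawler.start()    # the worker polls the domain table every 60 seconds
time.sleep(3600)   # same stop-gap used in start_cli()
crawler.stop()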