From cdc51a6eac8a6e5e939fe2dfc9070ccff28d0533 Mon Sep 17 00:00:00 2001
From: xhy
Date: Tue, 1 Apr 2025 00:36:01 +0800
Subject: [PATCH] Add support for reading multiple domains
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 app/app.py                  |  14 ++-
 app/engines/crawl_engine.py | 189 ++++++++++++++++++++++++++----------
 app/utils/dp.py             |   4 +-
 3 files changed, 151 insertions(+), 56 deletions(-)

diff --git a/app/app.py b/app/app.py
index abecd26..4d36c83 100644
--- a/app/app.py
+++ b/app/app.py
@@ -35,7 +35,10 @@ class MainApp:
         )
 
         parser.add_argument(
-            "--crawl", help="Crawl mode: collect SURLs in bulk by domain",
+            "--crawl", help="Crawl mode: collect SURLs in bulk by domain; separate multiple domains with commas, or pass a file via --crawl-file",
+        )
+        parser.add_argument(
+            "--crawl-file", help="File of target domains to crawl, one per line"
         )
 
         parser.add_argument(
@@ -62,6 +65,7 @@ class MainApp:
             sys.exit(0)
 
         args = parser.parse_args()
+        logger.debug(f"{args=}")
 
         # Handle the mode arguments
         if args.report:
@@ -75,8 +79,8 @@ class MainApp:
             args.report = ["pc", "site", "wap"]
 
         # Check that the input file exists
-        # if not os.path.exists(args.file):
-        #     parser.error(f"Input file does not exist: {args.file}")
+        if args.crawl_file and not os.path.exists(args.crawl_file):
+            parser.error(f"The file given to --crawl-file does not exist: {args.crawl_file}")
 
         # Check that the config file exists
         if not os.path.exists(args.config):
@@ -86,10 +90,10 @@ class MainApp:
 
     def start_cli(self):
         """Start CLI mode"""
-        logger.debug(f"args.crawl: {self.args.crawl}")
         if self.args.crawl:
             crawl = CrawlEngine()
-            crawl.cli_start(self.args.crawl)
+            crawl.cli_start(self.args.crawl, self.args.crawl_file)
+            crawl.cli_wait()
             crawl.stop()
         elif self.args.evidence:
             evidence = EvidenceEngine()
diff --git a/app/engines/crawl_engine.py b/app/engines/crawl_engine.py
index d60322a..a6eb225 100644
--- a/app/engines/crawl_engine.py
+++ b/app/engines/crawl_engine.py
@@ -1,3 +1,4 @@
+import queue
 import threading
 import time
 
@@ -18,51 +19,133 @@ class CrawlEngine:
 
     def __init__(self):
         self.ev = threading.Event()
-        self.status = 1
+        # Run state of the worker threads: 1 = running, 0 = stopped
+        self.worker_status = 1
+        # Whether all tasks have been enqueued; the normal-exit flag for CLI mode: 1 = all tasks added, no new tasks coming, 0 = tasks still pending
+        self.finish_task = 0
+
+        # Thread pool
+        self.pool: list[threading.Thread] = []
+        self.worker_count = 4
+
+        # Work queue
+        self.target_queue = queue.Queue(1024)
 
         # Create a browser instance
         self.dp_engine = DPEngine()
 
-        # Worker thread
-        self.worker_thread = None
-
         self.database = AppCtx.g_db_engine
 
-    def start(self):
-        """Start the crawler"""
-        self.worker_thread = threading.Thread(target=self.worker, name="crawl_engine", daemon=True)
-        self.worker_thread.start()
+    def cli_start(self, target_domains: str, target_domain_filepath: str):
+        """Start in CLI mode
+        target_domains: comma-separated string of domains
+        target_domain_filepath: file of target domains, one per line
+        """
+        # Store the input domains in the database first
+        domains = self.add_domain(target_domains, target_domain_filepath)
 
-    def cli_start(self, target_domain: str):
-        """Start in CLI mode"""
-        with Session(self.database) as session:
-            stmt = select(DomainModel).where(DomainModel.domain == target_domain)
-            result = session.exec(stmt).first()
-            if not result:
-                model: DomainModel = DomainModel(
-                    domain=target_domain,
-                    status=1,
-                    crawl_interval=60 * 7 * 24,
-                    latest_crawl_time=0,
-                )
-                session.add(model)
-                session.commit()
-                result = model
+        # Start the thread pool
+        for idx in range(self.worker_count):
+            # CLI mode has different logic from the web start-up path, so it needs its own worker
+            thread = threading.Thread(target=self.cli_worker, name=f"crawl_engine_{idx}", daemon=True)
+            self.pool.append(thread)
+            thread.start()
 
-            # Crawl directly
-            surl = self.crawl(target_domain)
-            self.save_surl(session, result, surl)
+        # Add the tasks to the queue
+        for domain in domains:
+            logger.debug(f"Adding {domain} to the task queue")
logger.debug(f"开始添加 {domain} 到任务队列") + while True: + try: + self.target_queue.put_nowait(domain) + break + except queue.Full: + self.ev.wait(5) + continue + # 添加完成了,把标记为改为完成 + self.finish_task = 1 + + def cli_wait(self): + [x.join() for x in self.pool] + + def cli_worker(self): + """CLI 模式下的 worker,只处理输入的域名,忽略数据库中的刷新条件""" + while True: + try: + # 当控制位被操作的时候,结束 + if not self.worker_status: + logger.debug(f"{threading.current_thread().name}控制位退出") + break + + # 当队列空了,并且任务已经全部推送完成的时候,标记为结束 + if self.target_queue.empty() and self.finish_task: + logger.debug(f"{threading.current_thread().name}队列空了退出") + break + + # 获取数据并开始采集 + domain = self.target_queue.get_nowait() + surl = self.crawl(domain) + if not surl: + logger.debug(f"{threading.current_thread().name} 爬取 {domain} 异常,开始处理下一个") + continue + + # 存入数据库 + with Session(self.database) as session: + self.save_surl(session, domain, surl) + except queue.Empty: + # 队列空了,等1秒再取一次 + self.ev.wait(1) + continue + except Exception as e: + logger.exception(f"{threading.current_thread().name} 执行错误:{e}") + continue + + logger.info(f"{threading.current_thread().name} 退出") + + def add_domain(self, input_domains: str, input_domain_filepath: str) -> list[str]: + """把输入的域名存到库里""" + # 生成所有待采集的域名列表 + domains = [d.strip() for d in input_domains.split(",") if d.strip()] + if input_domain_filepath: + with open(input_domain_filepath, "r") as fp: + for line in fp: + line = line.strip() + if line: + domains.append(line) + + logger.debug(f"总共输入 {len(domains)} 个域名") + + # 检查在数据库中是否有重复的 + for domain in domains: + with Session(self.database) as session: + stmt = select(DomainModel).where(DomainModel.domain == domain) + result = session.exec(stmt).first() + if not result: + example = DomainModel( + domain=domain, status=1, crawl_interval=60 * 7 * 24, latest_crawl_time=0, + ) + session.add(example) + session.commit() + logger.info(f"添加域名 {domain} 到数据库") + + return domains def stop(self): """停止采集器""" self.ev.set() - self.status = 0 + self.worker_status = 0 self.dp_engine.browser.quit() + def start(self): + """启动采集器,以 web 方式启动的时候,走这边""" + for idx in range(self.worker_count): + thread = threading.Thread(target=self.worker, name=f"crawl_engine_{idx}", daemon=True) + self.pool.append(thread) + thread.start() + def worker(self): """真正的工作函数""" logger.info("crawl worker start!") - while self.status == 1: + while self.worker_status == 1: # 检查数据库,从中获取需要爬取的域名 current_timestamp = int(time.time()) with Session(AppCtx.g_db_engine) as session: @@ -90,7 +173,7 @@ class CrawlEngine: def crawl(self, domain: str) -> set[str] | None: """爬取URL的函数""" - logger.debug(f"开始爬取:{domain}") + logger.debug(f"{threading.current_thread().name} 开始爬取:{domain}") tab = self.dp_engine.browser.new_tab() surl_set: set[str] = set() @@ -109,13 +192,13 @@ class CrawlEngine: tab.ele("#kw").input(f"site:{domain}\n", clear=True) tab.wait.eles_loaded("#container") tab.wait.eles_loaded("#timeRlt") - logger.debug("首页加载完成!") + logger.debug(f"{threading.current_thread().name} #timeRlt 加载完成!") # 设置搜索时间范围 - self.ev.wait(1) + self.ev.wait(2) tab.ele("#timeRlt").click(True) tab.wait.eles_loaded("@class:time_pop_") - self.ev.wait(1) + self.ev.wait(2) # logger.debug("时间菜单!") tab.ele("t:li@@text()= 一月内 ").click(True) tab.wait.eles_loaded(["#container", ".content_none", "#content_left"], any_one=True) @@ -124,7 +207,7 @@ class CrawlEngine: # 增加页码 current_page += 1 - logger.debug(f"爬取 {domain} 的第 {current_page} 页数据") + logger.debug(f"{threading.current_thread().name} 爬取 {domain} 的第 {current_page} 页数据") # 直接访问 URL 会触发验证码 # tab.get( # 
f"https://www.baidu.com/s?wd=site%3A{domain}&gpc=stf%3D{start_time}%2C{end_time}%7Cstftype%3D1&pn={(current_page - 1) * 10}") @@ -132,13 +215,13 @@ class CrawlEngine: # 终止条件 if current_page > max_page and max_page: - logger.debug("达到指定页码,退出") + logger.debug(f"{threading.current_thread().name} 达到指定页码,退出") break # logger.debug(f"tab.html: {tab.html}") self.ev.wait(0.3) if "未找到相关结果" in tab.html: - logger.debug("未找到结果,退出") + logger.debug(f"{threading.current_thread().name} 未找到结果,退出") break # 获取数据 @@ -149,7 +232,7 @@ class CrawlEngine: surl = result.attr("mu") if not surl: continue - logger.debug(f"找到 URL : {surl}") + logger.debug(f"{threading.current_thread().name} 找到 URL : {surl}") surl_set.add(surl) # 翻页的时候等一下,别太快了 @@ -157,33 +240,41 @@ class CrawlEngine: # 如果没有下一页了,这个地方会找不到元素,有 10 秒的 timeout next_btn = tab.ele("t:a@@text():下一页") if not next_btn: - logger.debug("没有下一页了") + logger.debug(f"{threading.current_thread().name} 没有下一页了") break next_btn.click(True) return surl_set except Exception as e: - logger.error(f"爬取{domain}发生错误:{e}") + logger.error(f"{threading.current_thread().name} 爬取{domain}发生错误:{e}") import traceback traceback.print_exc() finally: tab.close() @staticmethod - def save_surl(session, domain_model, surl_set): + def save_surl(session: Session, domain: str, surl_set: set[str]): """保存采集到的URL""" - for surl in surl_set: # 先检查是否存在 stmt = select(ReportUrlModel).where(ReportUrlModel.surl == surl) exist = session.exec(stmt).first() - # 不存在再插入 - if not exist: - example = ReportUrlModel( - domain_id=domain_model.id, - domain=domain_model.domain, - surl=surl, - ) - session.add(example) - session.commit() + if exist: + continue + + # 检查域名是否存在 + domain_model = session.exec( + select(DomainModel).where(DomainModel.domain == domain) + ).first() + if not domain_model: + logger.warning(f"域名 {domain} 不在数据库中") + continue + + example = ReportUrlModel( + domain_id=domain_model.id, + domain=domain_model.domain, + surl=surl, + ) + session.add(example) + session.commit() diff --git a/app/utils/dp.py b/app/utils/dp.py index 183e71c..92b0d7c 100644 --- a/app/utils/dp.py +++ b/app/utils/dp.py @@ -10,13 +10,13 @@ class DPEngine: def __init__(self, is_wap: bool = False, no_img: bool = True): chrome_opts = ChromiumOptions() chrome_opts.mute(True) # 静音 - chrome_opts.headless(True) # 无头模式 + chrome_opts.headless(False) # 无头模式 chrome_opts.no_imgs(no_img) # 不加载图片 chrome_opts.set_argument("--disable-gpu") # 禁用GPU chrome_opts.set_argument('--ignore-certificate-errors') # 忽略证书错误 chrome_opts.set_argument('--start-maximized') # 最大化启动 chrome_opts.set_argument(f'--user-agent={random_ua(is_wap)}') # 设置 UA - chrome_opts.incognito(True) + # chrome_opts.incognito(True) chrome_opts.auto_port(True) # 设置代理 if AppCtx.g_app_config.chrome.proxy: