import queue
import re
import threading
import time

import execjs
import requests
from DrissionPage.errors import ElementNotFoundError
from loguru import logger
from sqlmodel import Session, select, or_, and_

from app.config.config import AppCtx
from app.constants.domain import DomainStatus
from app.models.domain import DomainModel
from app.models.report_urls import ReportUrlModel
from app.utils.common import get_proxies
from app.utils.dp import DPEngine
from app.utils.ydm_verify import YdmVerify


class CrawlEngine:
    """Collector of URLs from adult sites: automatically pages through Baidu "site:" searches and saves the results.

    Corresponds to the getBaiDuIncludeUrls method in the original project.
    """

    def __init__(self):
        # Event used both as a stop signal and as an interruptible sleep.
        self.ev = threading.Event()
        # Thread run flag: 1 = running, 0 = stopped.
        self.worker_status = 1
        # Marks whether all tasks have been queued; this is the normal-exit flag for CLI mode.
        # 1 = all tasks queued, nothing new is coming; 0 = tasks still pending.
        self.finish_task = 0
        # Thread pool.
        self.pool: list[threading.Thread] = []
        self.worker_count = 1
        # Work queue.
        self.target_queue = queue.Queue(1024)
        # Create a browser instance.
        self.dp_engine = DPEngine()

    def cli_start(self, target_domains: str, target_domain_filepath: str):
        """Start in CLI mode.

        target_domains: comma-separated string of domains
        target_domain_filepath: file containing target domains, one per line
        """
        # Persist the input domains to the database first.
        domains = self.add_domain(target_domains, target_domain_filepath)

        # Start the thread pool.
        for idx in range(self.worker_count):
            # CLI mode has different logic from web mode, so it gets its own worker.
            thread = threading.Thread(target=self.cli_worker, name=f"crawl_engine_{idx}", daemon=True)
            self.pool.append(thread)
            thread.start()

        # Push the tasks onto the queue.
        for domain in domains:
            logger.debug(f"Queueing {domain} for crawling")
            while True:
                try:
                    self.target_queue.put_nowait(domain)
                    break
                except queue.Full:
                    self.ev.wait(5)
                    continue

        # All tasks queued; flip the completion flag.
        self.finish_task = 1

    def cli_wait(self):
        for thread in self.pool:
            thread.join()

    def cli_worker(self):
        """CLI-mode worker: only handles the input domains and ignores the refresh conditions stored in the database."""
        while True:
            try:
                # Exit when the run flag has been cleared.
                if not self.worker_status:
                    logger.debug(f"{threading.current_thread().name} exiting: run flag cleared")
                    break
                # Exit when the queue is empty and all tasks have been pushed.
                if self.target_queue.empty() and self.finish_task:
                    logger.debug(f"{threading.current_thread().name} exiting: queue drained")
                    break
                # Fetch a domain and start crawling.
                domain = self.target_queue.get_nowait()
                surl = self.crawl(domain)
                if not surl:
                    logger.debug(f"{threading.current_thread().name} no results for {domain}, moving on to the next one")
                    continue
                # Store the results in the database.
                with Session(AppCtx.g_db_engine) as session:
                    self.save_surl(session, domain, surl)
            except queue.Empty:
                # Queue is empty; wait a second and try again.
                self.ev.wait(1)
                continue
            except Exception as e:
                logger.exception(f"{threading.current_thread().name} error while processing: {e}")
                continue
        logger.info(f"{threading.current_thread().name} exited")

    def add_domain(self, input_domains: str, input_domain_filepath: str) -> list[str]:
        """Persist the input domains to the database."""
        # Build the full list of domains to crawl.
        domains = []
        if input_domains:
            domains.extend([d.strip() for d in input_domains.split(",") if d.strip()])
        if input_domain_filepath:
            with open(input_domain_filepath, "r") as fp:
                for line in fp:
                    line = line.strip()
                    if line:
                        domains.append(line)
        logger.debug(f"Received {len(domains)} domains in total")

        # Only insert domains that are not already in the database.
        for domain in domains:
            with Session(AppCtx.g_db_engine) as session:
                stmt = select(DomainModel).where(DomainModel.domain == domain)
                result = session.exec(stmt).first()
                if not result:
                    example = DomainModel(
                        domain=domain,
                        status=1,
                        crawl_interval=60 * 7 * 24,
                        latest_crawl_time=0,
                    )
                    session.add(example)
                    session.commit()
                    logger.info(f"Added domain {domain} to the database")
        return domains

    def stop(self):
        """Stop the crawler (shared by both modes)."""
        self.ev.set()
        self.worker_status = 0
        self.dp_engine.browser.quit()

    def start(self):
        """Start the crawler; used when launching in web mode."""
        for idx in range(self.worker_count):
            thread = threading.Thread(target=self.worker, name=f"crawl_engine_{idx}", daemon=True)
            self.pool.append(thread)
            thread.start()

    def worker(self):
"""真正的工作函数,后续以Web模式启动的时候,走这个""" logger.info("crawl worker start!") while self.worker_status: # 检查数据库,从中获取需要爬取的域名 current_timestamp = int(time.time()) with Session(AppCtx.g_db_engine) as session: stmt = select(DomainModel).where( or_( DomainModel.status == 2, # 条件1: status = 2 and_( DomainModel.latest_crawl_time + DomainModel.crawl_interval * 60 <= current_timestamp, # 条件2 DomainModel.status == 1 # 条件2 ) ) ) domains = session.exec(stmt).all() for domain_model in domains: # 采集前修改状态 domain_model.status = DomainStatus.CRAWLING.value session.add(domain_model) session.commit() # 采集 surl_set = self.crawl(domain_model.domain) # 存储 if surl_set: self.save_surl(session, domain_model.domain, surl_set) domain_model.latest_crawl_time = int(time.time()) domain_model.status = DomainStatus.READY.value session.add(domain_model) session.commit() self.ev.wait(10) logger.info("crawl worker stop!") def crawl(self, domain: str) -> set[str] | None: """爬取URL的函数""" logger.debug(f"{threading.current_thread().name} 开始爬取:{domain}") tab = self.dp_engine.browser.new_tab() surl_set: set[str] = set() try: # 初始数据 # end_time = int(time.time()) # start_time = end_time - 3600 * 24 * 30 # 获取最近一个月的数据 # 依次每一页处理 max_page = 10 # 最大页码数量,0表示不限制最大数量 current_page = 0 # 当前页码 # 先打开搜索页面 tab.get("https://www.baidu.com/") tab.wait.eles_loaded("#kw") tab.ele("#kw").input(f"site:{domain}\n", clear=True) tab.wait.eles_loaded("#container") tab.wait.eles_loaded("#timeRlt") logger.debug(f"{threading.current_thread().name} #timeRlt 加载完成!") # 设置搜索时间范围 self.ev.wait(2) tab.ele("#timeRlt").click(True) tab.wait.eles_loaded("@class:time_pop_") self.ev.wait(2) # logger.debug("时间菜单!") tab.ele("t:li@@text()= 一月内 ").click(True) tab.wait.eles_loaded(["#container", ".content_none", "#content_left"], any_one=True) while True: try: # 增加页码 current_page += 1 logger.debug(f"{threading.current_thread().name} 爬取 {domain} 的第 {current_page} 页数据") # 直接访问 URL 会触发验证码 # tab.get( # f"https://www.baidu.com/s?wd=site%3A{domain}&gpc=stf%3D{start_time}%2C{end_time}%7Cstftype%3D1&pn={(current_page - 1) * 10}") # tab.get(f"https://www.baidu.com/s?wd=site%3A{domain}&pn={(current_page - 1) * 10}") # 检查一下当前的URL是不是跳到验证码的页面 if "//wappass.baidu.com/static/captcha/tuxing_v2.html" in tab.url: logger.warning("触发验证码了,尝试识别") idx = 0 while idx < 3: idx += 1 logger.debug(f"开始第{idx}次识别...") captcha_result = self.verify_captcha(tab.url) if not captcha_result: tab.refresh() continue else: tab.get(captcha_result) break else: logger.error("验证码打码失败,放弃本次采集,等待3分钟后继续") self.ev.wait(180) break # 终止条件 if current_page > max_page and max_page: logger.debug(f"{threading.current_thread().name} 达到指定页码,退出") break # logger.debug(f"tab.html: {tab.html}") self.ev.wait(0.3) if "未找到相关结果" in tab.html: logger.debug(f"{threading.current_thread().name} 未找到结果,退出") break # 获取数据 tab.wait.eles_loaded("@id=content_left") results = tab.ele("@id=content_left").eles("@class:result") # temp = [result.attr("mu") for result in results if result.attr("mu") is not None] # logger.debug(f"{len(results)=}") for result in results: # logger.debug(f"{result=}") surl = result.attr("mu") if not surl: continue # 添加结果的时候,也检查一下抓到的 surl 是否和目标域名有关 if domain not in surl: logger.debug(f"{threading.current_thread().name} URL {surl} 与目标域名 {domain} 无关,跳过") else: surl_set.add(surl) logger.debug(f"{threading.current_thread().name} 找到 {surl}") # 翻页的时候等一下,别太快了 self.ev.wait(0.3) # 如果没有下一页了,这个地方会找不到元素,有 10 秒的 timeout next_btn = tab.ele("t:a@@text():下一页") if not next_btn: logger.debug(f"{threading.current_thread().name} 没有下一页了") break 
                    next_btn.click(True)
                except ElementNotFoundError as e:
                    logger.error(f"HTML element not found, skipping; details: {e}")
                    break
            return surl_set
        except Exception as e:
            logger.error(f"{threading.current_thread().name} error while crawling {domain}: {e}")
            import traceback
            traceback.print_exc()
        finally:
            tab.close()

    @staticmethod
    def save_surl(session: Session, domain: str, surl_set: set[str]):
        """Save the collected URLs."""
        for surl in surl_set:
            # Simple sanity check: the surl should contain the target domain.
            if domain not in surl:
                logger.debug(f"Skipping {surl}: it does not match target domain {domain}")
                continue
            # Skip URLs that are already stored.
            stmt = select(ReportUrlModel).where(ReportUrlModel.surl == surl)
            exist = session.exec(stmt).first()
            if exist:
                continue
            # Make sure the domain exists in the database.
            domain_model = session.exec(
                select(DomainModel).where(DomainModel.domain == domain)
            ).first()
            if not domain_model:
                logger.warning(f"Domain {domain} is not in the database")
                continue
            example = ReportUrlModel(
                domain_id=domain_model.id,
                domain=domain_model.domain,
                surl=surl,
            )
            session.add(example)
            session.commit()

    # def captcha_listener(self):
    #     for pkg in self.tab.listen.steps():
    #         if "/cap/init" in pkg.url:
    #             self.captcha_data["init"] = pkg.response.body
    #         if "/cap/style" in pkg.url:
    #             self.captcha_data["style"] = pkg.response.body
    #             self.captcha_data["referer"] = pkg.request.headers.get("Referer")
    #             logger.debug(f"captcha referer: {self.captcha_data['referer']}")
    #             # self.captcha_data["cookie"] = pkg.request.headers.get("Cookie")
    #             logger.debug(f"captcha cookie: {self.captcha_data['cookie']}")
    #         if "/cap/log" in pkg.url:
    #             self.captcha_data["log"] = pkg.response.body

    def verify_captcha(self, current_url: str):
        """Try to solve the captcha; the logic differs slightly from pc_reporter, so it is written separately here."""
        headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'Referer': current_url,
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0",
            'X-Requested-With': 'XMLHttpRequest',
            'sec-ch-ua_wap': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
            'sec-ch-ua_wap-mobile': '?0',
            'sec-ch-ua_wap-platform': '"Windows"',
            # "Cookie": self.captcha_data["cookie"],
        }

        # Obtain the AS / TK values from /cap/init.
        ts = time.time()
        ts1 = int(ts)
        ts2 = int(ts * 1000)
        response = requests.post(
            "https://passport.baidu.com/cap/init",
            data={
                "_": ts2,
                "refer": re.sub(r'timestamp=\d+', f'timestamp={ts1}', current_url),
                "ak": "c27bbc89afca0463650ac9bde68ebe06",
                "ver": "2",
                "scene": "",
                "ds": "",
                "tk": "",
                "as": "",
                "reinit": 0
            },
            headers=headers,
            proxies=get_proxies()
        ).json()
        as_value = response["data"]["as"]
        tk_value = response["data"]["tk"]

        # Obtain the style data (backstr and captcha image) from /cap/style.
        response = requests.post(
            "https://passport.baidu.com/cap/style",
            data={
                "_": int(time.time() * 1000),
                "refer": re.sub(r'timestamp=\d+', f'timestamp={ts1}', current_url),
                "ak": "c27bbc89afca0463650ac9bde68ebe06",
                "tk": tk_value,
                "scene": "",
                "isios": "0",
                "type": "spin",
                "ver": "2"
            },
            headers=headers,
            proxies=get_proxies()
        )
        logger.debug(f"{response.content=}")
        response = response.json()
        backstr = response["data"]["backstr"]
        captcha_link = response["data"]["captchalist"][0]["source"]["back"]["path"]

        # Download the captcha image.
        image_response = requests.get(captcha_link, headers=headers, proxies=get_proxies())
        with open("captcha.png", "wb") as f:
            f.write(image_response.content)
        logger.debug("download captcha.png")

        # Recognise the captcha.
        ydm = YdmVerify()
        with open("captcha.png", "rb") as fp:
            picture = fp.read()
        slide_distance = ydm.rotate(picture)
        logger.debug(f"{slide_distance=}")
        if not slide_distance:
            logger.error("Captcha recognition failed")
            return None
        rotate_angle_rate = round(slide_distance / 360, 2)
        logger.debug(f"{rotate_angle_rate=}")
        if not rotate_angle_rate:
            logger.debug("Captcha recognition failed")
            return None

        # Build and send the captcha submission (/cap/log).
        time_log = str(int(time.time() * 1000))
        with open("./js/mkd_v2_link_submit.js", 'r', encoding='utf-8') as f:
            ds_js = f.read()
        fs = execjs.compile(ds_js).call('getFs2', backstr, rotate_angle_rate, as_value)
        data = {
            "_": time_log,
            "refer": current_url,
            "ak": "c27bbc89afca0463650ac9bde68ebe06",
            "as": as_value,
            "scene": "",
            "tk": tk_value,
            "ver": "2",
            "cv": "submit",
            "typeid": "spin-0",
            "fuid": "FOCoIC3q5fKa8fgJnwzbE67EJ49BGJeplOzf+4l4EOvDuu2RXBRv6R3A1AZMa49I27C0gDDLrJyxcIIeAeEhD8JYsoLTpBiaCXhLqvzbzmvy3SeAW17tKgNq/Xx+RgOdb8TWCFe62MVrDTY6lMf2GrfqL8c87KLF2qFER3obJGnfaJTn/Ne60I9LwR04t6XmGEimjy3MrXEpSuItnI4KD0FJKzTbw1AN69fBnzR2FuvMmmQZ+1zgJ72wdcVU+mcQxiE2ir0+TEYgjPJt1Qa3K1mLi+P4IWJeag2lvxB4yJ/GgLbz7OSojK1zRbqBESR5Pdk2R9IA3lxxOVzA+Iw1TWLSgWjlFVG9Xmh1+20oPSbrzvDjYtVPmZ+9/6evcXmhcO1Y58MgLozKnaQIaLfWRPAn9I0uOqAMff6fuUeWcH1mRYoTw2Nhr4J4agZi377iM/izL6cVCGRy2F8c0VpEvM5FjnYxYstXg/9EfB3EVmKAfzNRIeToJ5YV9twMcgdmlV1Uhhp5FAe6gNJIUptp7EMAaXYKm11G+JVPszQFdp9AJLcm4YSsYUXkaPI2Tl66J246cmjWQDTahAOINR5rXR5r/7VVI1RMZ8gb40q7az7vCK56XLooKT5a+rsFrf5Zu0yyCiiagElhrTEOtNdBJJq8eHwEHuFBni9ahSwpC7lbKkUwaKH69tf0DFV7hJROiLETSFloIVkHdy3+I2JUr1LsplAz0hMkWt/tE4tXVUV7QcTDTZWS/2mCoS/GV3N9awQ6iM6hs/BWjlgnEa1+5iP7WSc7RJ34FaE5PsyGXyoCWdXwNRGSZPSvVtB/Ea6w5FKazXcZ/j40FJv+iLGBn3nkkgHlne61I8I7KhtQgIkmBMJIjPMkS/L051MeqdGScsKYTJuSucgI5c3+79eVH+y2TvbOTuuHv1uGxwXFb2atIU1ZYPbmmXculmizKcKI/s44qf8uM8iBZLGkKeVyL74aPyLkg7Gk359g98BIGN/ZzJR/h+Y6AyFx+HlMoYJnS06dVmqFbvlCtSdGylKQ5f8eWtxPkJGqOFtWjIVteQYMsH/AaSJonqw+WLiZvGjYfm9p0alEyujapoTy77HzDcUoU1wUSXa5xS/Z6hXEr2OnLi0LdPVcGjz8lpLcdVeSfm9p0alEyujapoTy77HzDWf5PERRSTFqLd9BTUHLyY4Ji3EQLGQPaM1aeHxG1bJZH0s1Si/KwzTaTYzu6ziQiqwcr2kaYUiH+fMOxn69/BhNJVMhpQkhprc1KZuJRvXjppq0gKweencPxgS/jd0rjw==",
            "fs": fs
        }
        response = requests.post(
            "https://passport.baidu.com/cap/log",
            headers=headers,
            data=data,
            proxies=get_proxies(),
        ).json()
        try:
            result = {
                "ds": response["data"]["ds"],
                "op": response["data"]["op"],
                "tk": response["data"]["tk"]
            }
        except KeyError:
            logger.error(f"Captcha submission failed, response: {response=}")
            time.sleep(1)
            return None
        logger.debug(f"{result=}")

        # Check whether the captcha was accepted.
        if result["op"] != 1:
            logger.error("op != 1, retrying")
            return None

        # Send the /cap/c request to obtain the redirect URL.
        response = requests.post(
            "https://passport.baidu.com/cap/c?ak=c27bbc89afca0463650ac9bde68ebe06",
            headers=headers,
            json={
                "tk": result["tk"],
                "ds": result["ds"],
                "qrsign": "",
                "refer": current_url
            },
            proxies=get_proxies()
        )
        data = response.json()
        if data["data"].get("f"):
            logger.error(f"Captcha verification failed: {data['data'].get('f')}")
            return None
        if data["data"].get("s"):
            logger.debug("Captcha verified, URL: " + data["data"].get("s").get("url"))
            url = data["data"].get("s").get("url")
            url = url.encode("utf-8").decode("unicode-escape")
            logger.success("Decoded URL: " + url)
            return url
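

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module): a minimal
# way to drive CrawlEngine in CLI mode. The domain values below are
# hypothetical, and AppCtx.g_db_engine is assumed to be initialised by the
# application before cli_start() touches the database.
if __name__ == "__main__":
    engine = CrawlEngine()
    try:
        # Queue two comma-separated domains; an empty string skips the file input.
        engine.cli_start("example.com,example.org", "")
        # Block until the worker threads drain the queue and exit.
        engine.cli_wait()
    finally:
        # Clear the run flag, wake any waiting workers, and close the browser.
        engine.stop()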