# baidu-reporter/app/engines/crawl_engine.py

import threading
import time
from loguru import logger
from sqlmodel import Session, select
from app.config.config import AppCtx
from app.models.domain import DomainModel
from app.models.report_urls import ReportUrlModel
from app.utils.dp import DPEngine
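
# NOTE (assumption): app.utils.dp.DPEngine is not shown in this file. Judging
# by the calls used below (browser.new_tab(), browser.quit(), tab.ele(),
# tab.wait.eles_loaded()), it is presumably a thin wrapper around a
# DrissionPage Chromium instance, roughly:
#
#     from DrissionPage import Chromium
#
#     class DPEngine:
#         def __init__(self):
#             self.browser = Chromium()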


class CrawlEngine:
    """Porn-site URL collector: automatically pages through Baidu search
    results for each domain and saves the hits.

    Corresponds to the getBaiDuIncludeUrls method in the original project.
    """

    def __init__(self):
        self.ev = threading.Event()
        self.status = 1  # 1 = running, 0 = stopped
        # Create a browser instance
        self.dp_engine = DPEngine()
        # Worker thread (created in start())
        self.worker_thread = None

    def start(self):
        """Start the collector."""
        self.worker_thread = threading.Thread(target=self.worker, name="crawl_engine", daemon=True)
        self.worker_thread.start()

    def stop(self):
        """Stop the collector."""
        self.ev.set()
        self.status = 0
        self.dp_engine.browser.quit()

    def worker(self):
        """The actual work loop."""
        logger.info("crawl worker start!")
        while self.status == 1:
            # Check the database for domains that are due for crawling
            current_timestamp = int(time.time())
            with Session(AppCtx.g_db_engine) as session:
                stmt = select(DomainModel).where(
                    DomainModel.latest_crawl_time + DomainModel.crawl_interval <= current_timestamp
                )
                domains = session.exec(stmt).all()
                for domain_model in domains:
                    # Crawl
                    surl_set = self.crawl(domain_model.domain)
                    # Store
                    if surl_set:
                        self.save_surl(session, domain_model, surl_set)
                    domain_model.latest_crawl_time = int(time.time())
                    session.add(domain_model)
                    session.commit()
            # Interruptible sleep: returns early once stop() sets the event
            self.ev.wait(60)
        logger.info("crawl worker stop!")

    def crawl(self, domain: str) -> set[str] | None:
        """Crawl the URLs for one domain."""
        logger.debug(f"Start crawling: {domain}")
        tab = self.dp_engine.browser.new_tab()
        surl_set: set[str] = set()
        try:
            # Initial data (only used by the commented-out direct-URL approach below)
            end_time = int(time.time())
            start_time = end_time - 3600 * 24 * 30  # last month of data
            # Process the result pages one by one
            max_page = 10  # maximum number of pages; 0 means unlimited
            current_page = 0  # current page number
            # Open the search page first
            tab.get("https://www.baidu.com/")
            tab.wait.eles_loaded("#kw")
            tab.ele("#kw").input(f"site:{domain}\n", clear=True)
            tab.wait.eles_loaded("#container")
            tab.wait.eles_loaded("#timeRlt")
            logger.debug("Front page loaded!")
            # Set the search time range
            tab.ele("#timeRlt").click(True)
            tab.wait.eles_loaded("@class:time_pop_")
            self.ev.wait(1)
            # logger.debug("Time menu!")
            # " 一月内 " is Baidu's "within one month" menu item
            tab.ele("t:li@@text()= 一月内 ").click(True)
            tab.wait.eles_loaded(["#container", ".content_none", "#content_left"], any_one=True)
            while True:
                # Advance the page counter
                current_page += 1
                logger.debug(f"Crawling page {current_page} of {domain}")
                # Visiting the URL directly triggers a captcha
                # tab.get(
                #     f"https://www.baidu.com/s?wd=site%3A{domain}&gpc=stf%3D{start_time}%2C{end_time}%7Cstftype%3D1&pn={(current_page - 1) * 10}")
                # tab.get(f"https://www.baidu.com/s?wd=site%3A{domain}&pn={(current_page - 1) * 10}")
                # Termination condition (max_page == 0 means unlimited)
                if current_page > max_page and max_page:
                    logger.debug("Reached the page limit, stopping")
                    break
                # logger.debug(f"tab.html: {tab.html}")
                self.ev.wait(0.3)
                # "未找到相关结果" is Baidu's "no results found" banner text
                if "未找到相关结果" in tab.html:
                    logger.debug("No results found, stopping")
                    break
                # Collect the data
                tab.wait.eles_loaded("@id=content_left")
                results = tab.ele("@id=content_left").eles("@class:result")
                # temp = [result.attr("mu") for result in results if result.attr("mu") is not None]
                for result in results:
                    surl = result.attr("mu")
                    if not surl:
                        continue
                    logger.debug(f"Found URL: {surl}")
                    surl_set.add(surl)
                # Pause briefly between pages; don't go too fast
                self.ev.wait(0.3)
                # If there is no next page this lookup finds nothing, with a 10 s timeout
                # ("下一页" is Baidu's "next page" link text)
                next_btn = tab.ele("t:a@@text():下一页")
                if not next_btn:
                    logger.debug("No next page")
                    break
                next_btn.click(True)
            return surl_set
        except Exception as e:
            logger.error(f"Error while crawling {domain}: {e}")
            import traceback
            traceback.print_exc()
        finally:
            tab.close()
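
    # NOTE: assuming DPEngine wraps DrissionPage (see the note above the
    # class), the locator strings in crawl() use its selector syntax:
    # "#kw" matches by id, "t:li" by tag name, "@@" chains conditions,
    # "text()=..." matches exact element text, and "@attr:value" does a
    # substring match on an attribute.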

    @staticmethod
    def save_surl(session, domain_model, surl_set):
        """Persist the collected URLs."""
        for surl in surl_set:
            # Check whether it already exists
            stmt = select(ReportUrlModel).where(ReportUrlModel.surl == surl)
            exist = session.exec(stmt).first()
            # Insert only if it does not exist yet
            if not exist:
                example = ReportUrlModel(
                    domain_id=domain_model.id,
                    domain=domain_model.domain,
                    surl=surl,
                )
                session.add(example)
                session.commit()
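

# A minimal usage sketch (an assumption for illustration; the real project
# presumably starts the engine from its own entry point):
if __name__ == "__main__":
    engine = CrawlEngine()
    engine.start()
    try:
        # Keep the main thread alive while the daemon worker runs
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        engine.stop()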