import queue
import re
import threading
import time

import execjs
import requests
from DrissionPage.errors import ElementNotFoundError
from loguru import logger
from sqlmodel import Session, select, or_, and_

from app.config.config import AppCtx
from app.constants.domain import DomainStatus
from app.models.domain import DomainModel
from app.models.report_urls import ReportUrlModel
from app.utils.common import get_proxies
from app.utils.dp import DPEngine
from app.utils.ydm_verify import YdmVerify


class CrawlEngine:
    """Collector for URLs of adult sites: automatically pages through Baidu search and saves the results.

    Corresponds to the getBaiDuIncludeUrls method in the original project.
    """

    def __init__(self):
        self.ev = threading.Event()
        # Run state of the worker threads: 1 - running, 0 - stopped
        self.worker_status = 1
        # Marks whether all tasks have been queued; this is how CLI mode knows it can finish normally.
        # 1 - all tasks queued, nothing new coming; 0 - tasks still to be added
        self.finish_task = 0

        # Thread pool
        self.pool: list[threading.Thread] = []
        self.worker_count = 1

        # Work queue
        self.target_queue = queue.Queue(1024)

        # Create a browser instance
        self.dp_engine = DPEngine()

    def cli_start(self, target_domains: str, target_domain_filepath: str):
        """Start in CLI mode.

        target_domains: comma-separated domains in a single string
        target_domain_filepath: path to a file with the target domains, one per line
        """
        # Store the input domains in the database first
        domains = self.add_domain(target_domains, target_domain_filepath)

        # Start the thread pool
        for idx in range(self.worker_count):
            # CLI mode works differently from web mode, so it needs its own cli worker
            thread = threading.Thread(target=self.cli_worker, name=f"crawl_engine_{idx}", daemon=True)
            self.pool.append(thread)
            thread.start()

        # Push the tasks onto the queue
        for domain in domains:
            logger.debug(f"Adding {domain} to the task queue")
            while True:
                try:
                    self.target_queue.put_nowait(domain)
                    break
                except queue.Full:
                    self.ev.wait(5)
                    continue
        # All tasks queued; flip the finish flag
        self.finish_task = 1

    def cli_wait(self):
        for worker in self.pool:
            worker.join()

    def cli_worker(self):
        """Worker for CLI mode: processes only the input domains and ignores the refresh conditions in the database."""
        while True:
            try:
                # Exit when the control flag has been cleared
                if not self.worker_status:
                    logger.debug(f"{threading.current_thread().name} exiting: control flag cleared")
                    break

                # Exit once the queue is empty and every task has been pushed
                if self.target_queue.empty() and self.finish_task:
                    logger.debug(f"{threading.current_thread().name} exiting: queue drained")
                    break

                # Fetch a domain and start crawling
                domain = self.target_queue.get_nowait()
                surl = self.crawl(domain)
                if not surl:
                    logger.debug(f"{threading.current_thread().name} crawled {domain} with no results, moving on to the next one")
                    continue

                # Save to the database
                with Session(AppCtx.g_db_engine) as session:
                    self.save_surl(session, domain, surl)
            except queue.Empty:
                # Queue is empty; wait one second and try again
                self.ev.wait(1)
                continue
            except Exception as e:
                logger.exception(f"{threading.current_thread().name} error: {e}")
                continue

        logger.info(f"{threading.current_thread().name} exited")

    def add_domain(self, input_domains: str, input_domain_filepath: str) -> list[str]:
        """Store the input domains in the database."""
        # Build the full list of domains to crawl
        domains = []
        if input_domains:
            domains.extend([d.strip() for d in input_domains.split(",") if d.strip()])
        if input_domain_filepath:
            with open(input_domain_filepath, "r") as fp:
                for line in fp:
                    line = line.strip()
                    if line:
                        domains.append(line)

        logger.debug(f"Received {len(domains)} domains in total")

        # Skip domains that are already in the database
        for domain in domains:
            with Session(AppCtx.g_db_engine) as session:
                stmt = select(DomainModel).where(DomainModel.domain == domain)
                result = session.exec(stmt).first()
                if not result:
                    example = DomainModel(
                        domain=domain, status=1, crawl_interval=60 * 7 * 24, latest_crawl_time=0,  # interval is in minutes (7 days)
                    )
                    session.add(example)
                    session.commit()
                    logger.info(f"Added domain {domain} to the database")

        return domains

    def stop(self):
        """Stop the collector; shared by CLI and web modes."""
        self.ev.set()
        self.worker_status = 0
        self.dp_engine.browser.quit()

    def start(self):
        """Start the collector; this is the entry point when running in web mode."""
        for idx in range(self.worker_count):
            thread = threading.Thread(target=self.worker, name=f"crawl_engine_{idx}", daemon=True)
            self.pool.append(thread)
            thread.start()

    def worker(self):
        """The actual work loop, used when the engine runs in web mode."""
        logger.info("crawl worker start!")
        while self.worker_status:
            # Check the database for domains that are due for crawling
            current_timestamp = int(time.time())
            with Session(AppCtx.g_db_engine) as session:

                stmt = select(DomainModel).where(
                    or_(
                        DomainModel.status == 2,  # condition 1: status = 2
                        and_(
                            # condition 2: the crawl interval (stored in minutes) has elapsed
                            DomainModel.latest_crawl_time + DomainModel.crawl_interval * 60 <= current_timestamp,
                            DomainModel.status == 1  # condition 2: status = 1
                        )
                    )
                )
                domains = session.exec(stmt).all()

                for domain_model in domains:

                    # Mark the domain as being crawled before starting
                    domain_model.status = DomainStatus.CRAWLING.value
                    session.add(domain_model)
                    session.commit()

                    # Crawl
                    surl_set = self.crawl(domain_model.domain)

                    # Store
                    if surl_set:
                        self.save_surl(session, domain_model.domain, surl_set)

                    domain_model.latest_crawl_time = int(time.time())
                    domain_model.status = DomainStatus.READY.value
                    session.add(domain_model)
                    session.commit()

            self.ev.wait(10)

        logger.info("crawl worker stop!")

    def crawl(self, domain: str) -> set[str] | None:
        """Crawl Baidu for `site:domain` results and return the collected URLs."""
        logger.debug(f"{threading.current_thread().name} start crawling: {domain}")
        tab = self.dp_engine.browser.new_tab()
        surl_set: set[str] = set()

        try:
            # Initial data
            # end_time = int(time.time())
            # start_time = end_time - 3600 * 24 * 30  # last month of data

            # Process the results page by page
            max_page = 10  # maximum number of pages; 0 means unlimited
            current_page = 0  # current page number

            # Open the search page first
            tab.get("https://www.baidu.com/")
            tab.wait.eles_loaded("#kw")
            tab.ele("#kw").input(f"site:{domain}\n", clear=True)
            tab.wait.eles_loaded("#container")
            tab.wait.eles_loaded("#timeRlt")
            logger.debug(f"{threading.current_thread().name} #timeRlt loaded!")

            # Set the search time range
            self.ev.wait(2)
            tab.ele("#timeRlt").click(True)
            tab.wait.eles_loaded("@class:time_pop_")
            self.ev.wait(2)
            # logger.debug("time range menu!")
            tab.ele("t:li@@text()= 一月内 ").click(True)  # "within one month"
            tab.wait.eles_loaded(["#container", ".content_none", "#content_left"], any_one=True)

            while True:
                try:
                    # Advance the page counter
                    current_page += 1
                    logger.debug(f"{threading.current_thread().name} crawling page {current_page} of {domain}")
                    # Visiting the result URL directly triggers the captcha
                    # tab.get(
                    #     f"https://www.baidu.com/s?wd=site%3A{domain}&gpc=stf%3D{start_time}%2C{end_time}%7Cstftype%3D1&pn={(current_page - 1) * 10}")
                    # tab.get(f"https://www.baidu.com/s?wd=site%3A{domain}&pn={(current_page - 1) * 10}")

                    # Check whether the current URL redirected to the captcha page
                    if "//wappass.baidu.com/static/captcha/tuxing_v2.html" in tab.url:
                        logger.warning("Captcha triggered, trying to solve it")
                        idx = 0
                        while idx < 3:
                            idx += 1
                            logger.debug(f"Captcha attempt {idx}...")
                            captcha_result = self.verify_captcha(tab.url)
                            if not captcha_result:
                                tab.refresh()
                                continue
                            else:
                                tab.get(captcha_result)
                                break
                        else:
                            logger.error("Captcha solving failed; giving up this crawl and waiting 3 minutes")
                            self.ev.wait(180)
                            break

                    # Stop condition
                    if max_page and current_page > max_page:
                        logger.debug(f"{threading.current_thread().name} reached the page limit, exiting")
                        break

                    # logger.debug(f"tab.html: {tab.html}")
                    self.ev.wait(0.3)
                    if "未找到相关结果" in tab.html:  # "no relevant results found"
                        logger.debug(f"{threading.current_thread().name} no results found, exiting")
                        break

                    # Collect the results
                    tab.wait.eles_loaded("@id=content_left")
                    results = tab.ele("@id=content_left").eles("@class:result")
                    # temp = [result.attr("mu") for result in results if result.attr("mu") is not None]
                    # logger.debug(f"{len(results)=}")
                    for result in results:
                        # logger.debug(f"{result=}")
                        surl = result.attr("mu")
                        if not surl:
                            continue

                        # Before adding a result, check that the surl actually belongs to the target domain
                        if domain not in surl:
                            logger.debug(f"{threading.current_thread().name} URL {surl} is unrelated to target domain {domain}, skipping")
                        else:
                            surl_set.add(surl)
                            logger.debug(f"{threading.current_thread().name} found {surl}")

                    # Pause briefly before turning the page; don't go too fast
                    self.ev.wait(0.3)
                    # If there is no next page, this element lookup fails, with a 10-second timeout
                    next_btn = tab.ele("t:a@@text():下一页")  # the "next page" link
                    if not next_btn:
                        logger.debug(f"{threading.current_thread().name} no more pages")
                        break
                    next_btn.click(True)
                except ElementNotFoundError as e:
                    logger.error(f"HTML element not found, skipping; details: {e}")
                    break
            return surl_set
        except Exception as e:
            logger.exception(f"{threading.current_thread().name} error while crawling {domain}: {e}")
        finally:
            tab.close()

    @staticmethod
    def save_surl(session: Session, domain: str, surl_set: set[str]):
        """Save the collected URLs."""
        for surl in surl_set:

            # Quick check that the surl contains the target domain
            if domain not in surl:
                logger.debug(f"Skipping {surl} because it does not match target domain {domain}")
                continue

            # Check whether it already exists
            stmt = select(ReportUrlModel).where(ReportUrlModel.surl == surl)
            exist = session.exec(stmt).first()

            if exist:
                continue

            # Check whether the domain exists
            domain_model = session.exec(
                select(DomainModel).where(DomainModel.domain == domain)
            ).first()
            if not domain_model:
                logger.warning(f"Domain {domain} is not in the database")
                continue

            example = ReportUrlModel(
                domain_id=domain_model.id,
                domain=domain_model.domain,
                surl=surl,
            )
            session.add(example)
            session.commit()

    # def captcha_listener(self):
    #     for pkg in self.tab.listen.steps():
    #         if "/cap/init" in pkg.url:
    #             self.captcha_data["init"] = pkg.response.body
    #         if "/cap/style" in pkg.url:
    #             self.captcha_data["style"] = pkg.response.body
    #             self.captcha_data["referer"] = pkg.request.headers.get("Referer")
    #             logger.debug(f"captcha referer: {self.captcha_data['referer']}")
    #
    #             self.captcha_data["cookie"] = pkg.request.headers.get("Cookie")
    #             logger.debug(f"captcha cookie: {self.captcha_data['cookie']}")
    #         if "/cap/log" in pkg.url:
    #             self.captcha_data["log"] = pkg.response.body

    def verify_captcha(self, current_url: str):
        """Try to solve the captcha. The flow differs slightly from pc_reporter, so it is implemented separately here.

        Steps: /cap/init (as/tk) -> /cap/style (backstr + image) -> recognise the rotation with YdmVerify ->
        /cap/log (submit) -> /cap/c (fetch the redirect URL).
        """
        headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'Referer': current_url,
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0",
            'X-Requested-With': 'XMLHttpRequest',
            'sec-ch-ua_wap': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
            'sec-ch-ua_wap-mobile': '?0',
            'sec-ch-ua_wap-platform': '"Windows"',
            # "Cookie": self.captcha_data["cookie"],
        }

        # Get the as / tk values from /cap/init
        ts = time.time()
        ts1 = int(ts)
        ts2 = int(ts * 1000)
        response = requests.post(
            "https://passport.baidu.com/cap/init",
            data={
                "_": ts2,
                "refer": re.sub(r'timestamp=\d+', f'timestamp={ts1}', current_url),
                "ak": "c27bbc89afca0463650ac9bde68ebe06",
                "ver": "2",
                "scene": "",
                "ds": "",
                "tk": "",
                "as": "",
                "reinit": 0
            },
            headers=headers,
            proxies=get_proxies()
        ).json()
        as_value = response["data"]["as"]
        tk_value = response["data"]["tk"]

        # Get the captcha style (backstr and image link) from /cap/style
        response = requests.post(
            "https://passport.baidu.com/cap/style",
            data={
                "_": int(time.time() * 1000),
                "refer": re.sub(r'timestamp=\d+', f'timestamp={ts1}', current_url),
                "ak": "c27bbc89afca0463650ac9bde68ebe06",
                "tk": tk_value,
                "scene": "",
                "isios": "0",
                "type": "spin",
                "ver": "2"
            },
            headers=headers,
            proxies=get_proxies()
        )
        response = response.json()
        backstr = response["data"]["backstr"]
        captcha_link = response["data"]["captchalist"][0]["source"]["back"]["path"]
        logger.debug(f"{backstr=}, {captcha_link=}")

        # Download the captcha image
        image_response = requests.get(captcha_link, headers=headers, proxies=get_proxies())
        with open("captcha.png", "wb") as f:
            f.write(image_response.content)
        logger.debug("download captcha.png")

        # Recognise the rotation captcha via YDM
        ydm = YdmVerify()
        with open("captcha.png", "rb") as fp:
            picture = fp.read()

        slide_distance = ydm.rotate(picture)
        logger.debug(f"{slide_distance=}")
        if not slide_distance:
            logger.error("Captcha recognition failed")
            return None
        # Convert the returned angle into a fraction of a full turn
        rotate_angle_rate = round(slide_distance / 360, 2)
        logger.debug(f"{rotate_angle_rate=}")

        if not rotate_angle_rate:
            logger.debug("Captcha recognition failed")
            return None

        # Submit the captcha answer to /cap/log
        time_log = str(int(time.time() * 1000))
        with open("./js/mkd_v2_link_submit.js", 'r', encoding='utf-8') as f:
            ds_js = f.read()
        fs = execjs.compile(ds_js).call('getFs2', backstr, rotate_angle_rate, as_value)
        data = {
            "_": time_log,
            "refer": current_url,
            "ak": "c27bbc89afca0463650ac9bde68ebe06",
            "as": as_value,
            "scene": "",
            "tk": tk_value,
            "ver": "2",
            "cv": "submit",
            "typeid": "spin-0",
"fuid": "FOCoIC3q5fKa8fgJnwzbE67EJ49BGJeplOzf+4l4EOvDuu2RXBRv6R3A1AZMa49I27C0gDDLrJyxcIIeAeEhD8JYsoLTpBiaCXhLqvzbzmvy3SeAW17tKgNq/Xx+RgOdb8TWCFe62MVrDTY6lMf2GrfqL8c87KLF2qFER3obJGnfaJTn/Ne60I9LwR04t6XmGEimjy3MrXEpSuItnI4KD0FJKzTbw1AN69fBnzR2FuvMmmQZ+1zgJ72wdcVU+mcQxiE2ir0+TEYgjPJt1Qa3K1mLi+P4IWJeag2lvxB4yJ/GgLbz7OSojK1zRbqBESR5Pdk2R9IA3lxxOVzA+Iw1TWLSgWjlFVG9Xmh1+20oPSbrzvDjYtVPmZ+9/6evcXmhcO1Y58MgLozKnaQIaLfWRPAn9I0uOqAMff6fuUeWcH1mRYoTw2Nhr4J4agZi377iM/izL6cVCGRy2F8c0VpEvM5FjnYxYstXg/9EfB3EVmKAfzNRIeToJ5YV9twMcgdmlV1Uhhp5FAe6gNJIUptp7EMAaXYKm11G+JVPszQFdp9AJLcm4YSsYUXkaPI2Tl66J246cmjWQDTahAOINR5rXR5r/7VVI1RMZ8gb40q7az7vCK56XLooKT5a+rsFrf5Zu0yyCiiagElhrTEOtNdBJJq8eHwEHuFBni9ahSwpC7lbKkUwaKH69tf0DFV7hJROiLETSFloIVkHdy3+I2JUr1LsplAz0hMkWt/tE4tXVUV7QcTDTZWS/2mCoS/GV3N9awQ6iM6hs/BWjlgnEa1+5iP7WSc7RJ34FaE5PsyGXyoCWdXwNRGSZPSvVtB/Ea6w5FKazXcZ/j40FJv+iLGBn3nkkgHlne61I8I7KhtQgIkmBMJIjPMkS/L051MeqdGScsKYTJuSucgI5c3+79eVH+y2TvbOTuuHv1uGxwXFb2atIU1ZYPbmmXculmizKcKI/s44qf8uM8iBZLGkKeVyL74aPyLkg7Gk359g98BIGN/ZzJR/h+Y6AyFx+HlMoYJnS06dVmqFbvlCtSdGylKQ5f8eWtxPkJGqOFtWjIVteQYMsH/AaSJonqw+WLiZvGjYfm9p0alEyujapoTy77HzDcUoU1wUSXa5xS/Z6hXEr2OnLi0LdPVcGjz8lpLcdVeSfm9p0alEyujapoTy77HzDWf5PERRSTFqLd9BTUHLyY4Ji3EQLGQPaM1aeHxG1bJZH0s1Si/KwzTaTYzu6ziQiqwcr2kaYUiH+fMOxn69/BhNJVMhpQkhprc1KZuJRvXjppq0gKweencPxgS/jd0rjw==",
            "fs": fs
        }
        response = requests.post(
            "https://passport.baidu.com/cap/log",
            headers=headers,
            data=data,
            proxies=get_proxies(),
        ).json()
        try:
            result = {
                "ds": response["data"]["ds"],
                "op": response["data"]["op"],
                "tk": response["data"]["tk"]
            }
        except KeyError:
            logger.error(f"Captcha verification failed, response: {response=}")
            time.sleep(1)
            return None
        logger.debug(f"{result=}")

        # Check whether the captcha was accepted
        if result["op"] != 1:
            logger.error("op != 1, retrying")
            return None

        # Send the /cap/c request to get the URL to redirect to
        response = requests.post(
            "https://passport.baidu.com/cap/c?ak=c27bbc89afca0463650ac9bde68ebe06",
            headers=headers,
            json={
                "tk": result["tk"],
                "ds": result["ds"],
                "qrsign": "",
                "refer": current_url
            },
            proxies=get_proxies()
        )

        data = response.json()
        if data["data"].get("f"):
            logger.error(f"Captcha failed: {data['data'].get('f')}")
            return None
        if data["data"].get("s"):
            logger.debug("Captcha passed, URL: " + data["data"].get("s").get("url"))
            url = data["data"].get("s").get("url")
            url = url.encode("utf-8").decode("unicode-escape")
            logger.success("Decoded URL: " + url)
            return url
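

# A minimal usage sketch (not part of the original module): driving the collector in CLI mode.
# Assumptions: AppCtx.g_db_engine has been initialised elsewhere, "example.com" is a placeholder
# domain, and "domains.txt" is a hypothetical file containing one target domain per line.
if __name__ == "__main__":
    engine = CrawlEngine()
    try:
        # Queue the input domains, then block until the CLI workers have drained the queue.
        engine.cli_start("example.com", "domains.txt")
        engine.cli_wait()
    finally:
        # Clear the run flag and close the DrissionPage browser.
        engine.stop()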