Compare commits

..

8 Commits

13 changed files with 519 additions and 29 deletions

View File

@ -112,6 +112,10 @@ class MainApp:
def start_cli(self): def start_cli(self):
"""开启 CLI 模式""" """开启 CLI 模式"""
# 注册 ctrl+c 处理程序,正常结束所有的 engine
signal.signal(signal.SIGINT, self.exit_handler)
if self.args.crawl or self.args.crawl_file: if self.args.crawl or self.args.crawl_file:
crawl = CrawlEngine() crawl = CrawlEngine()
crawl.cli_start(self.args.crawl, self.args.crawl_file) crawl.cli_start(self.args.crawl, self.args.crawl_file)

View File

@ -1,7 +1,10 @@
import queue import queue
import re
import threading import threading
import time import time
import execjs
import requests
from DrissionPage.errors import ElementNotFoundError from DrissionPage.errors import ElementNotFoundError
from loguru import logger from loguru import logger
from sqlmodel import Session, select, or_, and_ from sqlmodel import Session, select, or_, and_
@ -10,7 +13,9 @@ from app.config.config import AppCtx
from app.constants.domain import DomainStatus from app.constants.domain import DomainStatus
from app.models.domain import DomainModel from app.models.domain import DomainModel
from app.models.report_urls import ReportUrlModel from app.models.report_urls import ReportUrlModel
from app.utils.common import get_proxies
from app.utils.dp import DPEngine from app.utils.dp import DPEngine
from app.utils.ydm_verify import YdmVerify
class CrawlEngine: class CrawlEngine:
@ -36,8 +41,6 @@ class CrawlEngine:
# 创建一个浏览器 # 创建一个浏览器
self.dp_engine = DPEngine() self.dp_engine = DPEngine()
self.database = AppCtx.g_db_engine
def cli_start(self, target_domains: str, target_domain_filepath: str): def cli_start(self, target_domains: str, target_domain_filepath: str):
"""CLI 模式启动 """CLI 模式启动
target_domains: 英文逗号分割的字符串 target_domains: 英文逗号分割的字符串
@ -91,7 +94,7 @@ class CrawlEngine:
continue continue
# 存入数据库 # 存入数据库
with Session(self.database) as session: with Session(AppCtx.g_db_engine) as session:
self.save_surl(session, domain, surl) self.save_surl(session, domain, surl)
except queue.Empty: except queue.Empty:
# 队列空了等1秒再取一次 # 队列空了等1秒再取一次
@ -120,7 +123,7 @@ class CrawlEngine:
# 检查在数据库中是否有重复的 # 检查在数据库中是否有重复的
for domain in domains: for domain in domains:
with Session(self.database) as session: with Session(AppCtx.g_db_engine) as session:
stmt = select(DomainModel).where(DomainModel.domain == domain) stmt = select(DomainModel).where(DomainModel.domain == domain)
result = session.exec(stmt).first() result = session.exec(stmt).first()
if not result: if not result:
@ -163,10 +166,6 @@ class CrawlEngine:
) )
) )
) )
# stmt = select(DomainModel).where(
# DomainModel.latest_crawl_time + DomainModel.crawl_interval * 60 <= current_timestamp
# )
domains = session.exec(stmt).all() domains = session.exec(stmt).all()
for domain_model in domains: for domain_model in domains:
@ -234,6 +233,26 @@ class CrawlEngine:
# f"https://www.baidu.com/s?wd=site%3A{domain}&gpc=stf%3D{start_time}%2C{end_time}%7Cstftype%3D1&pn={(current_page - 1) * 10}") # f"https://www.baidu.com/s?wd=site%3A{domain}&gpc=stf%3D{start_time}%2C{end_time}%7Cstftype%3D1&pn={(current_page - 1) * 10}")
# tab.get(f"https://www.baidu.com/s?wd=site%3A{domain}&pn={(current_page - 1) * 10}") # tab.get(f"https://www.baidu.com/s?wd=site%3A{domain}&pn={(current_page - 1) * 10}")
# 检查一下当前的URL是不是跳到验证码的页面
if "//wappass.baidu.com/static/captcha/tuxing_v2.html" in tab.url:
logger.warning("触发验证码了,尝试识别")
idx = 0
while idx < 3:
idx += 1
logger.debug(f"开始第{idx}次识别...")
captcha_result = self.verify_captcha(tab.url)
if not captcha_result:
tab.refresh()
continue
else:
tab.get(captcha_result)
break
else:
logger.error("验证码打码失败放弃本次采集等待3分钟后继续")
self.ev.wait(180)
break
# 终止条件 # 终止条件
if current_page > max_page and max_page: if current_page > max_page and max_page:
logger.debug(f"{threading.current_thread().name} 达到指定页码,退出") logger.debug(f"{threading.current_thread().name} 达到指定页码,退出")
@ -314,3 +333,170 @@ class CrawlEngine:
) )
session.add(example) session.add(example)
session.commit() session.commit()
# def captcha_listener(self):
# for pkg in self.tab.listen.steps():
# if "/cap/init" in pkg.url:
# self.captcha_data["init"] = pkg.response.body
# if "/cap/style" in pkg.url:
# self.captcha_data["style"] = pkg.response.body
# self.captcha_data["referer"] = pkg.request.headers.get("Referer")
# logger.debug(f"触发验证码的 referer: {self.captcha_data["referer"]}")
#
# self.captcha_data["cookie"] = pkg.request.headers.get("Cookie")
# logger.debug(f"触发验证码的 cookie: {self.captcha_data['cookie']}")
# if "/cap/log" in pkg.url:
# self.captcha_data["log"] = pkg.response.body
def verify_captcha(self, current_url: str):
    """Try to solve Baidu's rotate captcha and return the redirect URL.

    Kept separate from pc_reporter because the flow differs slightly
    (original author's note). Observed protocol:

      1. POST /cap/init  -> session tokens ``as`` / ``tk``
      2. POST /cap/style -> ``backstr`` + captcha image URL
      3. download the image, solve the rotation angle via YdmVerify
      4. POST /cap/log   -> submit the solution (``fs`` payload built by
         js/mkd_v2_link_submit.js through execjs)
      5. POST /cap/c     -> exchange the solved tokens for the redirect URL

    :param current_url: captcha page URL the browser was redirected to;
        also used as the Referer for the captcha API calls.
    :return: the unicode-unescaped redirect URL on success, ``None`` on
        any failure so the caller can refresh and retry.
    """
    headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'Referer': current_url,
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0",
        'X-Requested-With': 'XMLHttpRequest',
        'sec-ch-ua_wap': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
        'sec-ch-ua_wap-mobile': '?0',
        'sec-ch-ua_wap-platform': '"Windows"',
        # "Cookie": self.captcha_data["cookie"],
    }
    # Step 1: /cap/init yields the AS / TK tokens for this captcha session.
    ts = time.time()
    ts1 = int(ts)  # seconds — patched into the refer's timestamp param below
    ts2 = int(ts * 1000)  # milliseconds — cache-buster "_" field
    response = requests.post(
        "https://passport.baidu.com/cap/init",
        data={
            "_": ts2,
            # Refresh the timestamp embedded in the captcha page URL so the
            # refer looks current to the server.
            "refer": re.sub(r'timestamp=\d+', f'timestamp={ts1}', current_url),
            "ak": "c27bbc89afca0463650ac9bde68ebe06",
            "ver": "2",
            "scene": "",
            "ds": "",
            "tk": "",
            "as": "",
            "reinit": 0
        },
        headers=headers,
        proxies=get_proxies()
    ).json()
    as_value = response["data"]["as"]
    tk_value = response["data"]["tk"]
    # Step 2: /cap/style yields backstr and the rotate-captcha image link.
    response = requests.post(
        "https://passport.baidu.com/cap/style",
        data={
            "_": int(time.time() * 1000),
            "refer": re.sub(r'timestamp=\d+', f'timestamp={ts1}', current_url),
            "ak": "c27bbc89afca0463650ac9bde68ebe06",
            "tk": tk_value,
            "scene": "",
            "isios": "0",
            "type": "spin",
            "ver": "2"
        },
        headers=headers,
        proxies=get_proxies()
    )
    response = response.json()
    backstr = response["data"]["backstr"]
    captcha_link = response["data"]["captchalist"][0]["source"]["back"]["path"]
    logger.debug(f"{backstr=}, {captcha_link=}")
    # Step 3a: download the captcha image to disk.
    # NOTE(review): fixed filename "captcha.png" in the CWD — concurrent
    # workers would overwrite each other; confirm this path is single-threaded.
    image_response = requests.get(captcha_link, headers=headers, proxies=get_proxies())
    with open("captcha.png", "wb") as f:
        f.write(image_response.content)
    logger.debug("download captcha.png")
    # Step 3b: send the image to the YDM solving service; it returns the
    # rotation amount, normalised below to a fraction of 360 degrees.
    ydm = YdmVerify()
    with open("captcha.png", "rb") as fp:
        picture = fp.read()
    slide_distance = ydm.rotate(picture)
    logger.debug(f"{slide_distance=}")
    if not slide_distance:
        logger.error("识别验证码失败")
        return None
    rotate_angle_rate = round(slide_distance / 360, 2)
    logger.debug(f"{rotate_angle_rate=}")
    if not rotate_angle_rate:
        # A rounded rate of 0.0 (very small angle) is treated as a failed solve.
        logger.debug("识别验证码失败")
        return None
    # Step 4: build the anti-bot "fs" blob via the bundled JS, then submit the
    # solved rotation to /cap/log.
    time_log = str(int(time.time() * 1000))
    with open("./js/mkd_v2_link_submit.js", 'r', encoding='utf-8') as f:
        ds_js = f.read()
    fs = execjs.compile(ds_js).call('getFs2', backstr, rotate_angle_rate, as_value)
    data = {
        "_": time_log,
        "refer": current_url,
        "ak": "c27bbc89afca0463650ac9bde68ebe06",
        "as": as_value,
        "scene": "",
        "tk": tk_value,
        "ver": "2",
        "cv": "submit",
        "typeid": "spin-0",
        # fuid is a captured browser fingerprint; per the sibling test script's
        # notes it is stable over short periods but tied to the browser
        # profile it was recorded from — TODO confirm it does not expire.
        "fuid": "FOCoIC3q5fKa8fgJnwzbE67EJ49BGJeplOzf+4l4EOvDuu2RXBRv6R3A1AZMa49I27C0gDDLrJyxcIIeAeEhD8JYsoLTpBiaCXhLqvzbzmvy3SeAW17tKgNq/Xx+RgOdb8TWCFe62MVrDTY6lMf2GrfqL8c87KLF2qFER3obJGnfaJTn/Ne60I9LwR04t6XmGEimjy3MrXEpSuItnI4KD0FJKzTbw1AN69fBnzR2FuvMmmQZ+1zgJ72wdcVU+mcQxiE2ir0+TEYgjPJt1Qa3K1mLi+P4IWJeag2lvxB4yJ/GgLbz7OSojK1zRbqBESR5Pdk2R9IA3lxxOVzA+Iw1TWLSgWjlFVG9Xmh1+20oPSbrzvDjYtVPmZ+9/6evcXmhcO1Y58MgLozKnaQIaLfWRPAn9I0uOqAMff6fuUeWcH1mRYoTw2Nhr4J4agZi377iM/izL6cVCGRy2F8c0VpEvM5FjnYxYstXg/9EfB3EVmKAfzNRIeToJ5YV9twMcgdmlV1Uhhp5FAe6gNJIUptp7EMAaXYKm11G+JVPszQFdp9AJLcm4YSsYUXkaPI2Tl66J246cmjWQDTahAOINR5rXR5r/7VVI1RMZ8gb40q7az7vCK56XLooKT5a+rsFrf5Zu0yyCiiagElhrTEOtNdBJJq8eHwEHuFBni9ahSwpC7lbKkUwaKH69tf0DFV7hJROiLETSFloIVkHdy3+I2JUr1LsplAz0hMkWt/tE4tXVUV7QcTDTZWS/2mCoS/GV3N9awQ6iM6hs/BWjlgnEa1+5iP7WSc7RJ34FaE5PsyGXyoCWdXwNRGSZPSvVtB/Ea6w5FKazXcZ/j40FJv+iLGBn3nkkgHlne61I8I7KhtQgIkmBMJIjPMkS/L051MeqdGScsKYTJuSucgI5c3+79eVH+y2TvbOTuuHv1uGxwXFb2atIU1ZYPbmmXculmizKcKI/s44qf8uM8iBZLGkKeVyL74aPyLkg7Gk359g98BIGN/ZzJR/h+Y6AyFx+HlMoYJnS06dVmqFbvlCtSdGylKQ5f8eWtxPkJGqOFtWjIVteQYMsH/AaSJonqw+WLiZvGjYfm9p0alEyujapoTy77HzDcUoU1wUSXa5xS/Z6hXEr2OnLi0LdPVcGjz8lpLcdVeSfm9p0alEyujapoTy77HzDWf5PERRSTFqLd9BTUHLyY4Ji3EQLGQPaM1aeHxG1bJZH0s1Si/KwzTaTYzu6ziQiqwcr2kaYUiH+fMOxn69/BhNJVMhpQkhprc1KZuJRvXjppq0gKweencPxgS/jd0rjw==",
        # NOTE: the original kept a second captured fuid here as a
        # commented-out alternative (from a different browser profile).
        "fs": fs
    }
    response = requests.post(
        "https://passport.baidu.com/cap/log",
        headers=headers,
        data=data,
        proxies=get_proxies(),
    ).json()
    try:
        result = {
            "ds": response["data"]["ds"],
            "op": response["data"]["op"],
            "tk": response["data"]["tk"]
        }
    except KeyError:
        # Missing keys means the server rejected the submission outright.
        logger.error(f"验证码没转成功, response: {response=}")
        time.sleep(1)
        return None
    logger.debug(f"{result=}")
    # op == 1 means the rotation was accepted; anything else is a retry.
    if result["op"] != 1:
        logger.error(f"op != 1, 重试")
        return None
    # Step 5: /cap/c exchanges the solved tokens for the back-redirect URL.
    response = requests.post(
        "https://passport.baidu.com/cap/c?ak=c27bbc89afca0463650ac9bde68ebe06",
        headers=headers,
        json={
            "tk": result["tk"],
            "ds": result["ds"],
            "qrsign": "",
            "refer": current_url
        },
        proxies=get_proxies()
    )
    data = response.json()
    if data["data"].get("f"):
        # "f" = failure payload.
        logger.error(f"验证码失败: {data['data'].get('f')}")
        return None
    if data["data"].get("s"):
        # "s.url" = success redirect, returned \uXXXX-escaped.
        logger.debug("验证成功URL" + data["data"].get("s").get("url"))
        url = data["data"].get("s").get("url")
        url = url.encode("utf-8").decode("unicode-escape")
        logger.success("解码后的URL" + url)
        return url

View File

@ -66,11 +66,11 @@ class Reporter:
def worker(self): def worker(self):
while self.status: while self.status:
for mode in self.mode: for mode in self.mode:
if mode == "pc": if mode == "pc" and self.status:
self.reporters["pc"].run() self.reporters["pc"].run()
elif mode == "wap": elif mode == "wap" and self.status:
self.reporters["wap"].run() self.reporters["wap"].run()
elif mode == "site": elif mode == "site" and self.status:
self.reporters["site"].run() self.reporters["site"].run()
else: else:
logger.error(f"参数错误: {mode}") logger.error(f"参数错误: {mode}")

View File

@ -55,7 +55,7 @@ class PcReporter(BaseReporter):
def run(self): def run(self):
with Session(self.database) as session: with Session(self.database) as session:
stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_one == False) stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_one == False).where(ReportUrlModel.has_evidence == True)
rows: list[ReportUrlModel] = session.exec(stmt).all() rows: list[ReportUrlModel] = session.exec(stmt).all()
logger.info(f"[{self.engine_name}] 共计 {len(rows)} 条记录需要举报") logger.info(f"[{self.engine_name}] 共计 {len(rows)} 条记录需要举报")

View File

@ -54,7 +54,7 @@ class SiteReporter(BaseReporter):
def run(self): def run(self):
"""实现 PC 端的举报逻辑""" """实现 PC 端的举报逻辑"""
with Session(self.database) as session: with Session(self.database) as session:
stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_site == False) stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_site == False).where(ReportUrlModel.has_evidence == True)
rows: list[ReportUrlModel] = session.exec(stmt).all() rows: list[ReportUrlModel] = session.exec(stmt).all()
logger.info(f"[{self.engine_name}] 共计 {len(rows)} 条需要举报") logger.info(f"[{self.engine_name}] 共计 {len(rows)} 条需要举报")

View File

@ -51,7 +51,8 @@ class WapReporter(BaseReporter):
def run(self): def run(self):
"""实现 WAP 端的举报逻辑""" """实现 WAP 端的举报逻辑"""
with Session(self.database) as session: with Session(self.database) as session:
stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_wap == False) stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_wap == False).where(
ReportUrlModel.has_evidence == True)
rows: list[ReportUrlModel] = session.exec(stmt).all() rows: list[ReportUrlModel] = session.exec(stmt).all()
logger.debug(f"[{self.engine_name}] 共找到 {len(rows)} 条待举报记录") logger.debug(f"[{self.engine_name}] 共找到 {len(rows)} 条待举报记录")
@ -61,6 +62,8 @@ class WapReporter(BaseReporter):
if not self.status: if not self.status:
break break
self.ev.wait(1)
# 选个 cookie # 选个 cookie
report_cookie = random.choice(get_all_cookies()) report_cookie = random.choice(get_all_cookies())
report_site_cookie = GenCookie.run(report_cookie) report_site_cookie = GenCookie.run(report_cookie)
@ -93,9 +96,10 @@ class WapReporter(BaseReporter):
# wapUserAgent = random.choice(self.wapUserAgent) # wapUserAgent = random.choice(self.wapUserAgent)
response = self.request.get( response = self.request.get(
"https://ufosdk.baidu.com/api?m=Web&a=getUserInfo&appid=293852", "https://ufosdk.baidu.com/api?m=Web&a=getUserInfo&appid=293852",
headers=self.headers, proxies=self.proxies, allow_redirects=False, timeout=10, verify=False headers=self.headers, proxies=self.proxies, allow_redirects=False, timeout=10
) )
json_data = response.json() json_data = response.json()
logger.debug(f"{self.engine_name} get_user_info response: {json_data}")
uid = json_data['result']['uid'] uid = json_data['result']['uid']
un = json_data['result']['un'] un = json_data['result']['un']
userinfo["uid"] = uid userinfo["uid"] = uid
@ -151,11 +155,14 @@ class WapReporter(BaseReporter):
proxies=self.proxies, proxies=self.proxies,
allow_redirects=False, allow_redirects=False,
timeout=10, timeout=10,
verify=False
) )
# logger.debug(req.json()) # logger.debug(req.json())
logger.debug(response.json()) data = response.json()
if response.json()['errno'] == 0: logger.debug(data)
if data['errno'] == 0:
logger.success(f"[{self.engine_name}] {fb_url} 举报成功") logger.success(f"[{self.engine_name}] {fb_url} 举报成功")
return True return True
if "请勿重复提交" in data["errmsg"]:
logger.success(f"[{self.engine_name}] {fb_url} 重复提交,标记为成功")
return True
return False return False

View File

@ -35,7 +35,7 @@ def connect_db(config: AppConfig):
from .report_urls import ReportUrlModel from .report_urls import ReportUrlModel
dsn = f"mysql+pymysql://{config.database.user}:{config.database.password}@{config.database.host}:{config.database.port}/{config.database.database}" dsn = f"mysql+pymysql://{config.database.user}:{config.database.password}@{config.database.host}:{config.database.port}/{config.database.database}"
engine = create_engine(dsn, echo=False) engine = create_engine(dsn, echo=False, pool_size=4, max_overflow=10, pool_recycle=60, pool_pre_ping=True)
SQLModel.metadata.create_all(engine) SQLModel.metadata.create_all(engine)
AppCtx.g_db_engine = engine AppCtx.g_db_engine = engine

View File

@ -1,6 +1,8 @@
import hashlib import hashlib
import random import random
from app.config.config import AppCtx
def md5(s: str) -> str: def md5(s: str) -> str:
m = hashlib.md5() m = hashlib.md5()
@ -9,17 +11,24 @@ def md5(s: str) -> str:
def get_proxies(): def get_proxies():
username = "t14131310374591" # username = "t14131310374591"
password = "qg6xwmrq" # password = "qg6xwmrq"
tunnel = "d432.kdltps.com:15818" # tunnel = "d432.kdltps.com:15818"
proxies = { # proxies = {
"http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}, # "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
"https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel} # "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
} # }
# proxies = { # proxies = {
# "http": "http://127.0.0.1:8080", # "http": "http://127.0.0.1:8080",
# "https": "http://127.0.0.1:8080" # "https": "http://127.0.0.1:8080"
# } # }
proxy = AppCtx.g_app_config.chrome.proxy
proxies = {
"http": proxy,
"https": proxy
}
return proxies return proxies

View File

@ -7,7 +7,7 @@ from loguru import logger
class YdmVerify(object): class YdmVerify(object):
_custom_url = "https://www.jfbym.com/api/YmServer/customApi" _custom_url = "https://www.jfbym.com/api/YmServer/customApi"
_token = "HhUGwpI6AtQGoux36i1ZpsDv7hwGSbr1hQ0RX-HXSZE" _token = "2HNCDBee_JFmXAZZanQm9I7x1sqQln9BggF1xaGtMX0"
_headers = { _headers = {
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }

View File

@ -78,7 +78,7 @@ class DomainService:
domain=x, domain=x,
status=DomainStatus.READY.value, status=DomainStatus.READY.value,
crawl_interval=interval, crawl_interval=interval,
latest_crawl_time=0 if not crawl_now else int(time.time()) latest_crawl_time=0 if crawl_now else int(time.time())
) for x in domains ) for x in domains
] ]

View File

@ -34,13 +34,17 @@ tab = browser.new_tab(f"https://www.baidu.com/s?wd={keyword}")
# week_btn_el = tab.ele('t:li@@text()= 一月内 ') # week_btn_el = tab.ele('t:li@@text()= 一月内 ')
# week_btn_el.click(by_js=True) # week_btn_el.click(by_js=True)
# tab.wait(2) # tab.wait(2)
print(f"{tab.url=}")
print("2222") print("2222")
tab.get("https://www.163.com/")
print(f"{tab.url=}")
# tab.ele(".content_none") # tab.ele(".content_none")
# tab.wait.eles_loaded(["#container", ".content_none", "#content_left"], any_one=True) # tab.wait.eles_loaded(["#container", ".content_none", "#content_left"], any_one=True)
print("未找到相关结果" in tab.html) print("未找到相关结果" in tab.html)
print("1111") print("1111")
# if "未找到相关结果" in tab.html: # if "未找到相关结果" in tab.html:
# print("未找到相关结果") # print("未找到相关结果")
# else: # else:

272
tests/test_dp3.py Normal file
View File

@ -0,0 +1,272 @@
import random
import re
import sys
import threading
import time
from enum import verify
from pathlib import Path
import execjs
import requests
from DrissionPage import Chromium, ChromiumOptions
from loguru import logger
from app.utils.common import get_all_cookies
from app.utils.ydm_verify import YdmVerify
chrome_opts = ChromiumOptions()
chrome_opts.mute(True) # 静音
chrome_opts.no_imgs(False)
chrome_opts.set_argument("--disable-gpu")
chrome_opts.set_argument('--ignore-certificate-errors')
chrome_opts.set_argument("--proxy-server=http://127.0.0.1:7890")
# chrome_opts.incognito(True)
chrome_opts.set_browser_path(r"C:\Program Files\Google\Chrome\Application\chrome.exe")
chrome_opts.auto_port()
browser = Chromium(addr_or_opts=chrome_opts)
# tab = browser.new_tab()
# tab.listen.start([
# "passport.baidu.com/cap/init",
# "passport.baidu.com/cap/style",
# ])
# tab.get("https://wappass.baidu.com/static/captcha/tuxing_v2.html?&logid=10332554090053311096&ak=c27bbc89afca0463650ac9bde68ebe06&backurl=https%3A%2F%2Fwww.baidu.com%2Fs%3Fwd%3Dsite%253Altxbbs.com%26pn%3D50%26oq%3Dsite%253Altxbbs.com%26ct%3D2097152%26ie%3Dutf-8%26si%3Dltxbbs.com%26fenlei%3D256%26rsv_idx%3D1%26rsv_pq%3D99cae74f0003cd72%26rsv_t%3Dab2dk%252Fq4PohUCmoLbyMlEMrGJszk983ojkNLk%252FUiZGJ4ZLpwvZ46PtQUufk%26gpc%3Dstf%253D1741437499%252C1744115898%257Cstftype%253D1%26tfflag%3D1%26topic_pn%3D%26rsv_page%3D1&ext=x9G9QDmMXq%2FNo87gjGO0P1dyBXu4PagAZrreQL6%2Bticsr0rrDszYO2sAbAnT1vLIUgqUK9LXd1cIlztrhMwiv3XfcB99Y5gyF0c0ETsDFDls5CsGNJQRLPawcntn2ndVLHHLl46IaoOp8l%2FC1xtOHwMQi85PCzAojcSf2wQ76KRxVau99LtSYCIfwtv7By0w&signature=f2fbb1b81926e247835f69195661a06b&timestamp=1744115910")
# for pkg in tab.listen.steps():
# print(f"{pkg.url=}")
# print(f"{pkg.response.raw_body=}")
# current_path = Path(__file__).resolve()
# print(current_path)
# current_dir = current_path.parent.parent
# print(current_dir)
# js_path = current_dir.joinpath("./js/mkd_v2_link_submit.js")
# print(js_path.exists())
# with open("./js/mkd_v2_link_submit.js", "r", encoding="utf-8") as f:
# ds_js = f.read()
#
proxy_str = "http://127.0.0.1:7890"
headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
# 'Origin': 'https://jubao.baidu.com',
'Pragma': 'no-cache',
'Referer': "https://wappass.baidu.com/",
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0",
'X-Requested-With': 'XMLHttpRequest',
'sec-ch-ua_wap': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'sec-ch-ua_wap-mobile': '?0',
'sec-ch-ua_wap-platform': '"Windows"',
"Cookie": "BDUSS=ldlSDMwdkg5VmlrbE5TZFdHUHVhWEFCTVNqcGtKZHhXeTNaTHFGZHY4Y3F5LVJiQVFBQUFBJCQAAAAAAAAAAAEAAADj3ycY1tC5zNXywO4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACo-vVsqPr1bS; PHPSESSID=f364o6o7tpsag92pd67630p870; lastIdentity=PassUserIdentity; BAIDUID=5C7396A6BE9E28B769E6E9815A1B8D5E:FG=1; BAIDUID_BFESS=5C7396A6BE9E28B769E6E9815A1B8D5E:FG=1; BDUSS_BFESS=ldlSDMwdkg5VmlrbE5TZFdHUHVhWEFCTVNqcGtKZHhXeTNaTHFGZHY4Y3F5LVJiQVFBQUFBJCQAAAAAAAAAAAEAAADj3ycY1tC5zNXywO4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACo-vVsqPr1bS",
}
tab = browser.new_tab()
tab.listen.start(r"/cap/(init|style|log)", is_regex=True)
tab.get("https://www.baidu.com")
captcha_data = {}
def listener():
    """Background network listener for the browser tab.

    Captures the bodies of the captcha-related responses (/cap/init,
    /cap/style, /cap/log) plus the Referer and Cookie headers of the
    /cap/style request, so verify_captcha() can replay the API calls
    outside the browser. Results are stashed in the module-level
    ``captcha_data`` dict.
    """
    for pkg in tab.listen.steps():
        if "/cap/init" in pkg.url:
            captcha_data["init"] = pkg.response.body
        if "/cap/style" in pkg.url:
            captcha_data["style"] = pkg.response.body
            captcha_data["referer"] = pkg.request.headers.get("Referer")
            # Fix: reusing double quotes inside an f-string is only legal on
            # Python 3.12+ (PEP 701); use single quotes like the line below.
            logger.debug(f"正确的 referer: {captcha_data['referer']}")
            captcha_data["cookie"] = pkg.request.headers.get("Cookie")
            logger.debug(f"cookie: {captcha_data['cookie']}")
        if "/cap/log" in pkg.url:
            captcha_data["log"] = pkg.response.body


# Daemon thread so the listener dies together with the main script.
thread = threading.Thread(target=listener, daemon=True)
thread.start()
def verify_captcha(current_url: str):
    """Solve Baidu's rotate captcha using data sniffed by listener().

    Unlike the engine version, steps 1 and 2 (/cap/init, /cap/style) are not
    requested here — their responses are taken from ``captcha_data``, filled
    by the browser-side network listener. Steps:

      1. read ``as`` / ``tk`` from the sniffed /cap/init response
      2. read ``backstr`` + image URL from the sniffed /cap/style response
      3. download the image and solve the rotation angle via YdmVerify
      4. POST /cap/log with the ``fs`` payload built by the bundled JS
      5. POST /cap/c to obtain the redirect URL

    :param current_url: captcha page URL (kept for signature parity with the
        engine version; the sniffed referer is what is actually sent).
    :return: decoded redirect URL on success, ``None`` on failure.
    """
    # Replay with the exact Referer/Cookie the browser used.
    headers["Referer"] = captcha_data["referer"]
    headers["Cookie"] = captcha_data["cookie"]
    # Step 1: AS / TK tokens, from the sniffed /cap/init body.
    as_value = captcha_data["init"]["data"]["as"]
    tk_value = captcha_data["init"]["data"]["tk"]
    # NOTE: the original kept a commented-out variant here that re-requested
    # /cap/init over the proxy instead of using the sniffed response.
    logger.debug(f"{as_value=}, {tk_value=}")
    # Step 2: backstr + captcha image link, from the sniffed /cap/style body.
    backstr = captcha_data["style"]["data"]["backstr"]
    captcha_link = captcha_data["style"]["data"]["captchalist"][0]["source"]["back"]["path"]
    # NOTE: a commented-out direct /cap/style request variant was kept here
    # in the original as well.
    logger.debug(f"{backstr=}, {captcha_link=}")
    # Step 3a: download the captcha image to disk.
    image_response = requests.get(captcha_link, headers=headers)
    with open("captcha.png", "wb") as f:
        f.write(image_response.content)
    logger.debug("download captcha.png")
    # Step 3b: solve the rotation via the YDM service and normalise the
    # result to a fraction of 360 degrees.
    ydm = YdmVerify()
    with open("captcha.png", "rb") as fp:
        picture = fp.read()
    slide_distance = ydm.rotate(picture)
    logger.debug(f"{slide_distance=}")
    if not slide_distance:
        logger.error("识别验证码失败")
        return None
    rotate_angle_rate = round(slide_distance / 360, 2)
    logger.debug(f"{rotate_angle_rate=}")
    if not rotate_angle_rate:
        # A rounded rate of 0.0 is treated as a failed solve.
        logger.debug("识别验证码失败")
        return None
    # Step 4: build the anti-bot "fs" blob via the bundled JS and submit.
    time_log = str(int(time.time() * 1000))
    with open("./js/mkd_v2_link_submit.js", 'r', encoding='utf-8') as f:
        ds_js = f.read()
    fs = execjs.compile(ds_js).call('getFs2', backstr, rotate_angle_rate, as_value)
    data = {
        "_": time_log,
        "refer": captcha_data["referer"],
        "ak": "c27bbc89afca0463650ac9bde68ebe06",
        "as": as_value,
        "scene": "",
        "tk": tk_value,
        "ver": "2",
        "cv": "submit",
        "typeid": "spin-0",
        # fuid: captured browser fingerprint (length 1280); per the original
        # notes it is short-term stable and differs per browser. Alternative
        # captures (Edge/Chrome) were kept as comments in the original and
        # trimmed here.
        "fuid": "FOCoIC3q5fKa8fgJnwzbE67EJ49BGJeplOzf+4l4EOvDuu2RXBRv6R3A1AZMa49I27C0gDDLrJyxcIIeAeEhD8JYsoLTpBiaCXhLqvzbzmvy3SeAW17tKgNq/Xx+RgOdb8TWCFe62MVrDTY6lMf2GrfqL8c87KLF2qFER3obJGnfaJTn/Ne60I9LwR04t6XmGEimjy3MrXEpSuItnI4KD0FJKzTbw1AN69fBnzR2FuvMmmQZ+1zgJ72wdcVU+mcQxiE2ir0+TEYgjPJt1Qa3K1mLi+P4IWJeag2lvxB4yJ/GgLbz7OSojK1zRbqBESR5Pdk2R9IA3lxxOVzA+Iw1TWLSgWjlFVG9Xmh1+20oPSbrzvDjYtVPmZ+9/6evcXmhcO1Y58MgLozKnaQIaLfWRPAn9I0uOqAMff6fuUeWcH1mRYoTw2Nhr4J4agZi377iM/izL6cVCGRy2F8c0VpEvM5FjnYxYstXg/9EfB3EVmKAfzNRIeToJ5YV9twMcgdmlV1Uhhp5FAe6gNJIUptp7EMAaXYKm11G+JVPszQFdp9AJLcm4YSsYUXkaPI2Tl66J246cmjWQDTahAOINR5rXR5r/7VVI1RMZ8gb40q7az7vCK56XLooKT5a+rsFrf5Zu0yyCiiagElhrTEOtNdBJJq8eHwEHuFBni9ahSwpC7lbKkUwaKH69tf0DFV7hJROiLETSFloIVkHdy3+I2JUr1LsplAz0hMkWt/tE4tXVUV7QcTDTZWS/2mCoS/GV3N9awQ6iM6hs/BWjlgnEa1+5iP7WSc7RJ34FaE5PsyGXyoCWdXwNRGSZPSvVtB/Ea6w5FKazXcZ/j40FJv+iLGBn3nkkgHlne61I8I7KhtQgIkmBMJIjPMkS/L051MeqdGScsKYTJuSucgI5c3+79eVH+y2TvbOTuuHv1uGxwXFb2atIU1ZYPbmmXculmizKcKI/s44qf8uM8iBZLGkKeVyL74aPyLkg7Gk359g98BIGN/ZzJR/h+Y6AyFx+HlMoYJnS06dVmqFbvlCtSdGylKQ5f8eWtxPkJGqOFtWjIVteQYMsH/AaSJonqw+WLiZvGjYfm9p0alEyujapoTy77HzDcUoU1wUSXa5xS/Z6hXEr2OnLi0LdPVcGjz8lpLcdVeSfm9p0alEyujapoTy77HzDWf5PERRSTFqLd9BTUHLyY4Ji3EQLGQPaM1aeHxG1bJZH0s1Si/KwzTaTYzu6ziQiqwcr2kaYUiH+fMOxn69/BhNJVMhpQkhprc1KZuJRvXjppq0gKweencPxgS/jd0rjw==",
        "fs": fs
    }
    response = requests.post(
        "https://passport.baidu.com/cap/log",
        headers=headers,
        data=data,
        proxies={"http": proxy_str, "https": proxy_str},
    ).json()
    try:
        result = {
            "ds": response["data"]["ds"],
            "op": response["data"]["op"],
            "tk": response["data"]["tk"]
        }
    except KeyError:
        # Missing keys means the server rejected the submission outright.
        logger.error(f"验证码没转成功, response: {response=}")
        time.sleep(1)
        return None
    logger.debug(f"{result=}")
    # op == 1 means the rotation was accepted; anything else is a retry.
    if result["op"] != 1:
        logger.error(f"op != 1, 重试")
        return None
    # Step 5: /cap/c exchanges the solved tokens for the back-redirect URL.
    response = requests.post(
        "https://passport.baidu.com/cap/c?ak=c27bbc89afca0463650ac9bde68ebe06",
        headers=headers,
        json={
            "tk": result["tk"],
            "ds": result["ds"],
            "qrsign": "",
            "refer": captcha_data["referer"]
        },
        proxies={"http": proxy_str, "https": proxy_str},
    )
    data = response.json()
    if data["data"].get("f"):
        # "f" = failure payload.
        logger.error(f"验证码失败: {data['data'].get('f')}")
        return None
    if data["data"].get("s"):
        # "s.url" = success redirect, returned \uXXXX-escaped.
        logger.debug("验证成功URL" + data["data"].get("s").get("url"))
        url = data["data"].get("s").get("url")
        url = url.encode("utf-8").decode("unicode-escape")
        logger.success("解码后的URL" + url)
        return url
# Drive the search pagination and exercise the captcha solver whenever
# Baidu redirects us to the tuxing_v2 captcha page.
current_page = 1
while current_page < 15:
    tab.get(f"https://www.baidu.com/s?wd=site%3Abaidu.com&pn={(current_page - 1) * 10}")
    current_page += 1
    if "wappass.baidu.com/static/captcha/tuxing_v2.html" in tab.url:
        logger.debug("captcha!!!!")
        time.sleep(2)
        idx = 0
        while idx < 3:
            idx += 1
            url = verify_captcha(tab.url)
            if not url:
                # Failed solve: reload the captcha page and try again.
                tab.refresh()
                time.sleep(3)
            else:
                tab.get(url)
                # Fix: stop retrying once the redirect succeeded — the engine
                # version (CrawlEngine) breaks here too; without it the loop
                # kept calling verify_captcha after a successful solve.
                break
    time.sleep(30)
logger.debug(f"{captcha_data=}")
# browser.quit()

8
tests/test_unicode.py Normal file
View File

@ -0,0 +1,8 @@
"""Manual check: print certifi's CA bundle path and unicode-unescape a
captured Baidu redirect URL (the server returns it \\uXXXX-escaped)."""
import certifi

# Which CA bundle requests/urllib3 would use — handy for TLS debugging.
print(certifi.where())

raw_url = r"https://www.baidu.com/s?wd=site%3Abaidu.com\u0026pn=10\u0026p_tk=30610C1sd8U0U%2BPJYAWv8nhtnx0emHFxWZ9edG%2BaRz9YAiXcODGGnlpuX%2FIMRoUmFESarFc5H8HQuG2nq8%2FVXRIsPZt%2BoxjJAmxxHNGCVs0oz%2FZSTZsdUlvw5a53dshtXQASLvZg71Bg4ZT6j%2B5a%2B%2FM3CHWuHs8cjlMBRCAX4l%2BZt8k%3D\u0026p_timestamp=1744202399\u0026p_sign=a1ee13c92f54d14d019cbdd8edcb4088\u0026p_signature=737f76b967318af4b309d30784d440c5\u0026__pc2ps_ab=30610C1sd8U0U%2BPJYAWv8nhtnx0emHFxWZ9edG%2BaRz9YAiXcODGGnlpuX%2FIMRoUmFESarFc5H8HQuG2nq8%2FVXRIsPZt%2BoxjJAmxxHNGCVs0oz%2FZSTZsdUlvw5a53dshtXQASLvZg71Bg4ZT6j%2B5a%2B%2FM3CHWuHs8cjlMBRCAX4l%2BZt8k%3D|1744202399|737f76b967318af4b309d30784d440c5|a1ee13c92f54d14d019cbdd8edcb4088"
decoded_url = raw_url.encode("utf-8").decode("unicode-escape")
print(decoded_url)