import os.path
import random
import re
import time

import requests
from loguru import logger
from sqlmodel import Session, select

from .base import BaseReporter
from ...config.config import AppCtx
from ...models.report_urls import ReportUrlModel
from ...utils.common import (
    get_all_cookies,
    get_proxies,
    report_keywords,
    get_reporter_name,
    generate_random_phone_number,
    md5,
)
from ...utils.gen_cookie import GenCookie
from ...utils.ua import random_ua


class SiteReporter(BaseReporter):
    """Reporter that submits pending URLs through the help.baidu.com report form.

    For every ``ReportUrlModel`` row whose ``is_report_by_site`` flag is still
    false, the reporter uploads the locally stored screenshot, fetches a
    ``submit_token`` from the report page and posts the report form.
    """

    def __init__(self):
        # NOTE(review): BaseReporter.__init__ is not called here; its signature
        # is not visible from this file, so the original behavior is kept.
        self.engine_name = "SITE_REPORTER"
        self.upload_pic_url = "https://help.baidu.com/api/mpic"
        self.report_url = "https://help.baidu.com/jubaosubmit"
        # One shared HTTP session is reused for every request of this reporter.
        self.request = requests.session()
        # NOTE(review): the "sec-ch-ua_wap*" names below differ from the
        # standard "sec-ch-ua*" client-hint headers — confirm this is intended.
        self.headers = {
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": random_ua(),
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,zh-TW;q=0.6",
            "sec-ch-ua_wap": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
            "sec-ch-ua_wap-mobile": "?0",
            "sec-ch-ua_wap-platform": '"Windows"',
            "Cookie": "",
        }
        self.database = AppCtx.g_db_engine
        self.all_cookies = get_all_cookies()
        self.proxies = get_proxies()
        # Extracts the hidden form token from the report page HTML.
        self.token_pattern = r'name="submit_token" value="(.*?)"'

    def run(self):
        """Report every URL that has not yet been reported through the site form.

        Rows are skipped when their screenshot file is missing or when the
        screenshot upload fails. A row is marked ``is_report_by_site = True``
        and committed only when ``do_report`` returns a truthy result. The
        loop sleeps 5 seconds after each attempted report.
        """
        with Session(self.database) as session:
            # `== False` is required here: it builds a SQL expression, it is
            # not a Python truthiness test.
            stmt = select(ReportUrlModel).where(ReportUrlModel.is_report_by_site == False)  # noqa: E712
            rows: list[ReportUrlModel] = session.exec(stmt).all()
            logger.info(f"[{self.engine_name}] 共计 {len(rows)} 条需要举报")

            # random.choice() raises IndexError on an empty sequence, so bail
            # out early when there is no cookie to report with.
            if rows and not self.all_cookies:
                logger.error(f"[{self.engine_name}] 没有可用的 cookie,无法举报")
                return

            for row in rows:
                # Collect the basic data needed for the report.
                surl = row.surl
                q = row.q
                domain = row.domain
                img_path = f"./imgs/{domain}/{md5(surl)}.png"
                if not os.path.exists(img_path):
                    logger.warning(f"{surl} 的截图不存在!")
                    continue

                # Build the cookie and rotate the user agent for this report.
                cookie = random.choice(self.all_cookies)
                report_site_cookie = GenCookie.run(cookie)
                self.headers["Cookie"] = report_site_cookie
                self.headers["User-Agent"] = random_ua()
                logger.debug(f"设置 cookie 为:{report_site_cookie}")

                # Upload the screenshot first; without it the report would be
                # submitted with no evidence attached, so skip the row instead.
                pic_filename = self.upload_pic(img_path)
                if not pic_filename:
                    logger.warning(f"{surl} 的截图上传失败,跳过举报")
                elif self.do_report(domain, q, pic_filename, surl):
                    # Persist the success immediately so an interrupted run
                    # does not re-report this row.
                    row.is_report_by_site = True
                    session.add(row)
                    session.commit()

                # Wait 5 seconds before the next report.
                time.sleep(5)

    def upload_pic(self, img_path: str):
        """Upload the screenshot at ``img_path`` to the picture endpoint.

        Args:
            img_path: Path of the image file to upload.

        Returns:
            The ``"filename"`` value from the JSON response, or ``None`` when
            the upload or response parsing fails (the error is logged).
        """
        try:
            with open(img_path, "rb") as fp:
                files = {"pic": fp}
                response = self.request.post(
                    self.upload_pic_url,
                    headers=self.headers,
                    files=files,
                    proxies=self.proxies,
                    allow_redirects=False,
                    timeout=5,
                )
            pic_filename = response.json()["filename"]
            return pic_filename
        except Exception as e:
            logger.error(f"上传举报图片失败,错误: {e}")
            return None

    def do_report(self, domain: str = "", keyword: str = "", filename: str = "", link: str = ""):
        """Fetch a submit token and post one report form.

        Args:
            domain: Domain being reported; used as the form title.
            keyword: Search query for the form; replaced by a generated
                ``site ...`` keyword when longer than 50 characters.
            filename: Uploaded picture filename returned by ``upload_pic``.
            link: URL being reported.

        Returns:
            ``True`` when the response ``errno`` is 0 (reported) or 11 (logged
            as already reported), ``None`` when no ``submit_token`` is found on
            the report page, and ``False`` for any other ``errno`` or when an
            exception occurs.
        """
        try:
            # Fetch the token required to submit the form.
            response = self.request.get(
                "https://help.baidu.com/jubao",
                headers=self.headers,
                proxies=self.proxies,
                allow_redirects=False,
                timeout=5,
            )
            match = re.search(self.token_pattern, response.text)
            if match:
                token_value = match.group(1)
                logger.debug(f"成功获取 submit_token: {token_value}")
            else:
                logger.error("未找到匹配的 submit_token")
                return None

            # Assemble the form data.
            if len(keyword) > 50:
                keyword = f"site {domain} {random.choice(report_keywords())}"
            data = {
                "submit_token": token_value,
                "os_info": "Macintosh",
                "browser_info": f"chrome 1{random.randint(20, 34)}",
                "pid": "1",
                "type": "3",
                "realname": get_reporter_name(),
                "mobile": generate_random_phone_number(),
                "category_id": "3",
                "query": keyword,
                "title": domain,
                "links": f"{link}",
                "content": f"{link} 存在大量淫秽色情的收录搜索结果!!!",
                "pic[]": filename,
            }

            # Submit the report. The timeout matches the other requests so a
            # stalled connection cannot block the reporting loop forever.
            response = self.request.post(
                self.report_url,
                data=data,
                headers=self.headers,
                proxies=self.proxies,
                allow_redirects=False,
                timeout=5,
            )
            json_data = response.json()
            if json_data["errno"] == 0:
                logger.success(f"[{self.engine_name}]{link} 举报成功")
                return True
            if json_data["errno"] == 11:
                logger.success(f"[{self.engine_name}]{link} 已经举报过")
                return True
            logger.debug(f"{json_data=}")
            return False
        except Exception as e:
            logger.error(f"[{self.engine_name}] 举报错误:{e}")
            return False