from typing import Optional

from loguru import logger
from sqlalchemy import update, func
from sqlmodel import Session, select

from app.config.config import AppCtx
from app.constants.api_result import ApiCode
from app.models.report_urls import ReportUrlModel
from app.web.request.report_request import AddUrlItem
from app.web.results import ApiResult


class ReportURLService:
    """Query and update ``ReportUrlModel`` rows, returning ``ApiResult`` objects.

    Every method opens its own ``Session`` on ``AppCtx.g_db_engine`` and turns
    database exceptions into an ``ApiCode.DB_ERROR`` result instead of raising.
    """

    @staticmethod
    def _flag_selected(value: Optional[int]) -> bool:
        """Return True when a tri-state flag parameter should become a filter.

        ``None`` and ``2`` mean "do not filter".
        NOTE(review): ``0`` is also skipped because it is falsy, so rows whose
        flag is false cannot be selected through these parameters. This
        mirrors the long-standing behavior; confirm whether it is intended.
        """
        return bool(value) and value != 2

    @classmethod
    def get_list(
            cls,
            domain: str,
            surl: str,
            is_report_by_one: Optional[int],
            is_report_by_site: Optional[int],
            is_report_by_wap: Optional[int],
            has_evidence: Optional[int],
            page: int,
            size: int
    ):
        """Return one page of report URLs plus the total number of matches.

        Args:
            domain: Substring matched against ``domain`` (LIKE ``%domain%``);
                an empty value disables the filter.
            surl: Substring matched against ``surl``; an empty value disables
                the filter.
            is_report_by_one: Tri-state filter, see ``_flag_selected``.
            is_report_by_site: Tri-state filter, see ``_flag_selected``.
            is_report_by_wap: Tri-state filter, see ``_flag_selected``.
            has_evidence: Tri-state filter, see ``_flag_selected``.
            page: 1-based page number; values below 1 are treated as page 1.
            size: Maximum number of rows per page.

        Returns:
            ``ApiResult.ok({"total": ..., "data": ...})`` on success, or an
            ``ApiCode.DB_ERROR`` result if either query fails.
        """
        # Build the filter list once so the data query and the count query
        # can never drift apart.
        conditions = []
        if domain:
            conditions.append(ReportUrlModel.domain.like(f"%{domain}%"))
        if surl:
            conditions.append(ReportUrlModel.surl.like(f"%{surl}%"))

        flag_filters = (
            (ReportUrlModel.is_report_by_one, is_report_by_one),
            (ReportUrlModel.is_report_by_site, is_report_by_site),
            (ReportUrlModel.is_report_by_wap, is_report_by_wap),
            (ReportUrlModel.has_evidence, has_evidence),
        )
        for column, value in flag_filters:
            if cls._flag_selected(value):
                conditions.append(column == value)

        stmt = select(ReportUrlModel)
        total_stmt = select(func.count(ReportUrlModel.id))
        for condition in conditions:
            stmt = stmt.where(condition)
            total_stmt = total_stmt.where(condition)

        # Pagination: OFFSET/LIMIT is only stable with an explicit ORDER BY,
        # and a page below 1 must not produce a negative OFFSET.
        offset = max(page - 1, 0) * size
        stmt = stmt.order_by(ReportUrlModel.id).offset(offset).limit(size)

        with Session(AppCtx.g_db_engine) as session:
            try:
                total = session.exec(total_stmt).first()
                urls = session.exec(stmt).all()
                return ApiResult.ok({
                    "total": total,
                    "data": urls,
                })
            except Exception as e:
                logger.error(f"获取URL列表失败: {e}")
                return ApiResult.error(ApiCode.DB_ERROR.value, str(e))

    @classmethod
    def add_urls(cls, domain_map: dict[str, int], urls: list[AddUrlItem]) -> ApiResult[Optional[int]]:
        """Insert one ``ReportUrlModel`` row per item in ``urls``.

        Args:
            domain_map: Maps a domain name to its id.
            urls: Items to insert; each supplies ``domain`` and ``surl``.

        Returns:
            ``ApiResult.ok(n)`` with the number of inserted rows (``0`` for an
            empty ``urls``); an ``ApiCode.PARAM_ERROR`` result as soon as an
            item's domain has no (truthy) id in ``domain_map``, in which case
            nothing is inserted; or an ``ApiCode.DB_ERROR`` result if the
            insert fails (the transaction is rolled back).
        """
        if not urls:
            return ApiResult.ok(0)

        models = []
        for url in urls:
            domain_id = domain_map.get(url.domain)
            if not domain_id:
                return ApiResult.error(ApiCode.PARAM_ERROR.value, f"域名 {url.domain} 不存在")
            models.append(ReportUrlModel(
                domain_id=domain_id,
                domain=url.domain,
                surl=url.surl,
            ))

        with Session(AppCtx.g_db_engine) as session:
            try:
                session.add_all(models)
                session.commit()
                return ApiResult.ok(len(models))
            except Exception as e:
                logger.error(f"添加URL失败: {e}")
                session.rollback()
                return ApiResult.error(ApiCode.DB_ERROR.value, str(e))

    @classmethod
    def batch_update_evidence_flag(cls, url_ids: list[int]):
        """Set ``has_evidence`` to ``False`` on every row whose id is in ``url_ids``.

        Returns:
            ``ApiResult.ok(len(url_ids))`` on success. This is the number of
            ids requested, not the number of rows actually changed. Returns an
            ``ApiCode.DB_ERROR`` result if the update fails (rolled back).
        """
        if not url_ids:
            # Nothing to update; skip the database round-trip.
            return ApiResult.ok(0)

        with Session(AppCtx.g_db_engine) as session:
            try:
                stmt = update(ReportUrlModel).where(ReportUrlModel.id.in_(url_ids)).values(has_evidence=False)
                session.exec(stmt)
                session.commit()
                return ApiResult.ok(len(url_ids))
            except Exception as e:
                logger.error(f"批量更新URL的has_evidence字段失败: {e}")
                session.rollback()
                return ApiResult.error(ApiCode.DB_ERROR.value, str(e))

    @classmethod
    def batch_update_report_flag(cls, ids: list[int], report_by_one: bool, report_by_site: bool, report_by_wap: bool):
        """Set the selected ``is_report_by_*`` columns to ``False`` for ``ids``.

        Args:
            ids: Primary keys of the rows to update.
            report_by_one: When true, set ``is_report_by_one`` to ``False``.
            report_by_site: When true, set ``is_report_by_site`` to ``False``.
            report_by_wap: When true, set ``is_report_by_wap`` to ``False``.

        Returns:
            ``ApiResult.ok(len(ids))`` on success (the number of ids
            requested, not rows changed); ``ApiResult.ok(0)`` when ``ids`` is
            empty or no flag is selected; or an ``ApiCode.DB_ERROR`` result if
            the update fails (rolled back).
        """
        # Collect every requested column first and apply them in a single
        # ``values()`` call rather than relying on chained calls merging.
        new_values = {}
        if report_by_wap:
            new_values["is_report_by_wap"] = False
        if report_by_site:
            new_values["is_report_by_site"] = False
        if report_by_one:
            new_values["is_report_by_one"] = False

        if not ids or not new_values:
            # An UPDATE with no target rows or no SET values has nothing to do.
            return ApiResult.ok(0)

        with Session(AppCtx.g_db_engine) as session:
            try:
                stmt = update(ReportUrlModel).where(ReportUrlModel.id.in_(ids)).values(**new_values)
                logger.debug(f"{str(stmt)=}")
                session.exec(stmt)
                session.commit()
                return ApiResult.ok(len(ids))
            except Exception as e:
                # The message names the report flags; it previously repeated
                # the has_evidence wording copied from the sibling method.
                logger.error(f"批量更新URL的举报标记字段失败: {e}")
                session.rollback()
                return ApiResult.error(ApiCode.DB_ERROR.value, str(e))