"""HTTP endpoints for domain management: list, add, import, update, delete, crawl and pause."""
from typing import Annotated

from fastapi import APIRouter, UploadFile, Form, Query

from app.constants.api_result import ApiCode
from app.constants.domain import DomainStatus
from app.web.request.domain_request import (
    AddDomainRequest,
    DeleteDomainRequest,
    UpdateDomainRequest,
    GetDomainListRequest,
    CrawlNowRequest,
    ToggleDomainRequest,
)
from app.web.results import ApiResult
from app.web.service.domain_service import DomainService

router = APIRouter(prefix="/api/domain", tags=["域名管理"])


def _add_new_domains(crawl_interval, start_now, domains):
    """Add only those domains that are not stored yet.

    Shared by the JSON and the file-upload endpoints so both apply the same
    de-duplication rules.

    Args:
        crawl_interval: Crawl interval forwarded to ``DomainService.add_domains``.
        start_now: Crawl-now flag forwarded to ``DomainService.add_domains``.
        domains: Candidate domain names; may contain repeats.

    Returns:
        The failed lookup result if ``DomainService.get_by_domains`` reports
        failure, ``ApiResult.ok(0)`` if nothing new is left to add, otherwise
        whatever ``DomainService.add_domains`` returns.
    """
    # Drop repeats inside the submission itself while keeping the input order.
    unique_domains = list(dict.fromkeys(domains))
    if not unique_domains:
        return ApiResult.ok(0)

    lookup = DomainService.get_by_domains(unique_domains)
    if not lookup.success:
        return lookup

    # A set keeps the membership test O(1) per domain for large imports.
    stored = {item.domain for item in lookup.data}
    new_domains = [domain for domain in unique_domains if domain not in stored]
    if not new_domains:
        return ApiResult.ok(0)

    return DomainService.add_domains(crawl_interval, start_now, new_domains)


def _check_domain_ids_exist(domain_ids):
    """Verify that every requested domain ID is known to the service.

    Args:
        domain_ids: IDs taken from the incoming request.

    Returns:
        ``None`` when all IDs exist. Otherwise the result to send back to the
        client: the failed lookup result, or a ``PARAM_ERROR`` naming the first
        missing ID in request order.
    """
    lookup = DomainService.get_by_ids(domain_ids)
    if not lookup.success:
        return lookup

    stored_ids = {item.id for item in lookup.data}
    for domain_id in domain_ids:
        if domain_id not in stored_ids:
            return ApiResult.error(ApiCode.PARAM_ERROR.value, f"域名 ID {domain_id} 不存在")
    return None


@router.get("/v1/list")
def get_all_domains(request: Annotated[GetDomainListRequest, Query()]):
    """List domains with pagination, optionally filtered by domain and status.

    The page, size, domain and status query parameters are passed straight to
    ``DomainService.get_list``.
    """
    return DomainService.get_list(request.page, request.size, request.domain, request.status)


@router.post("/v1/add")
def add_domains(request: AddDomainRequest):
    """Add the domains from the request body, skipping ones that already exist.

    Returns ``ApiResult.ok(0)`` when every submitted domain is already stored.
    """
    return _add_new_domains(request.crawl_interval, request.crawl_now, request.domains)


@router.post("/v1/import")
def import_domains(
        # A file and form fields submitted together cannot be bound through a
        # form model, so each field has to be declared individually.
        file: UploadFile,
        crawl_interval: int = Form(),
        crawl_now: bool = Form(),
):
    """Add domains from an uploaded text file containing one domain per line.

    Blank lines are ignored and a UTF-8 byte-order mark is stripped. Domains
    that already exist are skipped, as in the ``/v1/add`` endpoint.

    TODO: move the work to a background task/thread if single files get large.
    """
    domains = []
    for raw_line in file.file:
        # "utf-8-sig" behaves like UTF-8 but also removes a leading BOM, which
        # would otherwise become part of the first domain name.
        domain = raw_line.strip().decode("utf-8-sig")
        if domain:
            domains.append(domain)

    return _add_new_domains(crawl_interval, crawl_now, domains)


@router.post("/v1/update")
def update_domain(request: UpdateDomainRequest):
    """Update the crawl interval of one or more domains, selected by ID.

    Returns a ``PARAM_ERROR`` result if any of the given IDs does not exist.
    """
    failure = _check_domain_ids_exist(request.domain_ids)
    if failure is not None:
        return failure

    return DomainService.update_domain_interval(request.domain_ids, request.crawl_interval)


@router.post("/v1/delete")
def delete_domain(request: DeleteDomainRequest):
    """Delete one or more domains, selected by ID.

    Returns a ``PARAM_ERROR`` result if any of the given IDs does not exist.
    ``request.remove_surl`` is forwarded unchanged to the service.
    """
    failure = _check_domain_ids_exist(request.domain_ids)
    if failure is not None:
        return failure

    return DomainService.delete_domains(request.domain_ids, request.remove_surl)


@router.post("/v1/crawl")
def crawl_now(request: CrawlNowRequest):
    """Request an immediate crawl by setting the domains' status to QUEUEING."""
    return DomainService.update_domain_status(request.domain_ids, DomainStatus.QUEUEING.value)


@router.post("/v1/toggle")
def toggle_domain(request: ToggleDomainRequest):
    """Pause crawling for the given domains by setting their status to PAUSE.

    NOTE(review): despite the "toggle" name this endpoint only ever pauses;
    confirm whether a resume path is expected here.
    """
    return DomainService.update_domain_status(request.domain_ids, DomainStatus.PAUSE.value)