60 lines
2.0 KiB
Python

from datetime import datetime
from sqlalchemy import create_engine, event, text
from sqlalchemy.dialects.mysql import BIGINT
from sqlmodel import SQLModel, Field
from ..config import AppConfig
from ..config.config import AppCtx
def get_timestamp():
    """Return the current time as a whole number of seconds since the Unix epoch."""
    now = datetime.now()
    epoch_seconds = now.timestamp()
    # int() drops the fractional (sub-second) part.
    return int(epoch_seconds)
class BaseModel(SQLModel):
    """Base class for database models.

    Declares the columns shared by every model: an unsigned BIGINT primary
    key plus creation / last-update timestamps stored as unsigned BIGINT.
    """

    # Primary key; defaults to None on a new instance.
    id: int = Field(
        sa_type=BIGINT(unsigned=True),
        primary_key=True,
        default=None,
    )
    # Filled from get_timestamp() when no value is supplied.
    created_at: int = Field(
        sa_type=BIGINT(unsigned=True),
        default_factory=get_timestamp,
    )
    # Filled from get_timestamp() when no value is supplied.
    updated_at: int = Field(
        sa_type=BIGINT(unsigned=True),
        default_factory=get_timestamp,
    )
# BaseModel itself is not a mapped class, so the listener must be registered
# with propagate=True; otherwise SQLAlchemy does not attach it to the mappers
# of the subclasses and updated_at is never refreshed.
@event.listens_for(BaseModel, "before_update", propagate=True)
def update_updated_at(mapper, connection, target):
    """Refresh ``updated_at`` whenever a record is about to be updated.

    Args:
        mapper: The mapper handling the instance (unused).
        connection: The connection emitting the UPDATE (unused).
        target: The model instance being updated; its ``updated_at`` is set
            to the current timestamp.
    """
    target.updated_at = get_timestamp()
# noinspection PyUnresolvedReferences
def connect_db(config: AppConfig):
    """Connect to the application database and create any missing tables.

    Args:
        config: Application configuration; ``config.database`` supplies
            ``user``, ``password``, ``host``, ``port`` and ``database``.

    Returns:
        The created SQLAlchemy engine. It is also stored on
        ``AppCtx.g_db_engine``.
    """
    from sqlalchemy.engine import URL

    # Import every model module so that the tables can be created
    # automatically by create_all() below; the names are otherwise unused.
    from .domain import DomainModel  # noqa: F401
    from .report_urls import ReportUrlModel  # noqa: F401

    # Build the URL from its components instead of formatting a string, so
    # credentials containing characters such as "@", ":" or "/" are escaped
    # correctly rather than corrupting the DSN.
    url = URL.create(
        "mysql+pymysql",
        username=config.database.user,
        password=config.database.password,
        host=config.database.host,
        port=config.database.port,
        database=config.database.database,
    )
    engine = create_engine(
        url,
        echo=False,
        pool_size=4,
        max_overflow=10,
        pool_recycle=60,
        pool_pre_ping=True,
    )
    SQLModel.metadata.create_all(engine)
    AppCtx.g_db_engine = engine
    return engine
def create_database(config: AppConfig):
    """Create the configured database if it does not exist yet.

    Call this to initialise the database when it is missing. The connection
    is opened without selecting a database, and the database is created with
    the utf8mb4 character set and utf8mb4_unicode_ci collation.

    Args:
        config: Application configuration; ``config.database`` supplies
            ``user``, ``password``, ``host``, ``port`` and ``database``.
    """
    from sqlalchemy.engine import URL

    # Connect without naming a database, since it may not exist yet. Building
    # the URL from components keeps special characters in the credentials
    # from corrupting the DSN.
    url = URL.create(
        "mysql+pymysql",
        username=config.database.user,
        password=config.database.password,
        host=config.database.host,
        port=config.database.port,
    )

    # Identifiers cannot be sent as bound parameters, so quote the name with
    # backticks (doubling any embedded backtick). This also allows names
    # containing "-" or matching reserved words.
    escaped_name = str(config.database.database).replace("`", "``")
    statement = text(
        f"CREATE DATABASE IF NOT EXISTS `{escaped_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
    )

    engine = create_engine(url, echo=False)
    try:
        with engine.connect() as conn:
            conn.execute(statement)
            conn.commit()
    finally:
        # This engine is only needed for the one statement; release its
        # pooled connection instead of leaving it open.
        engine.dispose()