57 lines
1.8 KiB
Python
57 lines
1.8 KiB
Python
from datetime import datetime
|
|
|
|
from sqlalchemy import create_engine, event, text
|
|
from sqlalchemy.dialects.mysql import BIGINT
|
|
from sqlmodel import SQLModel, Field
|
|
|
|
from ..config import AppConfig
|
|
from ..config.config import AppCtx
|
|
|
|
|
|
def get_timestamp():
    """Return the current time as an integer Unix timestamp in seconds.

    The fractional part of the timestamp is dropped by ``int()``.
    """
    now = datetime.now()
    seconds = now.timestamp()
    return int(seconds)
|
|
|
|
|
|
class BaseModel(SQLModel):
    """Base class for database models.

    Declares three columns, all stored as unsigned BIGINT: the primary key
    ``id`` and the integer timestamps ``created_at`` and ``updated_at``.
    """

    # Primary key; defaults to None (presumably so the database assigns the
    # value on insert -- confirm against the table definitions).
    id: int = Field(
        sa_type=BIGINT(unsigned=True),
        primary_key=True,
        default=None,
    )

    # Set from get_timestamp() when the instance is created.
    created_at: int = Field(
        sa_type=BIGINT(unsigned=True),
        default_factory=get_timestamp,
    )

    # Also set from get_timestamp() when the instance is created.
    updated_at: int = Field(
        sa_type=BIGINT(unsigned=True),
        default_factory=get_timestamp,
    )
|
|
|
|
|
|
@event.listens_for(BaseModel, "before_update", propagate=True)
def update_updated_at(mapper, connection, target):
    """Refresh ``updated_at`` automatically just before a record is updated.

    ``BaseModel`` is declared without ``table=True``, so it is not a mapped
    class itself. SQLAlchemy only applies a listener registered on an unmapped
    superclass to the mapped subclasses when ``propagate=True`` is given;
    without it the hook would never run for the concrete table models.

    Args:
        mapper: The mapper the event is dispatched for (unused).
        connection: The connection used for the UPDATE (unused).
        target: The model instance being persisted; its ``updated_at``
            attribute is overwritten with the current timestamp.
    """
    target.updated_at = get_timestamp()
|
|
|
|
|
|
def connect_db(config: AppConfig):
    """Connect to the MySQL database and create any missing tables.

    Builds an engine for the database described by ``config.database``,
    creates every table registered on ``SQLModel.metadata``, stores the engine
    on ``AppCtx.g_db_engine`` and returns it.

    Args:
        config: Application configuration. ``config.database`` must expose
            ``user``, ``password``, ``host``, ``port`` and ``database``.

    Returns:
        The newly created SQLAlchemy engine.
    """
    # Imported locally so this function brings its own dependency into scope.
    from sqlalchemy.engine import URL

    # NOTE(review): all model modules must be imported before create_all() so
    # that their tables are registered on SQLModel.metadata. No such import is
    # visible here -- confirm that it happens elsewhere.

    db = config.database
    # URL.create() escapes each component, so credentials containing
    # characters such as "@", ":", "/" or "#" no longer corrupt the URL the
    # way plain string interpolation does.
    url = URL.create(
        "mysql+pymysql",
        username=str(db.user),
        password=str(db.password),
        host=str(db.host),
        port=db.port,
        database=str(db.database),
    )
    engine = create_engine(url, echo=False)

    SQLModel.metadata.create_all(engine)
    AppCtx.g_db_engine = engine

    return engine
|
|
|
|
|
|
def create_database(config: AppConfig):
    """Create the configured database if it does not exist yet.

    Connects to the MySQL server without selecting a database and issues
    ``CREATE DATABASE IF NOT EXISTS`` with the utf8mb4 character set and the
    utf8mb4_unicode_ci collation.

    Args:
        config: Application configuration. ``config.database`` must expose
            ``user``, ``password``, ``host``, ``port`` and ``database``.
    """
    # Imported locally so this function brings its own dependency into scope.
    from sqlalchemy.engine import URL

    db = config.database
    # Connect without naming a database, because the target may not exist yet.
    # URL.create() escapes the credentials, so special characters in the
    # password cannot break the URL.
    server_url = URL.create(
        "mysql+pymysql",
        username=str(db.user),
        password=str(db.password),
        host=str(db.host),
        port=db.port,
    )
    engine = create_engine(server_url, echo=False)

    # Quote the identifier with backticks (doubling any embedded backtick) so
    # names such as "my-db" are valid and the name cannot alter the statement.
    quoted_name = "`" + str(db.database).replace("`", "``") + "`"
    statement = text(
        f"CREATE DATABASE IF NOT EXISTS {quoted_name} "
        "CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
    )

    try:
        with engine.connect() as conn:
            conn.execute(statement)
            conn.commit()
    finally:
        # The engine exists only for this one statement; release its pooled
        # connections instead of leaving them open.
        engine.dispose()
|