更改sqlite为mysql
import asyncio
from urllib.parse import quote_plus

from tortoise import Tortoise


async def init():
    """Initialise Tortoise ORM against a local MySQL server.

    Connects to database ``test`` on 127.0.0.1:3306 as ``root`` and registers
    the models found in ``sanic_bp.models`` under the ``models`` app.
    """
    user = 'root'
    password = '123456'
    db_name = 'test'
    await Tortoise.init(
        # MySQL connection info. The password is percent-encoded so that
        # characters such as '@', ':' or '/' cannot corrupt the URL.
        db_url=f'mysql://{user}:{quote_plus(password)}@127.0.0.1:3306/{db_name}',
        # Modules whose models are registered.
        modules={'models': ['sanic_bp.models']}
    )
    # Uncomment to create the tables described by the models.
    # await Tortoise.generate_schemas()


async def _main():
    """Run init() and always release the database connections afterwards."""
    try:
        await init()
    finally:
        # asyncio.run() closes the event loop when the coroutine returns, so
        # the connections have to be closed while the loop is still alive.
        await Tortoise.close_connections()


asyncio.run(_main())
在sanic中使用:
from sanic import Sanic
from tortoise import Tortoise

app = Sanic(__name__)


@app.listener('after_server_start')
async def notify_server_started(app, loop):
    """Open the Tortoise ORM MySQL connection once the Sanic server has started."""
    print('sanic sanic服务启动后建立mysql连接')
    # Create the MySQL connection; maxsize / minsize in the query string are
    # passed through to the driver (presumably as connection-pool bounds).
    await Tortoise.init(
        db_url='mysql://root:123456@127.0.0.1:3306/test?maxsize=50&minsize=3',
        modules={'models': ['sanic_bp.models']}
    )


@app.listener('after_server_stop')
async def close_orm_connections(app, loop):
    """Close the Tortoise ORM connections when the Sanic server shuts down."""
    await Tortoise.close_connections()
基础使用
from tortoise.contrib.sanic import register_tortoise

# Keyword settings handed to register_tortoise: an in-memory SQLite database,
# models loaded from the "models" module, and generate_schemas switched on.
_orm_options = dict(
    db_url="sqlite://:memory:",
    modules={"models": ["models"]},
    generate_schemas=True,
)

# Hook Tortoise ORM into the Sanic application object.
register_tortoise(app, **_orm_options)
sanic 和 fastapi中使用
tortoise-orm 连接多个库
async def run():
    """Initialise Tortoise ORM with two SQLite connections and fetch both clients."""
    # Two named connections; each one points at its own SQLite file.
    connections_cfg = {
        "first": {
            "engine": "tortoise.backends.sqlite",
            "credentials": {"file_path": "example.sqlite3"},
        },
        "second": {
            "engine": "tortoise.backends.sqlite",
            "credentials": {"file_path": "example1.sqlite3"},
        },
    }
    # Each app lists its model modules and names its default connection.
    # Several different model modules could be given; here both apps use the same one.
    apps_cfg = {
        "tournaments": {"models": ["__main__"], "default_connection": "first"},
        "events": {"models": ["__main__"], "default_connection": "second"},
    }
    await Tortoise.init({"connections": connections_cfg, "apps": apps_cfg})
    # await Tortoise.generate_schemas()

    # Look up each database client by its connection name.
    client = Tortoise.get_connection("first")
    second_client = Tortoise.get_connection("second")
    # ORM operations can be performed from here on.
增删改查
from typing import List

from fastapi import APIRouter, HTTPException
from tortoise.contrib.fastapi import HTTPNotFoundError

from schemas.MaintainedData import MaintainedData_Pydantic, MaintainedDataIn_Pydantic
from models.MaintainedData import MaintainedData
from schemas.basic import Response404, Response200

data = APIRouter(tags=['数据模块'])


@data.get("/", summary="数据列表", response_model=List[MaintainedData_Pydantic])
async def data_list(order: str, limit: int = 10, page: int = 1):
    """Return one page of MaintainedData rows.

    ``limit`` is the number of rows per page and ``page`` is the page number.
    NOTE(review): ``order`` is a required query parameter but is never read.
    """
    # Rows to skip before the requested page, i.e. LIMIT <skip>, <limit>.
    skip = (page - 1) * limit
    return await MaintainedData_Pydantic.from_queryset(MaintainedData.all().offset(skip).limit(limit))


@data.post("/data", summary="新增数据", )
async def add_data(md_data: MaintainedData_Pydantic):
    """Create a MaintainedData row from the fields set in the request body."""
    # Only explicitly-set fields are passed to create(), as keyword arguments.
    movie_obj = await MaintainedData.create(**md_data.dict(exclude_unset=True))
    return await MaintainedData_Pydantic.from_tortoise_orm(movie_obj)


# The path parameter is named {site_id} so that it binds to the handler's
# ``site_id`` argument; with a differently named placeholder FastAPI treats
# ``site_id`` as a required query parameter and ignores the path segment.
@data.put("/data/{site_id}", summary="编辑数据", responses={404: {"model": HTTPNotFoundError}})
async def update_data(site_id: str, md_data: MaintainedDataIn_Pydantic):
    """Update the row whose ``site_id`` matches and return the updated record."""
    updated_count = await MaintainedData.filter(site_id=site_id).update(**md_data.dict(exclude_unset=True))
    print(updated_count)
    if not updated_count:
        return Response404(msg=f"data {site_id} not found")
    return Response200(data=await MaintainedData_Pydantic.from_queryset_single(MaintainedData.get(site_id=site_id)))


@data.get("/data/{site_id}", summary="查找数据", response_model=MaintainedData_Pydantic,
          responses={404: {"model": HTTPNotFoundError}})
async def get_user(site_id: str):
    """Return the single row whose ``site_id`` matches."""
    return await MaintainedData_Pydantic.from_queryset_single(MaintainedData.get(site_id=site_id))


@data.delete("/data/{site_id}", summary="删除数据", responses={404: {"model": HTTPNotFoundError}})
async def del_movie(site_id: str):
    """Delete the row whose ``site_id`` matches.

    Raises:
        HTTPException: 404 when no row was deleted.
    """
    deleted_count = await MaintainedData.filter(site_id=site_id).delete()
    if not deleted_count:
        raise HTTPException(status_code=404, detail=f"data {site_id} not found")
    # Return a plain payload: an HTTPException is an error signal to raise,
    # not a response body to return.
    return {"detail": f"Deleted Movie {site_id}"}
from datetime import datetime
from typing import Optional, Iterable

from tortoise import fields, models, BaseDBAsyncClient


class MaintainedData(models.Model):
    """Record describing one maintained site and the state of its crawl script."""

    id = fields.SmallIntField(pk=True, index=True, description="ID", )
    site_id = fields.CharField(max_length=20, unique=True, description="网站ID")
    site_name = fields.CharField(max_length=128, null=False, description="网站名称")
    site_type = fields.CharField(max_length=16, null=False, description="网站类型")
    description = fields.CharField(max_length=255, default="", description="网站备注", )
    site_path_name = fields.CharField(max_length=128, null=True, description="目录名称")
    site_path_url = fields.CharField(max_length=128, null=True, description="目录链接")
    status = fields.CharField(max_length=128, null=True, description="脚本状态")
    run_computer = fields.CharField(max_length=255, null=True, description="运行电脑")
    run_directory = fields.CharField(max_length=255, null=True, description="运行目录")
    # auto_now_add (not ``auto_add``, which DatetimeField does not recognise)
    # stamps the row with the current time when it is first created.
    crawling_time: Optional[datetime] = fields.DatetimeField(auto_now_add=True, description="运行时间")
    err_message = fields.CharField(max_length=255, null=True, description="错误信息")
order by
# Start from the queryset over every MaintainedData row, then order it by
# the crawling_time field.
_all_rows = MaintainedData.all()
db_Secc = _all_rows.order_by("crawling_time")
Enum 枚举
class order_byEnum(str, Enum):
    """Prefix placed in front of the field name handed to ``order_by``."""

    pear = ""      # no prefix: ascending order
    banana = '-'   # "-" prefix: descending order


@data.get("/", summary="数据列表", response_model=List[MaintainedData_Pydantic])
async def data_list(
        order_by: order_byEnum = order_byEnum.pear,
        limit: int = 10,
        page: int = 1):
    """Return one page of MaintainedData rows ordered by ``crawling_time``.

    ``order_by`` selects the sort direction, ``limit`` is the number of rows
    per page and ``page`` is the page number.
    """
    skip = (page - 1) * limit
    # Use the member's raw value: since Python 3.11, formatting a (str, Enum)
    # member in an f-string yields "order_byEnum.banana" rather than "-".
    # Going through order_byEnum(...) also accepts a plain "" or "-" string.
    prefix = order_byEnum(order_by).value
    return await MaintainedData_Pydantic.from_queryset(
        MaintainedData.all().offset(skip).limit(limit).order_by(f"{prefix}crawling_time"))