FastAPI第四天
1. 多应用程序管理
当我们开发的时候,往往会涉及到大量的路由,如果将所有的路由都写在一个文件中,不利于我们对于某个路由以及其处理函数进行修改,更不利于整个项目后期的维护升级。而且一个文件中代码行数过大还会使得开发尤为不便,因此需要将路由进行分文件(模块化)处理。
相信讲到这里,之前有学习过flask框架的应该都有感觉,这不就是flask中的蓝图吗?没错,FastAPI中的APIRouter与蓝图非常相似,同样都是为了分文件编写路由,也都是需要最终到主文件中进行注册。话不多说,借助官网的例子来理解一下。
首先来看看项目的目录结构
main
是最终的主文件,也就是FastAPI实例化以及路由注册的地方routers
和internal
分别是两个路由组文件
为了简便化,像在routers
中有两个路由的情况,我们直接把APIRouter实例化在__init__
文件中,然后分别引入就好。
items.py
from ..routers import router
from fastapi import HTTPException
data={1:{"name":'aaa'},2:{"name":'bbb'}}
@router.get("/items/",tags=['items'])
async def read_data():
return data
@router.get("/items/{item_id}",tags=["items"])
async def read_item(item_id:int):
if item_id not in data:
raise HTTPException(status_code=404,detail="Item not Found!")
return {"item_id":item_id,"name":data[item_id]["name"]}
@router.put("/items/{item_id}",tags=["custom"],responses={403:{"description":"Operation forbidden"}})
async def update_item(item_id:int):
if item_id!=3:
raise HTTPException(
status_code=403,
detail="You can only update the item:3"
)
return {"item_id":item_id,"name":"ccc"}
users.py
from ..routers import router
@router.get("/users/",tags=["users"])
async def read_users():
return [
{"username":"aaa"},
{"username":"bbb"}
]
@router.get("/users/{username}",tags=['users'])
async def read_user(username:str):
return {"username":username}
admin.py
from fastapi import APIRouter
router=APIRouter()
@router.post("/")
async def update_admin():
return {"message":"Admin change"}
main.py
from fastapi import FastAPI
from .routers import items,users
from .internal import admin
app=FastAPI()
app.include_router(users.router)
app.include_router(items.router)
app.include_router(
admin.router,
prefix="/admin",
tags=['admin'],
)
@app.get("/")
async def root():
return {"message":"hello"}
这里稍微总结一下有关的知识
prefix
代表前缀,比如prefix=/index
,那么后面如果是router.get("aaa")
请求的实际上是/index/aaa
tags
是标签,也就是Swagger中的名字
- 编写好
router
后不要忘了使用.include_router()
将写好的进行注册
2. 数据库相关
我们除了将数据存储到文件,其实更多的就是将数据存储到数据库中,这样更利于对较大量数据进行管理(增删改查),所以下面就来看看与数据库相关的操作。
首先是数据库连接部分
import sqlalchemy
from databases import Database
DATABASE_URL="sqlite:///./test.db"
database=Database(DATABASE_URL)
sqlalchemy_engine=sqlalchemy.create_engine(DATABASE_URL)
def get_database()->Database:
return database
然后再来定义数据模型
from datetime import datetime
from typing import Optional
import sqlalchemy
from pydantic import BaseModel, Field
class PostBase(BaseModel):
title: str
content: str
publication_date: datetime = Field(default_factory=datetime.now)
class PostPartialUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
class PostCreate(PostBase):
pass
class PostDB(PostBase):
id: int
metadata = sqlalchemy.MetaData()
posts=sqlalchemy.Table(
"posts",
metadata,
sqlalchemy.Column("id",sqlalchemy.Integer,primary_key=True,autoincrement=True),
sqlalchemy.Column("publication_date",sqlalchemy.DateTime(),nullable=False),
sqlalchemy.Column("title",sqlalchemy.String(length=255),nullable=False),
sqlalchemy.Column("content",sqlalchemy.Text(),nullable=False),
)
首先定义了数据库的Pydantic模型PostBase
,然后定义数据库的提交表格式
这里就类似于创建数据库表的时候定义字段一样,有类型、主键、自增、是否可以为空等等,这部分内容就更偏向于数据库基础,btw过段时间也准备重新学习一遍Mysql。
有了数据库连接以及定义之后,就可以创建增删改查的接口了
from typing import List, Tuple
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, status
from database import get_database, sqlalchemy_engine
from models import (
metadata,
posts,
PostDB,
PostCreate,
PostPartialUpdate,
)
app = FastAPI()
@app.on_event("startup")
async def startup():
await get_database().connect()
metadata.create_all(sqlalchemy_engine)
@app.on_event("shutdown")
async def shutdown():
await get_database().disconnect()
async def pagination(
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=0),
) -> Tuple[int, int]:
capped_limit = min(100, limit)
return (skip, capped_limit)
async def get_post_or_404(
id: int, database: Database = Depends(get_database)
) -> PostDB:
select_query = posts.select().where(posts.c.id == id)
raw_post = await database.fetch_one(select_query)
if raw_post is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return PostDB(**raw_post)
@app.get("/posts")
async def list_posts(
pagination: Tuple[int, int] = Depends(pagination),
database: Database = Depends(get_database),
) -> List[PostDB]:
skip, limit = pagination
select_query = posts.select().offset(skip).limit(limit)
rows = await database.fetch_all(select_query)
results = [PostDB(**row) for row in rows]
return results
@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
return post
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
post: PostCreate, database: Database = Depends(get_database)
) -> PostDB:
insert_query = posts.insert().values(post.dict())
post_id = await database.execute(insert_query)
post_db = await get_post_or_404(post_id, database)
return post_db
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
post_update: PostPartialUpdate,
post: PostDB = Depends(get_post_or_404),
database: Database = Depends(get_database),
) -> PostDB:
update_query = (
posts.update()
.where(posts.c.id == post.id)
.values(post_update.dict(exclude_unset=True))
)
post_id = await database.execute(update_query)
post_db = await get_post_or_404(post_id, database)
return post_db
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
post: PostDB = Depends(get_post_or_404), database: Database = Depends(get_database)
):
delete_query = posts.delete().where(posts.c.id == post.id)
await database.execute(delete_query)
在开始,我们设置了两个事件
在我们需要的时候引入依赖函数,得到数据库的连接或者关闭数据库连接。然后在路由中设置增删改查的操作
insert_query = posts.insert().values(post.dict())
post_id = await database.execute(insert_query)
post_db = await get_post_or_404(post_id, database)
插入语句就是先设置好插入请求语句,然后执行这条语句。
其他的也是类似,这样我们就能实现基本的数据库增删改查操作。