FastAPI(58)- 使用 OAuth2PasswordBearer 的簡單栗子

背景

  • 假設在某個域中擁有後端 API(127.0.0.1:8080)
  • 並且在另一個域或同一域的不同路徑(或移動應用程式)中有一個前端(127.0.0.1:8081)
  • 並且希望有一種方法讓前端使用用戶名和密碼與後端進行身份驗證
  • 可以使用 OAuth2 通過 FastAPI 來構建它,通過 FastAPI 提供的工具來處理安全性

 

OAuth2 的授權模式

  • 授權碼授權模式 Authorization Code Grant
  • 隱式授權模式 Implicit Grant
  • 密碼授權模式 Resource Owner Password Credentials Grant
  • 客戶端憑證授權模式 Client Credentials Grant

這裡講 FastAPI 的是第三種

 

密碼授權模式的簡易流程圖

  1. 用戶在客戶端輸入用戶名、密碼
  2. 客戶端攜帶用戶名、密碼去請求授權伺服器,訪問獲取 token 的介面
  3. 授權伺服器驗證用戶名、密碼(身份驗證)
  4. 驗證通過後,返回這個用戶的 token 到客戶端
  5. 客戶端存儲 token,在後續發送請求攜帶該 token,就能通過身份驗證了

 

FastAPI 中使用 OAuth2 的簡單栗子

import uvicorn
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}

if __name__ == '__main__':
    uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)

  

程式碼解析

  • OAuth2 旨在使後端或 API 可以獨立於對用戶進行身份驗證的伺服器
  • 但在這種情況下,同一個 FastAPI 應用程式將同時處理 API 和身份驗證
  • 前端請求 /items 的之前要先進行身份驗證,也就是用戶名和密碼,這個驗證的路徑就是 tokenUrl,是相對路徑POST請求
  • oauth2_scheme 中接收一個 str 類型的 token,就是當驗證通過後,要返回給客戶端的一個令牌(常說的 token)
  • 方便下次請求攜帶這個 token 就可以通過身份認證,這個 token 有過期時間,過期後需要重新驗證

 

OAuth2PasswordBearer 

  • 使用 OAuth2、密碼授權模式、Bearer Token(不記名 token),就是通過 OAuth2PasswordBearer 來完成
  • OAuth2PasswordBearer 是接收 URL 作為參數的一個
  • 客戶端會向該 URL 發送 username 和 password 參數(通過表單的格式發送)然後得到一個 token  
  • OAuth2PasswordBearer 不會創建相應的 URL 路徑操作,只是指明了客戶端用來獲取 token 的目標 URL

 

tokenUrl 是相對路徑

  • 如果 API 位於 //example.com/,那麼它將引用 //example.com/token
  • 如果API 位於 //example.com/api/v1/,那麼它將引用 //example.com/api/v1/token

 

oauth2_scheme

該變數是 OAuth2PasswordBearer 的一個實例,但它也是一個可調用對象,所以它可以用於依賴項

async def read_items(token: str = Depends(oauth2_scheme)):

 

OAuth2PasswordBearer 會做什麼

  • 客戶端發送請求的時候,FastAPI 會檢查請求的 Authorization 頭資訊,如果沒有找到 Authorization 頭資訊
  • 或者頭資訊的內容不是 Bearer token,它會返回 401 狀態碼( UNAUTHORIZED )

 

傳遞 token 的請求結果

目前因為沒有對 token 做驗證,所以 token 傳什麼值都可以驗證通過

 

看看 OAuth2PasswordBearer 的源碼

 

查看 Swagger API 文檔

多了個 Authorize 按鈕,點擊它

可以看到一個包含用戶名、密碼還有其他可選欄位的授權表單

 

上述程式碼的問題

還沒有獲取 token 的路徑操作

 

完善 OAuth2

#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
# author: 小菠蘿測試筆記
# blog:  //www.cnblogs.com/poloyy/
# time: 2021/10/6 12:05 下午
# file: 49_bearer.py
"""
from typing import Optional

import uvicorn
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# 模擬資料庫
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


# 模擬 hash 加密演算法
def fake_hase_password(password: str) -> str:
    return "fakehashed" + password


# 返回給客戶端的 User Model,不需要包含密碼
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


# 繼承 User,用於密碼驗證,所以要包含密碼
class UserInDB(User):
    hashed_password: str


# OAuth2 獲取 token 的請求路徑
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 1、獲取客戶端傳過來的用戶名、密碼
    username = form_data.username
    password = form_data.password
    # 2、模擬從資料庫中根據用戶名查找對應的用戶
    user_dict = fake_users_db.get(username)
    if not user_dict:
        # 3、若沒有找到用戶則返回錯誤碼
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用戶名或密碼不正確")

    # 4、找到用戶
    user = UserInDB(**user_dict)
    # 5、將傳進來的密碼模擬 hash 加密
    hashed_password = fake_hase_password(password)
    # 6、如果 hash 後的密碼和資料庫中存儲的密碼不相等,則返回錯誤碼
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用戶名或密碼不正確")

    # 7、用戶名、密碼驗證通過後,返回一個 JSON
    return {"access_token": user.username, "token_type": "bearer"}


# 模擬從資料庫中根據用戶名查找用戶
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


# 模擬驗證 token,驗證通過則返回對應的用戶資訊
def fake_decode_token(token):
    user = get_user(fake_users_db, token)
    return user


# 根據當前用戶的 token 獲取用戶,token 已失效則返回錯誤碼
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


# 判斷用戶是否活躍,活躍則返回,不活躍則返回錯誤碼
async def get_current_active_user(user: User = Depends(get_current_user)):
    if user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User")
    return user


# 獲取當前用戶資訊
@app.get("/user/me")
async def read_user(user: User = Depends(get_current_active_user)):
    return user


# 正常的請求
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}


if __name__ == '__main__':
    uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)

 

/token 路徑操作函數的響應

# 7、用戶名、密碼驗證通過後,返回一個 JSON
return {"access_token": user.username, "token_type": "bearer"}
  • 獲取 token 的介面的響應必須是一個 JSON 對象(返回一個 dict 即可)
  • 它應該有一個 token_type,當使用 Bearer toklen 時,令牌類型應該是 bearer
  • 它應該有一個 access_token,一個包含訪問 token 的字元串
  • 對於上面簡單的例子,返回的 token 是用戶名,這是不安全,只是作為栗子好理解一點

 

返回 401 的HTTPException

# 根據當前用戶的 token 獲取用戶,token 已失效則返回錯誤碼
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
  • 任何 HTTP(錯誤)狀態碼為 401 UNAUTHORIZED 都應該返回 WWW-Authenticate 的 Header
  • 在此處返回的帶有值 Bearer 的 WWW-Authenticate Header 也是 OAuth2 規範的一部分
  • 在 Beaer token 的情況下,該值應該是 Bearer
  • 當然,這並不是必須的,但建議符合規範

 

查看 Swagger API Authorize

驗證通過

 

請求 /user/me 的結果

請求頭帶上了 ‘Authorization: Bearer johndoe’

 

logout 後再次請求,查看結果

logout 之後,請求頭沒有 ‘Authorization: Bearer johndoe’ 所以驗證就失敗啦

 

驗證一個不活躍的用戶

authenticate 表單填入

  • username:alice
  • password:secret2

請求 /users/me

得到的響應

{
  "detail": "Inactive user"
}

 

存在的問題

目前的 token 和驗證方式並不安全,下一篇中將介紹 JWT token