sanic-jwt 的使用

Sanic 是基于 Python 的一个支持高并发的异步 web 框架,sanic-jwt 则是针对Sanic 开发的一个基于 PyJWT 封装的 JWT 授权认证模块。

 

sanic-jwt

 

安装

pip install sanic-jwt

 

实例

下面实例主要总结:

  • 常用参数的配置;
  • 修改发生异常时返回的响应数据;
  • 解析和修改payload;
  • 查找用户;

 

from sanic import Sanic, request, response
from sanic_jwt import initialize, Configuration, Responses, protected, exceptions, Authentication, inject_user


class User:

    def __init__(self, uid, username, sex, password, info, black_level=0):
        self.user_id = uid
        self.sex = sex
        self.username = username
        self.password = password
        self.personal_info = info  # 只能登录后个人可见的信息
        self.black_level = black_level  # 黑名单等级,默认0为正常用户

    def __repr__(self):
        return "User(id='{}')".format(self.user_id)

    def to_dict(self):

        return {
            "uid": self.user_id,  # 注意:此处 "uid" 要与 MyJWTConfig 中的 user_id 设置一致!
            "sex": self.sex,
            "username": self.username,
            "personal_info": self.personal_info
        }


# 模拟一个用户列表
users = [
    User(1, "user1", "", "123",  "这是仅 user1 可见信息", 1),
    User(2, "user2", "", "456",  "这是仅 user2 可见信息", 0)
]

username_table = {u.username: u for u in users}
userid_table = {u.user_id: u for u in users}


async def authenticate(req: request.Request):
    username = req.json.get("username", None)
    password = req.json.get("password", None)

    if not username or not password:
        raise exceptions.AuthenticationFailed("用户名或密码为空!")
    user = username_table.get(username, None)
    if user is None:
        raise exceptions.AuthenticationFailed("用户名或密码不正确!")
    if password != user.password:
        raise exceptions.AuthenticationFailed("用户名或密码不正确!")
    return user


class MyJWTConfig(Configuration):
    # -------------- url_prefix ---------------------
    # [描述] 获取授权的路由地址
    # [默认] '/auth'
    url_prefix = '/login'

    # -------------- secret -------------------------
    # [描述] 加密密码
    # [默认] 'This is a big secret. Shhhhh'
    # [建议] 该密码是 JWT 的安全核心所在,需要保密,尽量使用更长更复杂的密码
    secret = ',$FCyFZ^b16#m:ragM#d-!;4!U5zgZDF(EhswOL_HGV#xN1Ll%MaBU42AN=jXgp7'

    # -------------- expiration_delta ----------------------
    # [描述] 过期时间,单位为秒
    # [默认] 30 分钟,即:60 * 30
    # [建议] 该时间不宜过长,同时建议开启 refresh_token_enabled 以便自动更新 token
    expiration_delta = 60 * 60  # 改为 10 分钟过期

    # -------------- cookie_set ---------------------
    # [描述] 是否将获取到的 token 信息写入到 cookie
    # [默认] False,即不写入cookie
    # 只有该项为 True,其它 cookie 相关设置才会起效。
    # cookie_set = True

    # -------------- cookie_access_token_name ---------------
    # [描述] cookie 中存储 token 的名称。
    # [默认] 'access_token'
    # cookie_access_token_name = "token"

    #  -------------- cookie_access_token_name ---------------
    # [描述] 包含用户 id 的用户对象的键或属性,这里对应 User 类的用户唯一标识
    # [默认] 'user_id'
    user_id = "uid"

    claim_iat = True  # 显示签发时间,JWT的默认保留字段,在 sanic-jwt 中默认不显示该项


class MyJWTAuthentication(Authentication):

    # 从 payload 中解析用户信息,然后返回查找到的用户
    # args[0]: request
    # args[1]: payload
    async def retrieve_user(self, *args, **kwargs):
        user_id_attribute = self.config.user_id()
        if not args or len(args) < 2 or user_id_attribute not in args[1]:
            return {}
        user_id = dict(args[1]).get(user_id_attribute)
        # TODO: 根据项目实际情况进行修改
        user = userid_table.get(user_id)
        return user

    # 拓展 payload
    async def extend_payload(self, payload, *args, **kwargs):
        # 可以获取 User 中的一些属性添加到 payload 中
        # 注意:payload 信息是公开的,这里不要添加敏感信息
        user_id_attribute = self.config.user_id()
        user_id = payload.get(user_id_attribute)
        # TODO: 根据项目实际情况进行修改
        user: User = userid_table.get(user_id)
        payload.update({'sex': user.sex})  # 比如添加性别属性
        return payload

    async def extract_payload(self, req, verify=True, *args, **kwargs):
        return await super().extract_payload(req, verify)


class MyJWTResponse(Responses):

    # 自定义发生异常的返回数据
    @staticmethod
    def exception_response(req: request.Request, exception: exceptions):
        # sanic-jwt.exceptions 下面定义的异常类型:
        # AuthenticationFailed
        # MissingAuthorizationHeader
        # MissingAuthorizationCookie
        # InvalidAuthorizationHeader
        # MissingRegisteredClaim
        # Unauthorized
        msg = str(exception)
        if exception.status_code == 500:
            msg = str(exception)
        elif isinstance(exception, exceptions.AuthenticationFailed):
            msg = str(exception)
        else:
            if "expired" in msg:
                msg = "授权已失效,请重新登录!"
            else:
                msg = "未授权,请先登录!"
        result = {
            "status": exception.status_code,
            "data": None,
            "msg": msg
        }
        return response.json(result, status=exception.status_code)


app = Sanic("my_auth_app")
initialize(app, authenticate=authenticate,
           authentication_class=MyJWTAuthentication, configuration_class=MyJWTConfig, responses_class=MyJWTResponse)


@app.route("/index")
@protected()  # 保护该路由,只有授权用户才能访问
async def protected_route_index(req: request.Request):
    # 从 request 中获取 payload,然后返回给前端
    payload = await req.app.auth.extract_payload(req)
    return response.json({'payloadInfo': payload})


@app.route("/info")
@inject_user()  # 注入用户信息
@protected()    # 保护该路由,只有授权用户才能访问
async def protected_route_info(req: request.Request, user: User):
    if user.black_level == 0:
        return response.json({'userName': user.username, "personalInfo": user.personal_info})
    else:  # 进入黑名单等级之后限制查看
        return response.json({'userName': user.username, "personalInfo": ""})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080, auto_reload=True)

 

Configuration 参数

 下面列出的参数可以根据需要在前面的 MyJWTConfig 这个类下进行添加设置。

参数 描述 默认 备注
access_token_name
标识访问令牌的key ‘access_token’
algorithm
生成标记的哈希算法 ‘HS256’
可选项:
HS256, HS384, HS512,
ES256, ES384, ES512, RS256,
RS384, RS512, PS256, PS384, PS512
auth_mode
是否启用 /auth 接口 True
authorization_header
HTTP请求 header 中令牌key ‘authorization’
authorization_header_prefix
HTTP请求header中JWT的前缀 ‘Bearer’
authorization_header_refresh_prefix
保留字,未使用 ‘Refresh’
claim_aud
面向的用户 None
claim_iat
是否启用生成令牌签发时间 False
claim_iss
令牌签发者 None
claim_nbf
是否 启用生成令牌在签发后多久生效 功能 False
claim_nbf_delta
令牌在签发后多久生效 60 * 3,即:3 分钟
cookie_access_token_name
使用cookie令牌时,cookie中令牌的名称 ‘access_token’
cookie_domain
cookie所在的域
cookie_httponly
是否启用 http only cookie True
cookie_refresh_token_name
使用cookie令牌,cookie中刷新令牌的名称 ‘refresh_token’
cookie_set
启用cookie令牌 False
cookie_strict
启用cookie令牌,cookie获取失败后是否禁用头部令牌 False
cookie_token_name
cookie_access_token_name 的别名,用于测试 False 实测该值无效,应使用 cookie_access_token_name
do_protection
启用@protected装饰器正常工作 True
expiration_delta
令牌有效期 60 * 5 * 6(30分钟)
generate_refresh_token
创建和返回刷新令牌的方法 sanic_jwt.utils.generate_refresh_token
leeway
系统时间配置中微小更改的回旋时间秒 60 * 3(3分钟)
path_to_authenticate
身份验证接口路径 ‘/’
path_to_refresh
刷新令牌接口路径 ‘/refresh’
path_to_retrieve_user
当前用户接口路径 ‘/me’
path_to_verify
令牌验证接口 ‘/verify’
private_key
用于生成令牌的私钥,依赖于使用的散列算法 None
public_key
secret的别名
query_string_access_token_name
查询字符串令牌,cookie中令牌的名称 ‘access_token’
query_string_refresh_token_name
查询字符串令牌,cookie中刷新令牌的名称 ‘refresh_token’
query_string_set
开启查询字符串令牌 False
query_string_strict
开启查询字符串令牌,查询字符串不存在是否禁用头部令牌 False
refresh_token_enabled
启用刷新令牌 False 如果开启,就需要存储refresh_token,
所以需要额外代码实现。
refresh_token_name
刷新令牌的key ‘refresh_token’
scopes_enabled
启用scope块并将作用域添加到jwt payload False
scopes_name
jwt payload中scope的key ‘scopes’
secret
用于哈希算法生成和签名JWT,每个应用应该设置自已的值 ‘This is a big secret. Shhhhh’
strict_slashes
启用对接口url执行严格的/匹配 False
url_prefix
sanic jwt默认接口的前缀 ‘/auth’
user_id
包含用户 id 的用户对象的键或属性 ‘user_id’
verify_exp 开启令牌过期验证 True

 

 

参考:

sanic-jwt

//blog.csdn.net/zhouping118/article/details/88736986