Flask中请求数据的优雅传递

当一个请求到来时,浏览器会携带很多信息发送发送服务端。在Django中,每一个处理函数都要传入一个request的参数,该参数携带所有请求的信息,也就是服务端程序封装的environ(不明白该参数可以参见上一篇flask初探之WSGI)。简单示例如下

from django.shortcuts import render

def index(request):
    context = {}
    return render(request, "index.html", context)

每一个请求携带的数据都可以从request传入到处理函数中,这种处理方法可以称之为显示传递。
接收请求数据在Flask中有一种更巧妙的实现:当有请求到来时request就会变成一个全局变量,所有的处理函数可以直接使用request这个全局变量,而不需要显示传入参数。简单示例如下:

import time
from flask import Flask, request

app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello '+request.args.get("name")

这种设计减少了每个函数需要传入的参数,比起Django的显示传参更加优雅。
但是这种全局变量也会自己的问题,多线程的情况下同一时间能够处理多个请求,每个处理函数都需要自己的请求信息,如何保证处理函数和请求一一对应呢?Flask主要使用本地线程技术来保证请求信息和处理函数相互的对应。下面主要介绍本地线程技术。

本地线程

在多线程编程中,全局变量不可避免的会竞争,通常使用加锁来解决竞争。此外有一种本地线程技术可以让每一个线程都拥有自己的私有的变量。比如全局变量a,使用本地线程技术可以让每一个线程对a处理时都是互相隔离的,彼此之间不影响。下面从局部变量、全局变量和本地线程三个例子对比说明本地线程技术。

局部变量
开启多线程,每个子线程完成不同的计算任务,x是线程中的局部变量。
每个子线程都有独立的空间。每次压栈,局部变量x的作用域地址是不同的(线程独享),计算结果互不干扰。

import time
import threading

 
def worker():
    x = 0
    for i in range(100):
        time.sleep(0.0001)
        x += 1
    print(threading.current_thread(),x)
 
for i in range(10):
    threading.Thread(target=worker).start()

运行结果:

<Thread(Thread-2, started 123145372971008)> 100
<Thread(Thread-6, started 123145393991680)> 100
<Thread(Thread-1, started 123145367715840)> 100
<Thread(Thread-3, started 123145378226176)> 100
<Thread(Thread-5, started 123145388736512)> 100
<Thread(Thread-7, started 123145399246848)> 100
<Thread(Thread-4, started 123145383481344)> 100
<Thread(Thread-10, started 123145415012352)> 100
<Thread(Thread-8, started 123145404502016)> 100
<Thread(Thread-9, started 123145409757184)> 100

全局变量
当多线程使用全局变量时就会发生抢占和竞争

import threading
import time
 
x = 0
def worker():
    global x
    x = 0
    for i in range(100):
        time.sleep(0.0001)
        x += 1
    print(threading.current_thread(),x)
 
for i in range(10):
    threading.Thread(target=worker).start()

运行结果:

<Thread(Thread-2, started 123145483571200)> 888
<Thread(Thread-5, started 123145499336704)> 908
<Thread(Thread-3, started 123145488826368)> 930
<Thread(Thread-4, started 123145494081536)> 937
<Thread(Thread-1, started 123145478316032)> 941
<Thread(Thread-6, started 123145504591872)> 947
<Thread(Thread-7, started 123145509847040)> 949
<Thread(Thread-8, started 123145515102208)> 955
<Thread(Thread-9, started 123145520357376)> 962
<Thread(Thread-10, started 123145525612544)> 964

希望的结果是100,最后却远大于100。原因在于第一个线程将全局变量+1之后,第二个线程在这个基础上继续+1,第三个线程在继续对x+1,每个线程都对全局变量+1,最终结果就不符合预期。

本地线程
本地线程可以避免上面全局变量竞争问题。标准库threading中就自带本地线程对象。

import time
import threading

a = threading.local() # 全局对象
 
def worker():
    a.x = 0
    for i in range(100):
        time.sleep(0.0001)
        a.x += 1
    print(threading.current_thread(),a.x)
 
for i in range(10):
    threading.Thread(target=worker).start()

运行结果:

<Thread(Thread-4, started 123145570172928)> 100
<Thread(Thread-6, started 123145580683264)> 100
<Thread(Thread-1, started 123145554407424)> 100
<Thread(Thread-2, started 123145559662592)> 100
<Thread(Thread-8, started 123145591193600)> 100
<Thread(Thread-5, started 123145575428096)> 100
<Thread(Thread-3, started 123145564917760)> 100
<Thread(Thread-7, started 123145585938432)> 100
<Thread(Thread-10, started 123145601703936)> 100
<Thread(Thread-9, started 123145596448768)> 100

本质上本地线程对象就是一个字典的子类,为每一个线程创建一个键值对,key是线程id,value是值。当某一个线程操作变量时就是操作自己的id对象的值。
如上例中本地线程是a,可将其看做一个字典a = {“线程id”: x}。线程1中a={“123145570172928”:44},线程2中a={“123145559662592”: 55}。所以各个线程之间虽然引用了同名变量,但实际上是互相不干扰的。

LocalStack

本地栈和本地线程类似的功能,本地线程常用来处理数字或字符串等简单数据结构,维护了{“线程id”:值}这样一个关系。本地栈是一个可以当做栈来使用的结构,本质上也是一个字典,结构为{“线程id”:{“stack”:[]}。这个数据结构的主要是能够使用压栈和出栈等操作,方便先进后出的场景。
简单使用

import time
from werkzeug.local import LocalStack

local_stack = LocalStack()
local_stack.push("abc")
local_stack.push("xyz")

# 获取栈顶元素,不弹出元素
print(local_stack.top)

# 弹出栈顶元素,出栈
print(local_stack.pop())

# 再次获取栈顶,栈顶元素已变化
print(local_stack.top)

运行结果:

xyz
xyz
abc

线程互不干扰

import threading
from werkzeug.local import LocalStack

def worker(local_stack):
    print(local_stack.top) # 主线程中压栈了数据,但是在子线线程中取不到,线程互相隔离。

if __name__ == "__main__":
    local_stack = LocalStack()
    local_stack.push("主线程")
    
    threading.Thread(target=worker, args=(local_stack,)).start()
    print(local_stack.top)

运行结果:
None
主线程

request的线程隔离实现

通过本地线程技术,request虽然是全局变量,但是在每一个线程中都是互相隔离的。
但需要说明的是Flask中并不是使用标准线程库的本地线程对象,因为还需要兼容协程,所以flask使用了werkzeug中的本地线程对象werkzeug.local.Local()。werkzeug的本地线程对象增加了对Greenlet的优先支持。
werkzeug中本地线程的实现

# since each thread has its own greenlet we can just use those as identifiers
# for the context.  If greenlets are not available we fall back to the
# current thread ident depending on where it is.
try:
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
        from _thread import get_ident
        
        
class Local(object):
    __slots__ = ("__storage__", "__ident_func__")

    def __init__(self):
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", get_ident)

    def __iter__(self):
        return iter(self.__storage__.items())

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

从import可以看出,首先是从协程导入,如果报错再从线程导入。在__setattr__函数添加变量时,首先是通过get_ident方法获取了线程id,然后将线程id作为key,value又是一个字典{name:value}。类似于{“线程id”:{“name”: “value”}}。