Django(64)頻率認證源碼分析與自定義頻率認證
前言
有時候我們發送手機驗證碼,會發現1分鐘只能發送1次,這是做了頻率限制,限制的時間次數,都由開發者自己決定
頻率認證源碼分析
def check_throttles(self, request):
"""
檢查是否應限制請求。如果請求受到限制,則引發適當的異常。
"""
throttle_durations = []
# 1.遍歷配置的頻率認證類,初始化一個個頻率認證類對象(會調用頻率認證類的__init__()方法)
# 2.頻率認證類對象調用allow_request()方法,頻率是否限次(沒有限次可訪問,限次不可訪問)
# 3.頻率認證類限次後,調用wait方法,獲取還需多長時間可以進行下一次訪問
for throttle in self.get_throttles():
if not throttle.allow_request(request, self):
throttle_durations.append(throttle.wait())
if throttle_durations:
# Filter out `None` values which may happen in case of config / rate
# changes, see #1438
durations = [
duration for duration in throttle_durations
if duration is not None
]
duration = max(durations, default=None)
self.throttled(request, duration)
get_throttles()
我們首先來查看get_throttles()源碼
def get_throttles(self):
"""
實例化並返回此視圖使用的節流閥列表。
"""
return [throttle() for throttle in self.throttle_classes]
然後點擊throttle_classes,跳轉到APIView後查看源碼
throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
接著我們去settings.py文件中查看,發現'DEFAULT_THROTTLE_CLASSES': [],默認是一個空列表,那麼我們就知道了for throttle in self.get_throttles()其實是去遍歷列表中配置的頻率認證,至於列表中需要填寫什麼,我們後續再看
allow_request
接下來我們查看allow_request方法,它是drf中的throtting.py文件中BaseThrottle類中的方法,我們查看下BaseThrottle源碼
class BaseThrottle:
"""
Rate throttling of requests.
"""
def allow_request(self, request, view):
"""
如果應該允許請求,則返回 `True`,否則返回 `False`。
"""
raise NotImplementedError('.allow_request() must be overridden')
def get_ident(self, request):
"""
Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
if present and number of proxies is > 0. If not use all of
HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
"""
xff = request.META.get('HTTP_X_FORWARDED_FOR')
remote_addr = request.META.get('REMOTE_ADDR')
num_proxies = api_settings.NUM_PROXIES
if num_proxies is not None:
if num_proxies == 0 or xff is None:
return remote_addr
addrs = xff.split(',')
client_addr = addrs[-min(num_proxies, len(addrs))]
return client_addr.strip()
return ''.join(xff.split()) if xff else remote_addr
def wait(self):
"""
返回推薦的在下一個請求之前等待的秒數
"""
return None
可以看到BaseThrottle類下有3個方法
allow_request:如果需要繼承該類,必須重寫此方法get_ident:獲取身份wait:返回等待的秒數
SimpleRateThrottle
而throtting中有個SimpleRateThrottle繼承自BaseThrottle,我們大多數情況下都會自定義SimpleRateThrottle類,讓我們查看下源碼,看他幹了哪些事情
class SimpleRateThrottle(BaseThrottle):
"""
一個簡單的快取實現,只需要提供get_cache_key方法即可
速率(requests / seconds)由 View 類上的 `rate` 屬性設置。該屬性是「number_of_requests/period」形式的字元串。
period應該是以下之一:('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')
用於限制的先前請求資訊存儲在快取中
"""
cache = default_cache
timer = time.time
cache_format = 'throttle_%(scope)s_%(ident)s'
scope = None
THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate()
self.num_requests, self.duration = self.parse_rate(self.rate)
def get_cache_key(self, request, view):
def get_rate(self):
def parse_rate(self, rate):
def allow_request(self, request, view):
def throttle_success(self):
def throttle_failure(self):
def wait(self):
我們可以看到SimpleRateThrottle有5個屬性
cache:默認的django中的快取timer:當前時間cache_format:快取的格式throttle_%(scope)s_%(ident)sscope:範圍THROTTLE_RATES:默認的頻率
除了屬性,還有8個方法,我們依次查看源碼
init
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate()
self.num_requests, self.duration = self.parse_rate(self.rate)
程式碼講解:如果沒有獲取到rate屬性,那麼rate屬性就從get_rate()方法中獲取,拿到後,從parse_rate方法中解析出一個元組,包含2個元素num_requests和duration
num_request:請求次數duration:持續時間
get_rate
既然上面用到了此方法,我們就來看看
def get_rate(self):
"""
確定允許的請求速率用字元串表示形式。
"""
if not getattr(self, 'scope', None):
msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
self.__class__.__name__)
raise ImproperlyConfigured(msg)
try:
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
程式碼講解:如果沒有獲取到scope屬性,會拋出異常資訊,如果有scope就從THROTTLE_RATES[self.scope]中返回它,THROTTLE_RATES默認值如下:
'DEFAULT_THROTTLE_RATES': {
'user': None,
'anon': None,
},
所以get_rate方法返回的是THROTTLE_RATES中key為scope所對應的值,scope屬性我們可以自定義的時候隨意設置,如果我們自定義scope為user,那麼get_rate方法返回的就是None,所以self.rate也就為None
parse_rate
獲取到rate,用此方法解析
def parse_rate(self, rate):
"""
提供請求速率字元串,返回一個二元組
允許請求的次數, 以秒為單位的時間段
"""
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration)
程式碼講解:如果rate為None,那麼就返回(None, None),如果不為None,會把rate以/分割,從這裡我們就知道了rate的字元串的形式就是num/period,比如3/min,最終會把他分割,然後返回一個元組
- num_requests:請求的次數
- duration:取
period中的下標為0的,然後從字典中取出對應的key的值,比如min,第一個開頭字母為m,最後從字典中取m的值,就是60
所以示例3/min代表的就是1分鐘可以訪問3次
get_cache_key
def get_cache_key(self, request, view):
"""
應該返回可用於限制的唯一cache-key。必須被覆蓋。
如果不限制請求,則可能返回「None」。
"""
raise NotImplementedError('.get_cache_key() must be overridden')
這個方法很簡單,就是獲取唯一的快取key,如果請求不做限制,則返回None
allow_request
由於父類BaseThrottle的allow_request方法沒有實現具體的邏輯,所以SimpleRateThrottle中實現了具體的細節
def allow_request(self, request, view):
"""
如果請求應該被節流,那麼實行檢查以便查看
成功時調用`throttle_success`.
失敗時調用`throttle_failure`.
"""
if self.rate is None:
return True
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
self.history = self.cache.get(self.key, [])
self.now = self.timer()
# 從歷史記錄中刪除現在已經超過節流持續時間的任何請求
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.throttle_success()
程式碼講解:如果rate為None就返回True,代表允許請求,如果key也為None則返回True,代表允許請求,如果rate和key都有值,history就從快取中獲取key所對應的列表,now代表當前時間。如果history有值,並且列表history的最後一個元素≤當前時間-持續時間,那麼history列表就會刪除這個元素,如果列表長度≥請求次數,就會調用throttle_failure,如果列表長度<請求次數,則調用throttle_success。
舉例:如果self.now假設為晚上20:00,duration和num_requests就用之前3/min的示例,duration表示60s,num_requests表示3次,那麼self.now-self.duration就代表19:59分,如果history列表中的最後一個元素的時間值≤19:59,那麼就刪除它,我們的需求是3/min一分鐘只能訪問3次,而你超過了1分鐘,就沒必要限制了,所以將時間從history刪除,如果history列表長度≥3,一開始是空列表的時候不滿足條件,會返回throttle_success,第二次訪問列表長度會增加到1,但還是不滿足條件,會繼續調用throttle_success,第三次訪問列表長度為2,仍然不滿足會繼續調用throttle_success,第四次訪問滿足條件,就會調用throttle_failure,代表不能再請求了
throttle_success
def throttle_success(self):
"""
將當前請求的時間戳與鍵一起插入快取中。
"""
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)
return True
程式碼詳解:將當前時間插入到history列表的頭部,給快取設置key的值為當前時間,超時時間為duration,最後返回True,代表可以訪問
throttle_failure
def throttle_failure(self):
"""
當對 API 的請求由於節流而失敗時調用。
"""
return False
返回False,代表請求節流失敗,不允許訪問
wait
def wait(self):
"""
以秒為單位返回推薦的下一個請求時間。
"""
if self.history:
remaining_duration = self.duration - (self.now - self.history[-1])
else:
remaining_duration = self.duration
available_requests = self.num_requests - len(self.history) + 1
if available_requests <= 0:
return None
return remaining_duration / float(available_requests)
程式碼解析:如果history列表存在,remaining_duration剩餘時間就等於持續時間減去(當期時間-列表最後一個元素的時間),如果self.now為晚上20:00,history的最後一個元素值為19:59:30,而持續時間duration設置為60s,那麼remaining_duration就代表還剩30s就可以進行訪問了,而available_requests可用請求等於(設置好的請求次數-history列表+1)
自定義頻率認證
- 自定義一個繼承
SimpleRateThrottle類的頻率類 - 設置一個
scope類屬性,屬性值為任意見名知意的字元串 - 在
settings配置文件中,配置drf的DEFAULT_THROTTLE_RATES,格式為{scope對應的字元串值:’次數/時間’} - 在自定義頻率類中重寫
get_cache_key方法
限制的對象返回與限制資訊有關的字元串
不限制的對象返回None
需求:用戶訪問簡訊驗證碼1分鐘只能發送1次驗證碼
我們創建一個throttles.py文件,然後定義SMSRateThrottle類,程式碼如下:
from rest_framework.throttling import SimpleRateThrottle
class SMSRateThrottle(SimpleRateThrottle):
scope = "sms"
def get_cache_key(self, request, view):
phone = request.query_params.get('phone') or request.data.get('phone')
# 沒有手機號,就不做頻率限制
if not phone:
return None
# 返回可以根據手機號動態變化,且不易重複的字元串,作為操作快取的key
return f"throttle_{self.scope}_{phone}"
在settings.py文件中配置DEFAULT_THROTTLE_RATES,程式碼如下:
'DEFAULT_THROTTLE_RATES': {
'sms': '1/min'
},
最後再視圖函數中,局部配置自定義認證類
class TestView(APIView):
throttle_classes = [SMSRateThrottle]
def get(self, request, *args, **kwargs):
return APIResponse(data_msg="get 獲取驗證碼")
def post(self, request, *args, **kwargs):
return APIResponse(data_msg="post 獲取驗證碼")
具體測試細節過程就不再描述了,這裡只講述結果,當我們使用get或者post請求時,攜帶請求參數phone第一次發送請求,請求成功,第二次就會出現以下提示
{
"detail": "請求超過了限速。 Expected available in 58 seconds."
}
58 seconds代表還剩58秒可以再次訪問,至於58s是怎麼算出來的,就是SimpleRateThrottle類中的wait方法實現的

