Python非同步Web編程

  • 2019 年 12 月 5 日
  • 筆記

非同步編程適用於那些頻繁讀寫文件和頻繁與伺服器交互數據的任務,非同步程式以非阻塞的方式執行I/O操作。這樣意味著程式可以在等待客戶機返回數據的同時執行其他任務,而不是無所事事的等待,浪費資源和時間。

Python和其他許多編程一樣,默認不具備非同步特性。所幸的是,IT行業的快速發展,技術的不斷更新,是我們可以編寫非同步程式。近年來,對速度的要求越來越高甚至超過了硬體能力。為此,世界各地的組織聯合起來發表了《反應式宣言

非同步程式的非阻塞模式可以在Web應用程式的上下文中發揮顯著的性能優勢,有助於解決開發響應式應用程式中的問題。

Python3中加入了一些用於開發非同步應用程式的強大模組,本文中將介紹一些工具,特別是與web開發相關的工具。

本文將試著開發一個基於 aiohttp 的簡單反應式應用程式,根據用戶給定的地理坐標,顯示當前太陽系行星的天球坐標。

Python中的非同步

對於一些熟悉編寫傳統python程式碼的人來說,轉換到非同步程式可能有些不好接受。Python中的非同步程式依賴於 Coroutines(協程) ,它與event loop(事件循環)一同工作,寫出的程式碼像是執行多個小任務的片段。 協程可以看作是在程式碼中有一些帶點函數,這些帶點函數又是控制程式回調中的上下文,除了通過上下文交換數據,這些「yield」點還可以暫停和恢復協程執行。

事件循環決定了可以在任何指定時刻運行程式碼塊—它負責協程之間的暫停、恢復和通訊。 這意味著不同協程的最終可能以不同於它們之前被安排的順序執行。 這種不按固定順序運行不同程式碼塊的想法稱為非同步。

可以在 HTTP 請求的場景中闡述非同步的重要性。設想要向伺服器發大量的請求。比如,要查詢一個網站,以獲得指定賽季所有運動員的統計資訊。

我們可以按順序依次發出每個請求。然而,對於每個請求,可以想像到可能會花一些時間等待上一個請求被發送到伺服器,且收到伺服器響應。

但是有時,這些無用的花銷甚至可能需要幾秒鐘。因為程式可能會遇到網路延遲,訪問數量過多,又或者是對方伺服器的速度限制等問題。

如果我們的程式碼可以在等待伺服器響應的同時做其他事情呢?而且,如果它只在響應數據到達後才處理返回數據呢?如果我們不必等到每個單獨的請求都完成之後才繼續處理列表中的下一個請求,那麼我們可以快速地連續發出許多請求。

具有event loop的協程就可以讓我們的程式碼支援以這樣的形式運行。

asyncio

asyncio是Python3.4版本引入的標準庫,直接內置了對非同步IO的支援。使用 asyncio 我們可以通過協程來完成某些任務,創建的協程(使用 asyncio 的語法 asyncio.Task 對象)只有在所有組成協程的任務完成執行後完成。

和其他非同步程式語言不同,Python並不強制開發者使用語言自帶的事件循環。正如在Python 3.5中async/await是如何工作的指出的,Python協程構建的一個非同步API允許我們使用任何事件循環。有些項目實現了完全不同的事件循環,比如curio,或者允許為 asyncio 引入其他的事件循環策略(事件循環策略指是「在幕後」管理事件循環),比如uvloop。

使用 asyncio 並行運行兩個協程的程式碼片段,每個協程在一秒鐘後列印一條消息:

# test.py  import asyncio    async def wait_around(n, name):      for i in range(n):          print(f"{name}: iteration {i}")          await asyncio.sleep(1)    async def main():      await asyncio.gather(*[wait_around(2, "coroutine 0"), wait_around(5, "coroutine 1")])    loop = asyncio.get_event_loop()  loop.run_until_complete(main())
jhaosMBP:Cenarius jhao$ time python test.py  coroutine 0: iteration 0  coroutine 1: iteration 0  coroutine 0: iteration 1  coroutine 1: iteration 1  coroutine 1: iteration 2  coroutine 1: iteration 3  coroutine 1: iteration 4    real    0m5.264s  user    0m0.078s  sys    0m0.043s

這段程式碼以非同步方式大約5秒執行完畢。事件循環在遇到 asyncio.sleep 協程點時,會跳到其他程式碼繼續執行。使用 asyncio.gather 告訴事件循環要調度兩個 wait_around 實例。

asyncio.gather 接收一組「awaitables」(即協程或 asyncio.Task對象),然後返回單個 asyncio.Task對像。其只在所有組成的 tasks/coroutines 完成時才完成。最後兩行是 asyncio 的標準用法,用於運行指定的協程程式,直到執行完畢。

協程和函數不同,不會在調用後立即開始執行。await 關鍵字是用來告訴事件循環調度執行協同程式。

如果去掉 asyncio.sleep 前面的 await。程式幾乎會立即完成,因為沒有告訴事件循環要執行這個協程,在本例中,使用 await 調用協程使之休眠一段時間。

在了解了Python基本的非同步程式碼之後,下面繼續討論web開發上的非同步。

安裝aiohttp

aiohttp 是用於處理非同步 HTTP 請求的三方庫。此外,它還提供了用於web服務的組件。可以通過 pip 安裝 aiohttp,它要求Python版本大於3.5.3。

pip install aiohttp

客戶端:發送請求

下面的示例演示了如何使用 aiohttp 下載「baidu.com」網站的HTML內容:

import asyncio  import aiohttp      async def make_request():      url = "https://www.baidu.com"      print(f"making request to {url}")      async with aiohttp.ClientSession() as session:          async with session.get(url) as resp:              if resp.status == 200:                  print(await resp.text())    loop = asyncio.get_event_loop()  loop.run_until_complete(make_request())
  • 有幾點需要強調:
  • 和前面的 await asyncio.sleep 一樣,要獲取HTML頁面的內容,必須在 resp.text() 前面使用 await 。否則程式列印出來的內容會是這樣:
making request to https://www.baidu.com  <coroutine object ClientResponse.text at 0x109b8ddb0>
  • async with 是一個上下文管理器,它接收的是協程而不是函數。在這裡的兩處使用,是用於在內部自動關閉到伺服器的連接釋放資源。
  • aiohttp.ClientSession 具有和 HTTP 方法相同的方法,session.get 發送 GET 請求,session.post 發送 POST 請求。

這個例子本身並不比同步HTTP請求有多大性能優勢。aiohttp 客戶端真正優勢在於多個請求並發:

import asyncio  import aiohttp      async def make_request(session, req_n):      url = "https://www.baidu.com"      print(f"making request to {req_n} to {url}")      async with session.get(url) as resp:          if resp.status == 200:              print(await resp.text())      async def main():      n_request = 100      async with aiohttp.ClientSession() as session:          await asyncio.gather(*[make_request(session, i) for i in range(n_request)])      loop = asyncio.get_event_loop()  loop.run_until_complete(main())

上面程式碼不是一個一個地發出請求,而是利用 asyncioasycio.gather 實現並發。

Web應用"行星定位"

下面將從頭開始,開發一個web應用程式,報告用戶所在位置上天空中行星的坐標(天象)。

我們使用Geolocation API來獲取用戶的當前位置。

PyEphem天象計算

一個天體的天象是指在地球上指定地點和時間觀察到在天空中的位置。PyEphem是一個計算精度很高的天文歷算Python庫。使用 pip 安裝:

pip install ephem

使用 Observer 類計算格林威治某個時間點火星的天象(天體的高度和方位):

import ephem  import math      greenwich = ephem.Observer()  greenwich.lat = "51.4769"  greenwich.lon = "-0.0005"  greenwich.date = "2018/9/22 22:30:00"    mars = ephem.Mars()  mars.compute(greenwich)  az_deg, alt_deg = mars.az, mars.alt*convert    convert = math.pi / 180.  print(f"Mars當前的方位和高度為: {az_deg:.2f} {alt_deg:.2f}")

為了更加方便地獲取行星的天象,我們寫一個 PlanetTracker 類,帶有一個返回指定行星當前高度和方位的方法。單位是度(PyEphem內部默認使用弧度而不是角度來表示度數值)

# planet_tracker.py  import math  import ephem      class PlanetTracker(ephem.Observer):        def __init__(self):          super(PlanetTracker, self).__init__()          self.planets = {              "mercury": ephem.Mercury(),              "venus": ephem.Venus(),              "mars": ephem.Mars(),              "jupiter": ephem.Jupiter(),              "saturn": ephem.Saturn(),              "uranus": ephem.Uranus(),              "neptune": ephem.Neptune()          }        def calc_planet(self, planet_name, when=None):          convert = 180. / math.pi          if when is None:              when = ephem.now()            self.date = when          if planet_name in self.planets:              planet = self.planets[planet_name]              planet.compute(self)              return {                  "az": float(planet.az) * convert,                  "alt": float(planet.alt) * convert,                  "name": planet_name              }          else:              raise KeyError(f"Couldn't find {planet_name} in planets dict")

這樣就可以很方便地得到太陽系中其他七顆行星在任意地點的位置:

tracker = PlanetTracker()  tracker.lat = "51.4769"  tracker.lon = "-0.0005"  print(tracker.calc_planet("mars"))

輸出:

{'az': 26.646611886328866, 'alt': -35.063254217502354, 'name': 'mars'}

aiohttp服務端

現在指定一組緯度和經度,我們可以得到行星當前的高度和方位。接下來,建立一個aiohttp服務,接收客戶端發送的用戶位置,返回其行星天象。

# aiohttp_app.py  from aiohttp import web    from planet_tracker import PlanetTracker    routes = web.RouteTableDef()      @routes.get("/planets/{name}")  async def get_planet_ephmeris(request):      planet_name = request.match_info['name']      data = request.query      try:          geo_location_data = {              "lon": str(data["lon"]),              "lat": str(data["lat"]),              "elevation": float(data["elevation"])          }      except KeyError as err:          # 預設 格林威治 Observatory          geo_location_data = {              "lon": "-0.0005",              "lat": "51.4769",              "elevation": 0.0,          }      print(f"get_planet_ephmeris: {planet_name}, {geo_location_data}")      tracker = PlanetTracker()      tracker.lon = geo_location_data["lon"]      tracker.lat = geo_location_data["lat"]      tracker.elevation = geo_location_data["elevation"]      planet_data = tracker.calc_planet(planet_name)      return web.json_response(planet_data)      app = web.Application()  app.add_routes(routes)    web.run_app(app, host="localhost", port=8000)

這裡,給 get_planet_ephmeris 加上 route.get 裝飾器以監聽處理 GET 請求。

直接運行此py文件啟動應用:

python aiohttp_app.py

成功啟動後,在瀏覽器中訪問 http://localhost:8000/planets/mars ,可以看到類似如下的響應內容:

{"az": 98.72414165963292, "alt": -18.720718647020792, "name": "mars"}

你也可以使用 curl 命令進行測試:

me@local:~$ curl localhost:8000/planets/mars  {"az": 98.72414165963292, "alt": -18.720718647020792, "name": "mars"}

它響應給我們英國格林威治天文台的火星高度和方位。

也可以通過url參數傳入經緯度位置,來獲取其他地方的火星天象(注意使用引號將URL括起):

me@local:~$ curl "localhost:8000/planets/mars?lon=145.051&lat=-39.754&elevation=0"  {"az": 102.30273048280189, "alt": 11.690380174890928, "name": "mars"

這個還沒有結束,web.run_app 函數是以阻塞的方式運行應用程式。這顯然不是我們想要的方式!

要想以非同步的形式運行起來,需要修改一點程式碼:

# aiohttp_app.py  import asyncio  ...    # web.run_app(app)    async def start_app():      _runner = web.AppRunner(app)      await _runner.setup()      _site = web.TCPSite(          _runner, "localhost", 8080      )      await _site.start()      print(f"Serving up app on localhost:8080")      return _runner, _site    loop = asyncio.get_event_loop()  runner, site = loop.run_until_complete(start_app())  try:      loop.run_forever()  except KeyboardInterrupt as err:      loop.run_until_complete(runner.cleanup())

注意這裡使用的是 loop.run_forever,而不是前面的 loop.run_until_complete。因為這裡並不是為了執行一定數量的協程,而是希望我們的服務掛起處理請求,直到使用 ctrl+c 退出,這時才優雅地關閉伺服器。

前端 HTML/JavaScript

aiohttp 支援載入HTML和JavaScript文件。但是並不鼓勵使用 aiohttp 服務載入CSS和JavaScript等"靜態"資源,但是我們這裡只是做一個演示程式。

接下來 aiohttp_app.py 添加幾行程式碼。載入JavaScript文件的HTML文件:

# aiohttp_app.py  ...  @routes.get('/')  async def hello(request):      return web.FileResponse("./index.html")      app = web.Application()  app.add_routes(routes)  app.router.add_static("/", "./")  ...

hello 協程監聽 localhost:8000/ 上的GET 請求,返回 index.html。該文件位於運行服務的同目錄下。項目目錄結構:

--aiphttp_api.py  --app.js  --index.html  --planet_tracker.py

app.router.add_static 這一行行聲明在 localhost:8000/ 上設置了一個路由,用於在運行伺服器的同目錄中載入靜態文件。這樣瀏覽器將才能夠找到在 index.html 中引用的JavaScript文件。

注意:在生產環境中,務必將HTML、CSS和JS文件放到單獨的目錄中。這樣可以避免一些好奇的用戶訪問伺服器上的程式碼。

index.html 內容非常簡單:

<!DOCTYPE html>  <html lang='en'>    <head>      <meta charset="utf-8">      <meta name="viewport" content="width=device-width, initial-scale=1">      <title>Planet Tracker</title>  </head>  <body>      <div id="app">          <label id="lon">Longitude: <input type="text"/></label><br/>          <label id="lat">Latitude: <input type="text"/></label><br/>          <label id="elevation">Elevation: <input type="text"/></label><br/>      </div>      <script src="/app.js"></script>  </body>  

不過,app.js 文件稍微複雜一些:

var App = function() {        this.planetNames = [          "mercury",          "venus",          "mars",          "jupiter",          "saturn",          "uranus",          "neptune"      ]        this.geoLocationIds = [          "lon",          "lat",          "elevation"      ]        this.keyUpInterval = 500      this.keyUpTimer = null      this.planetDisplayCreated = false      this.updateInterval = 2000 // update very second and a half      this.updateTimer = null      this.geoLocation = null        this.init = function() {          this.getGeoLocation().then((position) => {              var coords = this.processCoordinates(position)              this.geoLocation = coords              this.initGeoLocationDisplay()              this.updateGeoLocationDisplay()              return this.getPlanetEphemerides()          }).then((planetData) => {              this.createPlanetDisplay()              this.updatePlanetDisplay(planetData)          }).then(() => {              return this.initUpdateTimer()          })      }        this.update = function() {          if (this.planetDisplayCreated) {              this.getPlanetEphemerides().then((planetData) => {                  this.updatePlanetDisplay(planetData)              })          }      }        this.get = function(url, data) {          var request = new XMLHttpRequest()          if (data !== undefined) {              url += `?${data}`          }          // console.log(`get: ${url}`)          request.open("GET", url, true)          return new Promise((resolve, reject) => {              request.send()              request.onreadystatechange = function(){                  if (this.readyState === XMLHttpRequest.DONE && this.status === 200) {                      resolve(this)                  }              }              request.onerror = reject          })      }        this.processCoordinates = function(position) {          var coordMap = {              'longitude': 'lon',              'latitude': 'lat',              'altitude': 'elevation'          }          var coords = Object.keys(coordMap).reduce((obj, name) => {              var coord = position.coords[name]              if (coord === null || isNaN(coord)) {                  coord = 0.0              }              obj[coordMap[name]] = coord              return obj          }, {})          return coords      }        this.coordDataUrl = function (coords) {          postUrl = Object.keys(coords).map((c) => {              return `${c}=${coords[c]}`          })          return postUrl      }        this.getGeoLocation = function() {          return new Promise((resolve, reject) => {              navigator.geolocation.getCurrentPosition(resolve)          })      }        this.getPlanetEphemeris = function(planetName) {          var postUrlArr = this.coordDataUrl(this.geoLocation)          return this.get(`/planets/${planetName}`, postUrlArr.join("&")).then((req) => {              return JSON.parse(req.response)          })      }        this.getPlanetEphemerides = function() {          return Promise.all(              this.planetNames.map((name) => {                  return this.getPlanetEphemeris(name)              })          )      }        this.createPlanetDisplay = function() {          var div = document.getElementById("app")          var table = document.createElement("table")          var header = document.createElement("tr")          var headerNames = ["Name", "Azimuth", "Altitude"]          headerNames.forEach((headerName) => {              var headerElement = document.createElement("th")              headerElement.textContent = headerName              header.appendChild(headerElement)          })          table.appendChild(header)          this.planetNames.forEach((name) => {              var planetRow = document.createElement("tr")              headerNames.forEach((headerName) => {                  planetRow.appendChild(                      document.createElement("td")                  )              })              planetRow.setAttribute("id", name)              table.appendChild(planetRow)          })          div.appendChild(table)          this.planetDisplayCreated = true      }        this.updatePlanetDisplay = function(planetData) {          planetData.forEach((d) => {              var content = [d.name, d.az, d.alt]              var planetRow = document.getElementById(d.name)              planetRow.childNodes.forEach((node, idx) => {                  var contentFloat = parseFloat(content[idx])                  if (isNaN(contentFloat)) {                      node.textContent = content[idx]                  } else {                      node.textContent = contentFloat.toFixed(2)                  }              })          })      }        this.initGeoLocationDisplay = function() {          this.geoLocationIds.forEach((id) => {              var node = document.getElementById(id)              node.childNodes[1].onkeyup = this.onGeoLocationKeyUp()          })          var appNode = document.getElementById("app")          var resetLocationButton = document.createElement("button")          resetLocationButton.setAttribute("id", "reset-location")          resetLocationButton.onclick = this.onResetLocationClick()          resetLocationButton.textContent = "Reset Geo Location"          appNode.appendChild(resetLocationButton)      }        this.updateGeoLocationDisplay = function() {          Object.keys(this.geoLocation).forEach((id) => {              var node = document.getElementById(id)              node.childNodes[1].value = parseFloat(                  this.geoLocation[id]              ).toFixed(2)          })      }        this.getDisplayedGeoLocation = function() {          var displayedGeoLocation = this.geoLocationIds.reduce((val, id) => {              var node = document.getElementById(id)              var nodeVal = parseFloat(node.childNodes[1].value)              val[id] = nodeVal              if (isNaN(nodeVal)) {                  val.valid = false              }              return val          }, {valid: true})          return displayedGeoLocation      }        this.onGeoLocationKeyUp = function() {          return (evt) => {              // console.log(evt.key, evt.code)              var currentTime = new Date()              if (this.keyUpTimer !== null){                  clearTimeout(this.keyUpTimer)              }              this.keyUpTimer = setTimeout(() => {                  var displayedGeoLocation = this.getDisplayedGeoLocation()                  if (displayedGeoLocation.valid) {                      delete displayedGeoLocation.valid                      this.geoLocation = displayedGeoLocation                      console.log("Using user supplied geo location")                  }              }, this.keyUpInterval)          }      }        this.onResetLocationClick = function() {          return (evt) => {              console.log("Geo location reset clicked")              this.getGeoLocation().then((coords) => {                  this.geoLocation = this.processCoordinates(coords)                  this.updateGeoLocationDisplay()              })          }      }        this.initUpdateTimer = function () {          if (this.updateTimer !== null) {              clearInterval(this.updateTimer)          }          this.updateTimer = setInterval(              this.update.bind(this),              this.updateInterval          )          return this.updateTimer      }        this.testPerformance = function(n) {          var t0 = performance.now()          var promises = []          for (var i=0; i<n; i++) {              promises.push(this.getPlanetEphemeris("mars"))          }          Promise.all(promises).then(() => {              var delta = (performance.now() - t0)/1000              console.log(`Took ${delta.toFixed(4)} seconds to do ${n} requests`)          })      }  }    var app  document.addEventListener("DOMContentLoaded", (evt) => {      app = new App()      app.init()  })

該應用程式將定時(間隔2秒)更新顯示行星的方位和高度資訊。Web Geolocation API 會默認讀取用戶當前地理位置,也可以自己手動輸入地理坐標置。如果用戶停止輸入半秒以上時間,就會開始自動更新行星位置數據。

雖然這不是JavaScript教程,但是這裡可以簡單講解下JS腳本的部分內容:

  • createPlanetDisplay 動態創建HTML元素並綁定到Document Object Model(DOM);
  • updatePlanetDisplay 用來從伺服器接收數據,使用 createPlanetDisplay 顯示;
  • getGeoLocation 使用 Web Geolocation API 獲取用戶當前的地理坐標。但要求在「安全的上下文中」使用(即必須使用HTTPS而不是HTTP)
  • getPlanetEphemerisgetPlanetEphemerides 都是向伺服器發出GET請求,分別獲取指定行星和所有行星的位置資訊。

結語

在本文中,簡單介紹了Python中的非同步web開發是什麼樣子的——它的優點和用途。之後,構建了一個簡單的基於 aiohttp 的響應式應用程式,在用戶給定地理坐標的情況下,動態顯示當前太陽系行星的相關天空位置。