阿里雲IoT流轉到postgresql資料庫方案

之前寫過一篇如使用阿里雲上部署.NET 3.1自定義運行時的文章,吐槽一下,雖然現在已經2022年了,但是阿里雲函數計算的支援依然停留在.NET Core 2.1,更新緩慢,由於程式解包大小的限制,也不能放太複雜的東西的上去,雖然現在.NET 6裁剪包能挺好地解決這個問題,但是心裡還是不爽。

需求

言歸正傳,有這麼一個情景:發送數據想接入阿里雲的IoT平台,然後直接插入postgresQL資料庫中。正常來說,只要數據發送到了IoT平台,然後定義轉發到RDS就可以了,不過阿里雲有幾個限制:

  1. 數據流轉到RDS資料庫,只能支援mysqlsql server
  2. 數據流轉只支援json形式的數據流轉,如果發送的是透傳的數據,那麼發送不了(更新:現在新版的數據流轉已經支援了。)

思前想後,可能只能掏出阿里雲的函數計算服務了,運用函數計算作為中轉,將透傳的數據流轉給函數計算,然後在函數計算中執行sql語句。

IoT平台接收設置

阿里雲的物聯網平台,設置了基本的產品和設備之後,如果是物模型的話,那麼自行設置好對應的物模型。對於透傳就比較簡單了,支援MQTT的設備方只需要定義:

  • 透傳的消息發送到/{productKey}/{deviceName}/user/update
  • 訂閱阿里雲的/{productKey}/{deviceName}/user/get
  • 設置阿里雲的Mqtt IoT實例終端節點:({YourProductKey}.iot-as-mqtt.{YourRegionId}.aliyuncs.com:1883
  • 設置設備的ProductKey和ProductSecret

設置好之後,即可傳輸數據到阿里雲IoT端,數據傳輸過來,看下日誌,如果能看到:

img

那說明就已經發送OK了,接收到的是普通的字元串(不是json),需要進行進一步解析。

IoT流轉設置

雲產品流轉中,新建解析器,設置好數據源,數據目的選擇函數計算:
img
解析器腳本比較簡單:

var data = payload(); 
writeFc(1000, data);  

注意,payload函數payload(textEncoding)是內建的函數:

  • 不傳入參數:默認按照UTF-8編碼轉換為字元串,即payload()等價於payload(‘utf-8’)。
  • ‘json’:將payload數據轉換成Map格式變數。如果payload不是JSON格式,則返回異常。
  • ‘binary’:將payload數據轉換成二進位變數進行透傳。
    這裡我使用文本透傳的形式,將數據轉成UTF8文本傳輸。writeFc指將轉換的內容傳遞給1000編號的目標函數。詳情見文檔

當然還可以使用更為複雜的腳本,實現對腳本數據的初步處理,由於我這裡後面還有函數計算,我就直接將數據轉到下一個節點。

函數計算配置

按照官方文檔新建函數,請注意不需要新建觸發器!我們這裡的函數使用python語言,通過psycopg2實現數據插入到postgres資料庫中。

由於函數計算中,默認並沒有該包,需要手動添加引用,官方建議使用Serverless Devs工具安裝部署,這個玩意非常不好用,嗯,我不接受他的建議。推薦大家使用vscode,安裝阿里雲serverless的插件,這樣其實更加方便。

按照插件的文檔,自己建立好服務與函數,默認會給一個函數入口:

# To enable the initializer feature (//help.aliyun.com/document_detail/158208.html)
# please implement the initializer function as below:
# def initializer(context):
#   logger = logging.getLogger()
#   logger.info('initializing')

def handler(event, context):
  logger = logging.getLogger()
  logger.info(event)
  return 'hello world'

我們首先在函數上右鍵,然後選擇Install Package,選擇pip安裝psycopg2,依賴就自動被安裝上了,這個非常方便。

請注意,通過IOT流轉過來的字元串,是b'data'這樣的形式的形式,需要先decode一下,然後在處理,修改函數為:

# -*- coding: utf-8 -*-
import logging
import psycopg2
import uuid
import time

def insert_database(device_id,data):
    timest = int(time.time()*1000)
    guid = str(uuid.uuid1())
    conn = psycopg2.connect(database="", user="", password="", host="", port="")
    cur = conn.cursor()
    sql = 'INSERT INTO "data"("Id","DeviceId","Timestamp", "DataArray") VALUES (\'{id}\', \'{deviceid}\', \'{timestamp}\', array{data})'
    sql = sql.format(id= guid, deviceid= device_id, timestamp= timest, data= data)
    cur.execute(sql)
    conn.commit()
    print(" Records inserted successfully")
    conn.close() 
    
def extract_string_array(data: bytes):
    arr = data.decode().strip().split(' ')
    # 寫自己的邏輯
    return deviceid, resu   

def handler(event, context):
  logger = logging.getLogger()
  logger.info(event)
  device_id, result = extract_string_array(event)
  insert_database(device_id, result)
  return 'OK'

保存,然後在vscode中deploy即可。

提示:vscode中也可以進行本地的debug,還是比較方便的,不過這些功能依賴docker,所以還是提前裝好比較好。

弄完了之後,應該是能看見這樣的畫面:

img

至此,數據就正常流轉成功。

要點

  1. 不要設置觸發器,當時為了配置這個觸發器弄了非常長的時間
  2. 函數計算與資料庫的VPC應該相同,並且賦予許可權,否則無法訪問。
  3. 函數計算默認無法保持狀態,如果有這個需求,最好試試別的方案,或者看下函數計算的預留實例(常駐實例)
  4. 提前在本地安裝好Docker,要不會有各種各樣的問題出現。
  5. Postgresql插入數組格式的數據,需要注意格式,可以參考這篇文檔
  6. 如果長時間不用docker,導致docker無法啟動,可以參考這篇文章