安全掃描調度系統實踐

  • 2019 年 11 月 20 日
  • 筆記

0×01 需求背景

日常掃描行為是一個常見的需求,同時我們希望,可以更方便的進行訂製自動化掃描任務制定與執行。我們不具體要求實用的掃描工具系統是什麼,開源與商業看具體自己的實際需求情況,我們只是用 AWVS 舉一個例子。

AWVS 本身提供了方便的 REST API 對外服務,如何通過設計封裝,讓 AWVS 執行的高效簡單,這篇要完成的一個任務。

如果把 AWVS 換成其它的安裝掃描工具,可否按同樣的思路降低工具使用的流程複雜度,讓安全工具的使用更自動化遍歷,最初構建這個項目時考慮的,這次我們通過 AWVS 這個例子,來實踐這種可能性。有一個這個基礎的設計可以延伸擴展到其它工具,按本案方法進行擴展驅動其功能。

下面是整體的設計,將 REST API 與 RPC 結合方式,對整個掃描工具進行封裝自動化。

現存在一個大家喜歡討論的問題是 RPC 和 REST 那個好,在我們這裡不討論那個好,按應用場景同時使用了兩個技術,REST 做業務邏輯和數據合法性檢查,PRC 做功能封裝驅動。在做規模的橫向擴展的時候,我們可以通過負載的形式,擴大 REST 和 RPC 服務的並性數和可用性。將混合的業務邏輯用 REST 和 RPC 分層的方式時行簡化,當然除了好處一定也有基於這種設計產生的其它問題。

本次程式碼層底核心是,封裝了 AWVS 的 auth 認證和指定掃描特定域名的處理過程,兩個主要的「mocker」就是 auth 和 scan, 時序圖很顯示的就是這些。

0×02 功能實現

具體的實現部分,將 Django Command、Django RPC、Django REST API、PyTEST、FSWatch 的部分進行介紹,會基於整套技術方案,產生其它的驅動方法,本案就是基於 AWVS 展開。最後達到的目地,就將 AWVS 對目標域名的操作掃描任務指定,簡化成了一條命令。如果之前還是說部署環境,現在就是具體的業務動作。

1. 功能使用

AWVS 本身提供了 REST API 的介面, 通過進一步的抽象,簡化和隱藏了複雜的調用過程。為了便於簡單實現對 AWVS 的操作,最後就變成了簡單的一條命令調用。

python manage.py dsl -d lua.ren

Django Command 的功能實現,是整個調用時序的入口,假設掃描的需求和設置很簡答,只有一個掃描域名的設定。

2. 功能函數

掃描功能實現,是靠整個時序鏈調用來完成的,如果直接從 Django Command 調用 Django RPC,參於的調用數據總體會比再加入一層 REST API 調用更簡單,而整個調用層級的構建,讓一個複雜的 API 調用,分層解耦簡單化。

對於 AWVS 最核心的驅動函數:一個是授權 auth,另一個就是添加測試任務。

2.1 授權

meta 數據結構中存放的是基本的授權用戶資訊, email 和 password。

def auth(self, meta):          import urllib2          import ssl          import json          ssl._create_default_https_context = ssl._create_unverified_context          url_login="https://localhost:3443/api/v1/me/login"            send_headers_login={                  'Host': 'localhost:3443',                  'Accept': 'application/json, text/plain, */*',                  'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',                  'Accept-Encoding': 'gzip, deflate, br',                  'Content-Type': 'application/json;charset=utf-8'                  }          data_login='{"email":"' +meta['email'] + '",' + '"password":"'+ meta['password']+'","remember_me":false}'          req_login = urllib2.Request(url_login,headers=send_headers_login)          response_login = urllib2.urlopen(req_login,data_login)          xauth = response_login.headers['X-Auth']          COOOOOOOOkie = response_login.headers['Set-Cookie']          print COOOOOOOOkie,xauth          return True

2.2 添加掃描任務

用 Auth 取回的 Cookie 資訊,再進行 API 的調用,來完玘任務註冊。

def addTarget(self, formaturl):          url="https://localhost:3443/api/v1/targets"          send_headers2={          'Host':'servers:3443',          'Accept': 'application/json, text/plain, */*',          'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',          'Content-Type':'application/json;charset=utf-8',          'X-Auth':xauth,          'Cookie':COOOOOOOOkie,          }            try:              for i in formaturl:                  target_url='http://'+i.strip()                  data='{"description":"222","address":"'+target_url+'","criticality":"10"}'                  req = urllib2.Request(url,headers=send_headers2)                  response = urllib2.urlopen(req,data)                  jo=eval(response.read())                  target_id=jo['target_id']                  url_scan="https://localhost:3443/api/v1/scans"              headers_scan={             'Host': 'localhost:3443',             'Accept': 'application/json, text/plain, */*',             'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',             'Accept-Encoding': 'gzip, deflate, br',             'Content-Type': 'application/json;charset=utf-8',             'X-Auth':xauth,             'Cookie':COOOOOOOOkie,              }              data_scan='{"target_id":'+'"'+target_id+'"'+',"profile_id":"11111111-1111-1111-1111-111111111111","schedule":{"disable":false,"start_date":null,"time_sensitive":false},"ui_session_id":"66666666666666666666666666666666"}'              req_scan=urllib2.Request(url_scan,headers=headers_scan)              response_scan=urllib2.urlopen(req_scan,data_scan)              print response_scan.read()            except Exception,e:              print e          return True

這兩個函數是最底層的函數,關於 AWVS 的 API 封裝 DEMO 網上有,大家可自行參考。

3. 測試用例

如果直接聯調,調試成本其實也不低,如果單體程式問題,聯調效率會有重複工作的問題。為了更好的理解這套 AWVS 的函數,是如何在當前設計結構中被調用的。我們用 PYTSET 把重點函數做了單體測試。

後續可能會加入其它模組的封裝調度,單體測試就變的必須起來。

3.1 測試認證過程

@pytest.mark.scan  def test_5(setup_module):      import awvs      ins = awvs.AWVS()      ins.auth({"email":"name", "password":"pwd"})      assert True == ret

3.2 測試添加掃描任務過程

@pytest.mark.scan  def test_6(setup_module):      import awvs      ins = awvs.AWVS()      ret = ins.addTarget(['lua.renn','candylab.netn'])      assert True == ret

3.3 添加認證並掃描的過程

@pytest.mark.scan  def test_7(setup_module):      import awvs      ins = awvs.AWVS()      ins.auth({"email":"name", "password":"pwd"})      ret = ins.addTarget(['lua.renn','candylab.netn'])      assert True == ret

其實認證和掃描的過程,前期是拆開測試的,如果不先認證,基本上就異常了,無法添加掃描任務。單測試用例是為了提供單體品質,提高結合測試的成功效率。

整體測試的還是 auth 函數用戶資訊字典入參的測試,與 addTarget 函數域名列表的測試。RPC 就更像一個代理人服務程式。

3.4 自動化測試

這個工程使用的測試工具是 pytest。我們想通過自動監聽 test.py 的 python 單體測試程式源碼的變更,自動調用 pytest 去掃行單體測試腳本。

如果在 linux 平台一下可以使用 tup,是一個很好用的工具。因我們在 mac 環境下掃行單體測試程式,我們使用 fswatch 完成這個功能。

3.4.1 安裝 fswatch

brew intall fswatch

如何在 Linux 平台用 tup 其實也很好。

3.4.2 監聽腳本

#!/bin/bash  DIR=$1  if [ ! -n "$DIR" ] ;then      echo "you have not choice Application directory !"      exit  fi    fswatch $DIR | while read file  do      #echo "${file} was modify" >> unittest.log 2>&1      echo "${file} was modify"      pytest -v -s -m"scan" ${file}  done

3.4.3 驅動腳本

#!/bin/bash  sh autotest.sh test.py

4. RPC 介面功能

當單體功能達到我們設想的要求時,需要封裝一個 RPC 服務對外提供服務。程式越複雜單體測試用例的量就同比量大。

@jsonrpc_method('myapp.autoscanner')  def auto_scanner(request, domain='lua.ren'):      import awvs      ins = awvs.AWVS()      ins.auth({"email":"name", "password":"pwd"})      ins.addTask(['lua.renn','candylab.netn'])      return True

RPC 功能相當於把單體調用集成到一個介面,正常一個完整的單體要做入參的檢查工作,過濾掉非法入參。

因為我們最開始是考慮用新加的 REST API 作與外部調用者進行通訊,在 REST API 做入參檢查,並且 REST API 不需求外部調用者調用時,要依賴安全 RPC 客戶端。

5. Django Command 功能實現

實現了單體對 AWVS 的封裝,並實現 RPC 服務,先不考慮 REST 和前端的控制,實際上我們想當於把 AWVS 的 REST 功能命令行化。

from django.core.management.base import BaseCommand, CommandError  import traceback  class Command(BaseCommand):      def add_arguments(self, parser):          parser.add_argument(              '-d',              '--domain',              action='store',              dest='domain',              default='lua.ren',              help='domain.',          )        def handle(self, *args, **options):          try:              if options['domain']:                  print 'scan domain, %s' % options['domain']                from jsonrpc.proxy import ServiceProxy              s = ServiceProxy('http://localhost:5000/json/')              s.myapp.autoscanner(options['domain'])                self.stdout.write(self.style.SUCCESS(u'命令%s 執行成功, 參數為%s' % (__file__, options['domain'])))          except Exception, ex:              traceback.print_exc()              self.stdout.write(self.style.ERROR(u'命令執行出錯'))

6. REST API 實現

將功能性的內容用 RPC 實現,將 check 業務劃分和檢查放到了 REST API 層,這樣後端服務調用依賴 RPC Server 和 RPC Client,而 REST API 調用層不用考慮這個問題。

@csrf_exempt  def addItem(request):      if request.method == 'GET':          return JSONResponse("GET")        if request.method == 'POST':          data = JSONParser().parse(request)          flg_key = data.has_key('key')          if not flg_key:              return JSONResponse('key is empty!')            access_key = data['key']          if cmp(access_key, "test"):              return JSONResponse("access key error.")            flg_domain = data.has_key('domain')          if not flg_domain:              result = {"error":"-1","errmsg":"domain is empty"}              return HttpResponse(json.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8")              from jsonrpc.proxy import ServiceProxy          s = ServiceProxy('http://localhost:5000/json/')          import awvs          ins = awvs.AWVS()          ins.auth({"email":"name", "password":"pwd"})          ins.addTask(['lua.renn','candylab.netn'])            result = {"error":"0","errmsg":"none"}          return HttpResponse(json.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8")  

REST API 路由可以快速建立。

urlpatterns = [      url(r'scanner/$', views.addItem),  ]

用 CURL 客戶端測試 REST API。

curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}'  127.0.0.1:8080/scanner/

7. 命令行

最終我們實現了 AWVS 的 REST API 的 RPC 和 REST 封裝,然後命令行化,當然的其中 RPC 和 REST API 可以其它的地方復用。

7.1 Django Command

python manage.py dsl -d lua.ren

7.2 CURL & REST API

curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}'  127.0.0.1:8080/scanner/

0×03 後記

本篇是聽取了Freebuf上老師和朋友們的建議回饋,然後產生了這個工程。這些老師朋友都是SDL的專家。特別是李老師給個工程起了一個名字叫 semaphore,並 PR。在這個工程的說明中引用了他們的對需求更精準的描述,還有以軟體本身的考慮。將 Semaphore 工程中有關 AWVS 的部分,抽出一個演示插件化 RPC 項目:semaphore-awvs-driver, 僅供參考。