安全掃描調度系統實踐
- 2019 年 11 月 20 日
- 筆記
0×01 需求背景
日常掃描行為是一個常見的需求,同時我們希望,可以更方便的進行訂製自動化掃描任務制定與執行。我們不具體要求實用的掃描工具系統是什麼,開源與商業看具體自己的實際需求情況,我們只是用 AWVS 舉一個例子。
AWVS 本身提供了方便的 REST API 對外服務,如何通過設計封裝,讓 AWVS 執行的高效簡單,這篇要完成的一個任務。
如果把 AWVS 換成其它的安裝掃描工具,可否按同樣的思路降低工具使用的流程複雜度,讓安全工具的使用更自動化遍歷,最初構建這個項目時考慮的,這次我們通過 AWVS 這個例子,來實踐這種可能性。有一個這個基礎的設計可以延伸擴展到其它工具,按本案方法進行擴展驅動其功能。
下面是整體的設計,將 REST API 與 RPC 結合方式,對整個掃描工具進行封裝自動化。

現存在一個大家喜歡討論的問題是 RPC 和 REST 那個好,在我們這裡不討論那個好,按應用場景同時使用了兩個技術,REST 做業務邏輯和數據合法性檢查,PRC 做功能封裝驅動。在做規模的橫向擴展的時候,我們可以通過負載的形式,擴大 REST 和 RPC 服務的並性數和可用性。將混合的業務邏輯用 REST 和 RPC 分層的方式時行簡化,當然除了好處一定也有基於這種設計產生的其它問題。

本次程式碼層底核心是,封裝了 AWVS 的 auth 認證和指定掃描特定域名的處理過程,兩個主要的「mocker」就是 auth 和 scan, 時序圖很顯示的就是這些。
0×02 功能實現
具體的實現部分,將 Django Command、Django RPC、Django REST API、PyTEST、FSWatch 的部分進行介紹,會基於整套技術方案,產生其它的驅動方法,本案就是基於 AWVS 展開。最後達到的目地,就將 AWVS 對目標域名的操作掃描任務指定,簡化成了一條命令。如果之前還是說部署環境,現在就是具體的業務動作。
1. 功能使用
AWVS 本身提供了 REST API 的介面, 通過進一步的抽象,簡化和隱藏了複雜的調用過程。為了便於簡單實現對 AWVS 的操作,最後就變成了簡單的一條命令調用。
python manage.py dsl -d lua.ren
Django Command 的功能實現,是整個調用時序的入口,假設掃描的需求和設置很簡答,只有一個掃描域名的設定。
2. 功能函數
掃描功能實現,是靠整個時序鏈調用來完成的,如果直接從 Django Command 調用 Django RPC,參於的調用數據總體會比再加入一層 REST API 調用更簡單,而整個調用層級的構建,讓一個複雜的 API 調用,分層解耦簡單化。
對於 AWVS 最核心的驅動函數:一個是授權 auth,另一個就是添加測試任務。
2.1 授權
meta 數據結構中存放的是基本的授權用戶資訊, email 和 password。
def auth(self, meta): import urllib2 import ssl import json ssl._create_default_https_context = ssl._create_unverified_context url_login="https://localhost:3443/api/v1/me/login" send_headers_login={ 'Host': 'localhost:3443', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', 'Accept-Encoding': 'gzip, deflate, br', 'Content-Type': 'application/json;charset=utf-8' } data_login='{"email":"' +meta['email'] + '",' + '"password":"'+ meta['password']+'","remember_me":false}' req_login = urllib2.Request(url_login,headers=send_headers_login) response_login = urllib2.urlopen(req_login,data_login) xauth = response_login.headers['X-Auth'] COOOOOOOOkie = response_login.headers['Set-Cookie'] print COOOOOOOOkie,xauth return True
2.2 添加掃描任務
用 Auth 取回的 Cookie 資訊,再進行 API 的調用,來完玘任務註冊。
def addTarget(self, formaturl): url="https://localhost:3443/api/v1/targets" send_headers2={ 'Host':'servers:3443', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', 'Content-Type':'application/json;charset=utf-8', 'X-Auth':xauth, 'Cookie':COOOOOOOOkie, } try: for i in formaturl: target_url='http://'+i.strip() data='{"description":"222","address":"'+target_url+'","criticality":"10"}' req = urllib2.Request(url,headers=send_headers2) response = urllib2.urlopen(req,data) jo=eval(response.read()) target_id=jo['target_id'] url_scan="https://localhost:3443/api/v1/scans" headers_scan={ 'Host': 'localhost:3443', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', 'Accept-Encoding': 'gzip, deflate, br', 'Content-Type': 'application/json;charset=utf-8', 'X-Auth':xauth, 'Cookie':COOOOOOOOkie, } data_scan='{"target_id":'+'"'+target_id+'"'+',"profile_id":"11111111-1111-1111-1111-111111111111","schedule":{"disable":false,"start_date":null,"time_sensitive":false},"ui_session_id":"66666666666666666666666666666666"}' req_scan=urllib2.Request(url_scan,headers=headers_scan) response_scan=urllib2.urlopen(req_scan,data_scan) print response_scan.read() except Exception,e: print e return True
這兩個函數是最底層的函數,關於 AWVS 的 API 封裝 DEMO 網上有,大家可自行參考。
3. 測試用例
如果直接聯調,調試成本其實也不低,如果單體程式問題,聯調效率會有重複工作的問題。為了更好的理解這套 AWVS 的函數,是如何在當前設計結構中被調用的。我們用 PYTSET 把重點函數做了單體測試。
後續可能會加入其它模組的封裝調度,單體測試就變的必須起來。
3.1 測試認證過程
@pytest.mark.scan def test_5(setup_module): import awvs ins = awvs.AWVS() ins.auth({"email":"name", "password":"pwd"}) assert True == ret
3.2 測試添加掃描任務過程
@pytest.mark.scan def test_6(setup_module): import awvs ins = awvs.AWVS() ret = ins.addTarget(['lua.renn','candylab.netn']) assert True == ret
3.3 添加認證並掃描的過程
@pytest.mark.scan def test_7(setup_module): import awvs ins = awvs.AWVS() ins.auth({"email":"name", "password":"pwd"}) ret = ins.addTarget(['lua.renn','candylab.netn']) assert True == ret
其實認證和掃描的過程,前期是拆開測試的,如果不先認證,基本上就異常了,無法添加掃描任務。單測試用例是為了提供單體品質,提高結合測試的成功效率。
整體測試的還是 auth 函數用戶資訊字典入參的測試,與 addTarget 函數域名列表的測試。RPC 就更像一個代理人服務程式。
3.4 自動化測試
這個工程使用的測試工具是 pytest。我們想通過自動監聽 test.py 的 python 單體測試程式源碼的變更,自動調用 pytest 去掃行單體測試腳本。
如果在 linux 平台一下可以使用 tup,是一個很好用的工具。因我們在 mac 環境下掃行單體測試程式,我們使用 fswatch 完成這個功能。
3.4.1 安裝 fswatch
brew intall fswatch
如何在 Linux 平台用 tup 其實也很好。
3.4.2 監聽腳本
#!/bin/bash DIR=$1 if [ ! -n "$DIR" ] ;then echo "you have not choice Application directory !" exit fi fswatch $DIR | while read file do #echo "${file} was modify" >> unittest.log 2>&1 echo "${file} was modify" pytest -v -s -m"scan" ${file} done
3.4.3 驅動腳本
#!/bin/bash sh autotest.sh test.py
4. RPC 介面功能
當單體功能達到我們設想的要求時,需要封裝一個 RPC 服務對外提供服務。程式越複雜單體測試用例的量就同比量大。
@jsonrpc_method('myapp.autoscanner') def auto_scanner(request, domain='lua.ren'): import awvs ins = awvs.AWVS() ins.auth({"email":"name", "password":"pwd"}) ins.addTask(['lua.renn','candylab.netn']) return True
RPC 功能相當於把單體調用集成到一個介面,正常一個完整的單體要做入參的檢查工作,過濾掉非法入參。
因為我們最開始是考慮用新加的 REST API 作與外部調用者進行通訊,在 REST API 做入參檢查,並且 REST API 不需求外部調用者調用時,要依賴安全 RPC 客戶端。
5. Django Command 功能實現
實現了單體對 AWVS 的封裝,並實現 RPC 服務,先不考慮 REST 和前端的控制,實際上我們想當於把 AWVS 的 REST 功能命令行化。
from django.core.management.base import BaseCommand, CommandError import traceback class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument( '-d', '--domain', action='store', dest='domain', default='lua.ren', help='domain.', ) def handle(self, *args, **options): try: if options['domain']: print 'scan domain, %s' % options['domain'] from jsonrpc.proxy import ServiceProxy s = ServiceProxy('http://localhost:5000/json/') s.myapp.autoscanner(options['domain']) self.stdout.write(self.style.SUCCESS(u'命令%s 執行成功, 參數為%s' % (__file__, options['domain']))) except Exception, ex: traceback.print_exc() self.stdout.write(self.style.ERROR(u'命令執行出錯'))
6. REST API 實現
將功能性的內容用 RPC 實現,將 check 業務劃分和檢查放到了 REST API 層,這樣後端服務調用依賴 RPC Server 和 RPC Client,而 REST API 調用層不用考慮這個問題。
@csrf_exempt def addItem(request): if request.method == 'GET': return JSONResponse("GET") if request.method == 'POST': data = JSONParser().parse(request) flg_key = data.has_key('key') if not flg_key: return JSONResponse('key is empty!') access_key = data['key'] if cmp(access_key, "test"): return JSONResponse("access key error.") flg_domain = data.has_key('domain') if not flg_domain: result = {"error":"-1","errmsg":"domain is empty"} return HttpResponse(json.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8") from jsonrpc.proxy import ServiceProxy s = ServiceProxy('http://localhost:5000/json/') import awvs ins = awvs.AWVS() ins.auth({"email":"name", "password":"pwd"}) ins.addTask(['lua.renn','candylab.netn']) result = {"error":"0","errmsg":"none"} return HttpResponse(json.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8")
REST API 路由可以快速建立。
urlpatterns = [ url(r'scanner/$', views.addItem), ]
用 CURL 客戶端測試 REST API。
curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}' 127.0.0.1:8080/scanner/
7. 命令行
最終我們實現了 AWVS 的 REST API 的 RPC 和 REST 封裝,然後命令行化,當然的其中 RPC 和 REST API 可以其它的地方復用。
7.1 Django Command
python manage.py dsl -d lua.ren
7.2 CURL & REST API
curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}' 127.0.0.1:8080/scanner/
0×03 後記
本篇是聽取了Freebuf上老師和朋友們的建議回饋,然後產生了這個工程。這些老師朋友都是SDL的專家。特別是李老師給個工程起了一個名字叫 semaphore,並 PR。在這個工程的說明中引用了他們的對需求更精準的描述,還有以軟體本身的考慮。將 Semaphore 工程中有關 AWVS 的部分,抽出一個演示插件化 RPC 項目:semaphore-awvs-driver, 僅供參考。