安全扫描调度系统实践
- 2019 年 11 月 20 日
- 筆記
0×01 需求背景
日常扫描行为是一个常见的需求,同时我们希望,可以更方便的进行定制自动化扫描任务制定与执行。我们不具体要求实用的扫描工具系统是什么,开源与商业看具体自己的实际需求情况,我们只是用 AWVS 举一个例子。
AWVS 本身提供了方便的 REST API 对外服务,如何通过设计封装,让 AWVS 执行的高效简单,这篇要完成的一个任务。
如果把 AWVS 换成其它的安装扫描工具,可否按同样的思路降低工具使用的流程复杂度,让安全工具的使用更自动化遍历,最初构建这个项目时考虑的,这次我们通过 AWVS 这个例子,来实践这种可能性。有一个这个基础的设计可以延伸扩展到其它工具,按本案方法进行扩展驱动其功能。
下面是整体的设计,将 REST API 与 RPC 结合方式,对整个扫描工具进行封装自动化。

现存在一个大家喜欢讨论的问题是 RPC 和 REST 那个好,在我们这里不讨论那个好,按应用场景同时使用了两个技术,REST 做业务逻辑和数据合法性检查,PRC 做功能封装驱动。在做规模的横向扩展的时候,我们可以通过负载的形式,扩大 REST 和 RPC 服务的并性数和可用性。将混合的业务逻辑用 REST 和 RPC 分层的方式时行简化,当然除了好处一定也有基于这种设计产生的其它问题。

本次代码层底核心是,封装了 AWVS 的 auth 认证和指定扫描特定域名的处理过程,两个主要的「mocker」就是 auth 和 scan, 时序图很显示的就是这些。
0×02 功能实现
具体的实现部分,将 Django Command、Django RPC、Django REST API、PyTEST、FSWatch 的部分进行介绍,会基于整套技术方案,产生其它的驱动方法,本案就是基于 AWVS 展开。最后达到的目地,就将 AWVS 对目标域名的操作扫描任务指定,简化成了一条命令。如果之前还是说部署环境,现在就是具体的业务动作。
1. 功能使用
AWVS 本身提供了 REST API 的接口, 通过进一步的抽象,简化和隐藏了复杂的调用过程。为了便于简单实现对 AWVS 的操作,最后就变成了简单的一条命令调用。
python manage.py dsl -d lua.ren
Django Command 的功能实现,是整个调用时序的入口,假设扫描的需求和设置很简答,只有一个扫描域名的设定。
2. 功能函数
扫描功能实现,是靠整个时序链调用来完成的,如果直接从 Django Command 调用 Django RPC,参于的调用数据总体会比再加入一层 REST API 调用更简单,而整个调用层级的构建,让一个复杂的 API 调用,分层解耦简单化。
对于 AWVS 最核心的驱动函数:一个是授权 auth,另一个就是添加测试任务。
2.1 授权
meta 数据结构中存放的是基本的授权用户信息, email 和 password。
def auth(self, meta): import urllib2 import ssl import json ssl._create_default_https_context = ssl._create_unverified_context url_login="https://localhost:3443/api/v1/me/login" send_headers_login={ 'Host': 'localhost:3443', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', 'Accept-Encoding': 'gzip, deflate, br', 'Content-Type': 'application/json;charset=utf-8' } data_login='{"email":"' +meta['email'] + '",' + '"password":"'+ meta['password']+'","remember_me":false}' req_login = urllib2.Request(url_login,headers=send_headers_login) response_login = urllib2.urlopen(req_login,data_login) xauth = response_login.headers['X-Auth'] COOOOOOOOkie = response_login.headers['Set-Cookie'] print COOOOOOOOkie,xauth return True
2.2 添加扫描任务
用 Auth 取回的 Cookie 信息,再进行 API 的调用,来完玘任务注册。
def addTarget(self, formaturl): url="https://localhost:3443/api/v1/targets" send_headers2={ 'Host':'servers:3443', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', 'Content-Type':'application/json;charset=utf-8', 'X-Auth':xauth, 'Cookie':COOOOOOOOkie, } try: for i in formaturl: target_url='http://'+i.strip() data='{"description":"222","address":"'+target_url+'","criticality":"10"}' req = urllib2.Request(url,headers=send_headers2) response = urllib2.urlopen(req,data) jo=eval(response.read()) target_id=jo['target_id'] url_scan="https://localhost:3443/api/v1/scans" headers_scan={ 'Host': 'localhost:3443', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', 'Accept-Encoding': 'gzip, deflate, br', 'Content-Type': 'application/json;charset=utf-8', 'X-Auth':xauth, 'Cookie':COOOOOOOOkie, } data_scan='{"target_id":'+'"'+target_id+'"'+',"profile_id":"11111111-1111-1111-1111-111111111111","schedule":{"disable":false,"start_date":null,"time_sensitive":false},"ui_session_id":"66666666666666666666666666666666"}' req_scan=urllib2.Request(url_scan,headers=headers_scan) response_scan=urllib2.urlopen(req_scan,data_scan) print response_scan.read() except Exception,e: print e return True
这两个函数是最底层的函数,关于 AWVS 的 API 封装 DEMO 网上有,大家可自行参考。
3. 测试用例
如果直接联调,调试成本其实也不低,如果单体程序问题,联调效率会有重复工作的问题。为了更好的理解这套 AWVS 的函数,是如何在当前设计结构中被调用的。我们用 PYTSET 把重点函数做了单体测试。
后续可能会加入其它模块的封装调度,单体测试就变的必须起来。
3.1 测试认证过程
@pytest.mark.scan def test_5(setup_module): import awvs ins = awvs.AWVS() ins.auth({"email":"name", "password":"pwd"}) assert True == ret
3.2 测试添加扫描任务过程
@pytest.mark.scan def test_6(setup_module): import awvs ins = awvs.AWVS() ret = ins.addTarget(['lua.renn','candylab.netn']) assert True == ret
3.3 添加认证并扫描的过程
@pytest.mark.scan def test_7(setup_module): import awvs ins = awvs.AWVS() ins.auth({"email":"name", "password":"pwd"}) ret = ins.addTarget(['lua.renn','candylab.netn']) assert True == ret
其实认证和扫描的过程,前期是拆开测试的,如果不先认证,基本上就异常了,无法添加扫描任务。单测试用例是为了提供单体质量,提高结合测试的成功效率。
整体测试的还是 auth 函数用户信息字典入参的测试,与 addTarget 函数域名列表的测试。RPC 就更像一个代理人服务程序。
3.4 自动化测试
这个工程使用的测试工具是 pytest。我们想通过自动监听 test.py 的 python 单体测试程序源码的变更,自动调用 pytest 去扫行单体测试脚本。
如果在 linux 平台一下可以使用 tup,是一个很好用的工具。因我们在 mac 环境下扫行单体测试程序,我们使用 fswatch 完成这个功能。
3.4.1 安装 fswatch
brew intall fswatch
如何在 Linux 平台用 tup 其实也很好。
3.4.2 监听脚本
#!/bin/bash DIR=$1 if [ ! -n "$DIR" ] ;then echo "you have not choice Application directory !" exit fi fswatch $DIR | while read file do #echo "${file} was modify" >> unittest.log 2>&1 echo "${file} was modify" pytest -v -s -m"scan" ${file} done
3.4.3 驱动脚本
#!/bin/bash sh autotest.sh test.py
4. RPC 接口功能
当单体功能达到我们设想的要求时,需要封装一个 RPC 服务对外提供服务。程序越复杂单体测试用例的量就同比量大。
@jsonrpc_method('myapp.autoscanner') def auto_scanner(request, domain='lua.ren'): import awvs ins = awvs.AWVS() ins.auth({"email":"name", "password":"pwd"}) ins.addTask(['lua.renn','candylab.netn']) return True
RPC 功能相当于把单体调用集成到一个接口,正常一个完整的单体要做入参的检查工作,过滤掉非法入参。
因为我们最开始是考虑用新加的 REST API 作与外部调用者进行通信,在 REST API 做入参检查,并且 REST API 不需求外部调用者调用时,要依赖安全 RPC 客户端。
5. Django Command 功能实现
实现了单体对 AWVS 的封装,并实现 RPC 服务,先不考虑 REST 和前端的控制,实际上我们想当于把 AWVS 的 REST 功能命令行化。
from django.core.management.base import BaseCommand, CommandError import traceback class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument( '-d', '--domain', action='store', dest='domain', default='lua.ren', help='domain.', ) def handle(self, *args, **options): try: if options['domain']: print 'scan domain, %s' % options['domain'] from jsonrpc.proxy import ServiceProxy s = ServiceProxy('http://localhost:5000/json/') s.myapp.autoscanner(options['domain']) self.stdout.write(self.style.SUCCESS(u'命令%s 执行成功, 参数为%s' % (__file__, options['domain']))) except Exception, ex: traceback.print_exc() self.stdout.write(self.style.ERROR(u'命令执行出错'))
6. REST API 实现
将功能性的内容用 RPC 实现,将 check 业务划分和检查放到了 REST API 层,这样后端服务调用依赖 RPC Server 和 RPC Client,而 REST API 调用层不用考虑这个问题。
@csrf_exempt def addItem(request): if request.method == 'GET': return JSONResponse("GET") if request.method == 'POST': data = JSONParser().parse(request) flg_key = data.has_key('key') if not flg_key: return JSONResponse('key is empty!') access_key = data['key'] if cmp(access_key, "test"): return JSONResponse("access key error.") flg_domain = data.has_key('domain') if not flg_domain: result = {"error":"-1","errmsg":"domain is empty"} return HttpResponse(json.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8") from jsonrpc.proxy import ServiceProxy s = ServiceProxy('http://localhost:5000/json/') import awvs ins = awvs.AWVS() ins.auth({"email":"name", "password":"pwd"}) ins.addTask(['lua.renn','candylab.netn']) result = {"error":"0","errmsg":"none"} return HttpResponse(json.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8")
REST API 路由可以快速建立。
urlpatterns = [ url(r'scanner/$', views.addItem), ]
用 CURL 客户端测试 REST API。
curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}' 127.0.0.1:8080/scanner/
7. 命令行
最终我们实现了 AWVS 的 REST API 的 RPC 和 REST 封装,然后命令行化,当然的其中 RPC 和 REST API 可以其它的地方复用。
7.1 Django Command
python manage.py dsl -d lua.ren
7.2 CURL & REST API
curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}' 127.0.0.1:8080/scanner/
0×03 后记
本篇是听取了Freebuf上老师和朋友们的建议反馈,然后产生了这个工程。这些老师朋友都是SDL的专家。特别是李老师给个工程起了一个名字叫 semaphore,并 PR。在这个工程的说明中引用了他们的对需求更精准的描述,还有以软件本身的考虑。将 Semaphore 工程中有关 AWVS 的部分,抽出一个演示插件化 RPC 项目:semaphore-awvs-driver, 仅供参考。