3分钟实践:Python语言在Serverless架构下实现敏感词过滤
- 2020 年 3 月 3 日
- 筆記
前言
随着各种社交论坛等的日益火爆,敏感词过滤逐渐成了非常重要的也是值得重视的功能。那么在Serverless架构下,通过Python语言,敏感词过滤又有那些新的实现呢?我们能否是用最简单的方法,实现一个敏感词过滤的API呢?
敏感过滤入门
Replace方法
如果说敏感词过滤,其实不如说是文本的替换,以Python为例,说到词汇替换,不得不想到replace
,我们可以准备一个敏感词库,然后通过replace
进行敏感词替换:
def check_filter(keywords, text): for eve in keywords: text = text.replace(eve, "***") return text keywords = ("关键词1", "关键词2", "关键词3") content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" print(check_filter(keywords, content))
但是动动脑大家就会发现,这种做法在文本和敏感词库非常庞大的前提下,会有很严重的性能问题。例如我将代码进行修改,进行基本的性能测试:
import time def check_filter(keywords, text): for eve in keywords: text = text.replace(eve, "***") return text keywords =[ "关键词" + str(i) for i in range(0,10000)] startTime = time.time() content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" * 10000 check_filter(keywords, content) print(time.time()-startTime)
此时的输出结果是:1.235044002532959
,可以看到性能非常差。
正则表达方法
与其用replace
,还不如通过正则表达re.sub
来的更加快速。
def check_filter(keywords, text): return re.sub("|".join(keywords), "***", text) keywords = ("关键词1", "关键词2", "关键词3") content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" print(check_filter(keywords, content))
我们同样增加性能测试,按照上面的方法进行改造测试,输出结果是0.47878289222717285
。通过这样的例子,我们可以发现,这种做法在性能层面变高了很多,这少可以说提升了几倍,如果随着词库的增加,这个倍数会成倍增加。
DFA过滤敏感词
这种方法相对来说效率会更高一些。例如,我们认为坏人,坏孩子,坏蛋是敏感词,则他们的树关系可以表达:

用DFA字典来表示:
{ '坏': { '蛋': { 'x00': 0 }, '人': { 'x00': 0 }, '孩': { '子': { 'x00': 0 } } } }
使用这种树表示问题最大的好处就是可以降低检索次数,提高检索效率,基本代码实现:
import time class DFAFilter(object): def __init__(self): self.keyword_chains = {} # 关键词链表 self.delimit = 'x00' # 限定 def add(self, keyword): keyword = keyword.lower() # 关键词英文变为小写 chars = keyword.strip() # 关键字去除首尾空格和换行 if not chars: # 如果关键词为空直接返回 return level = self.keyword_chains # 遍历关键字的每个字 for i in range(len(chars)): # 如果这个字已经存在字符链的key中就进入其子字典 if chars[i] in level: level = level[chars[i]] else: if not isinstance(level, dict): break for j in range(i, len(chars)): level[chars[j]] = {} last_level, last_char = level, chars[j] level = level[chars[j]] last_level[last_char] = {self.delimit: 0} break if i == len(chars) - 1: level[self.delimit] = 0 def parse(self, path): with open(path, encoding='utf-8') as f: for keyword in f: self.add(str(keyword).strip()) def filter(self, message, repl="*"): message = message.lower() ret = [] start = 0 while start < len(message): level = self.keyword_chains step_ins = 0 for char in message[start:]: if char in level: step_ins += 1 if self.delimit not in level[char]: level = level[char] else: ret.append(repl * step_ins) start += step_ins - 1 break else: ret.append(message[start]) break else: ret.append(message[start]) start += 1 return ''.join(ret) startTime = time.time() gfw = DFAFilter() gfw.parse( "./sensitive_words.txt") content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" * 10000 result = gfw.filter(content) print(time.time()-startTime)
这里我们的字典库是:
with open("./sensitive_words", 'w') as f: f.write("n".join( [ "关键词" + str(i) for i in range(0,10000)]))
执行结果:
4.9114227294921875e-05
可以看到性能进一步提升。
AC自动机过滤敏感词算法
接下来,我们来看一下 AC自动机过滤敏感词算法:
AC自动机:一个常见的例子就是给出n个单词,再给出一段包含m个字符的文章,让你找出有多少个单词在文章里出现过。
简单地讲,AC自动机就是字典树+kmp算法+失配指针
代码实现:
# AC自动机算法 class node(object): def __init__(self): self.next = {} self.fail = None self.isWord = False self.word = "" class ac_automation(object): def __init__(self): self.root = node() # 添加敏感词函数 def addword(self, word): temp_root = self.root for char in word: if char not in temp_root.next: temp_root.next[char] = node() temp_root = temp_root.next[char] temp_root.isWord = True temp_root.word = word # 失败指针函数 def make_fail(self): temp_que = [] temp_que.append(self.root) while len(temp_que) != 0: temp = temp_que.pop(0) p = None for key, value in temp.next.item(): if temp == self.root: temp.next[key].fail = self.root else: p = temp.fail while p is not None: if key in p.next: temp.next[key].fail = p.fail break p = p.fail if p is None: temp.next[key].fail = self.root temp_que.append(temp.next[key]) # 查找敏感词函数 def search(self, content): p = self.root result = [] currentposition = 0 while currentposition < len(content): word = content[currentposition] while word in p.next == False and p != self.root: p = p.fail if word in p.next: p = p.next[word] else: p = self.root if p.isWord: result.append(p.word) p = self.root currentposition += 1 return result # 加载敏感词库函数 def parse(self, path): with open(path, encoding='utf-8') as f: for keyword in f: self.addword(str(keyword).strip()) # 敏感词替换函数 def words_replace(self, text): """ :param ah: AC自动机 :param text: 文本 :return: 过滤敏感词之后的文本 """ result = list(set(self.search(text))) for x in result: m = text.replace(x, '*' * len(x)) text = m return text ah = ac_automation() path = './sensitive_words' ah.parse(path) content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" print(ah.words_replace(content))
词库同样是:
with open("./sensitive_words", 'w') as f: f.write("n".join( [ "关键词" + str(i) for i in range(0,10000)]))
使用上面的方法,将content
*10000测试结果为0.1727597713470459。
小结
可以看到这个所有算法中,在上述的基本算法中DFA过滤敏感词性能最高,但是实际上,对于后两者算法,并没有谁一定更好,可能某些时候,AC自动机过滤敏感词算法会得到更高的性能,所以在生产生活中,推荐时候用两者,可以根据自己的具体业务需要来做。
如何部署在Serverless架构下
很简单,以AC自动机过滤敏感词算法为例:我们只需要增加是几行代码就好,完整代码如下:
# -*- coding:utf-8 -*- import json, uuid # AC自动机算法 class node(object): def __init__(self): self.next = {} self.fail = None self.isWord = False self.word = "" class ac_automation(object): def __init__(self): self.root = node() # 添加敏感词函数 def addword(self, word): temp_root = self.root for char in word: if char not in temp_root.next: temp_root.next[char] = node() temp_root = temp_root.next[char] temp_root.isWord = True temp_root.word = word # 失败指针函数 def make_fail(self): temp_que = [] temp_que.append(self.root) while len(temp_que) != 0: temp = temp_que.pop(0) p = None for key, value in temp.next.item(): if temp == self.root: temp.next[key].fail = self.root else: p = temp.fail while p is not None: if key in p.next: temp.next[key].fail = p.fail break p = p.fail if p is None: temp.next[key].fail = self.root temp_que.append(temp.next[key]) # 查找敏感词函数 def search(self, content): p = self.root result = [] currentposition = 0 while currentposition < len(content): word = content[currentposition] while word in p.next == False and p != self.root: p = p.fail if word in p.next: p = p.next[word] else: p = self.root if p.isWord: result.append(p.word) p = self.root currentposition += 1 return result # 加载敏感词库函数 def parse(self, path): with open(path, encoding='utf-8') as f: for keyword in f: self.addword(str(keyword).strip()) # 敏感词替换函数 def words_replace(self, text): """ :param ah: AC自动机 :param text: 文本 :return: 过滤敏感词之后的文本 """ result = list(set(self.search(text))) for x in result: m = text.replace(x, '*' * len(x)) text = m return text def response(msg, error=False): return_data = { "uuid": str(uuid.uuid1()), "error": error, "message": msg } print(return_data) return return_data ah = ac_automation() path = './sensitive_words' ah.parse(path) def main_handler(event, context): try: sourceContent = json.loads(event["body"])["content"] return response({ "sourceContent": sourceContent, "filtedContent": ah.words_replace(sourceContent) }) except Exception as e: return response(str(e), True)
最后,为了方便本地测试,我们可以增加:
def test(): event = { "requestContext": { "serviceId": "service-f94sy04v", "path": "/test/{path}", "httpMethod": "POST", "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef", "identity": { "secretId": "abdcdxxxxxxxsdfs" }, "sourceIp": "14.17.22.34", "stage": "release" }, "headers": { "Accept-Language": "en-US,en,cn", "Accept": "text/html,application/xml,application/json", "Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com", "User-Agent": "User Agent String" }, "body": "{"content":"这是一个测试的文本,我也就呵呵了"}", "pathParameters": { "path": "value" }, "queryStringParameters": { "foo": "bar" }, "headerParameters": { "Refer": "10.0.2.14" }, "stageVariables": { "stage": "release" }, "path": "/test/value", "queryString": { "foo": "bar", "bob": "alice" }, "httpMethod": "POST" } print(main_handler(event, None)) if __name__ == "__main__": test()
完成之后,我们就可以测试运行一下,例如我的字典是:
呵呵 测试
执行之后结果:
{'uuid': '9961ae2a-5cfc-11ea-a7c2-acde48001122', 'error': False, 'message': {'sourceContent': '这是一个测试的文本,我也就呵呵了', 'filtedContent': '这是一个**的文本,我也就**了'}}
接下来,我们将代码部署到云端,新建serverless.yaml
:
sensitive_word_filtering: component: "@serverless/tencent-scf" inputs: name: sensitive_word_filtering codeUri: ./ exclude: - .gitignore - .git/** - .serverless - .env handler: index.main_handler runtime: Python3.6 region: ap-beijing description: 敏感词过滤 memorySize: 64 timeout: 2 events: - apigw: name: serverless parameters: environment: release endpoints: - path: /sensitive_word_filtering description: 敏感词过滤 method: POST enableCORS: true param: - name: content position: BODY required: 'FALSE' type: string desc: 待过滤的句子
然后通过sls --debug
进行部署,部署结果:

最后,通过POSTMan进行测试:

额外的话
- 对于敏感词库去那里获得问题,github上有很多,可以自己搜一下,中文的英文的都有。我这里只是一个例子;
- 这个API使用场景,完全可以放在我们的社区跟帖系统/留言评论系统/博客发布系统中,防止出现敏感词汇,导致不必要的麻烦。