3分钟实践:Python语言在Serverless架构下实现敏感词过滤

前言

随着各种社交论坛等的日益火爆,敏感词过滤逐渐成了非常重要的也是值得重视的功能。那么在Serverless架构下,通过Python语言,敏感词过滤又有那些新的实现呢?我们能否是用最简单的方法,实现一个敏感词过滤的API呢?

敏感过滤入门

Replace方法

如果说敏感词过滤,其实不如说是文本的替换,以Python为例,说到词汇替换,不得不想到replace,我们可以准备一个敏感词库,然后通过replace进行敏感词替换:

def check_filter(keywords, text):      for eve in keywords:          text = text.replace(eve, "***")      return text  keywords = ("关键词1", "关键词2", "关键词3")  content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。"  print(check_filter(keywords, content))

但是动动脑大家就会发现,这种做法在文本和敏感词库非常庞大的前提下,会有很严重的性能问题。例如我将代码进行修改,进行基本的性能测试:

import time    def check_filter(keywords, text):      for eve in keywords:          text = text.replace(eve, "***")      return text  keywords =[ "关键词" + str(i) for i in range(0,10000)]  startTime = time.time()  content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" * 10000  check_filter(keywords, content)  print(time.time()-startTime)

此时的输出结果是:1.235044002532959,可以看到性能非常差。

正则表达方法

与其用replace,还不如通过正则表达re.sub来的更加快速。

def check_filter(keywords, text):       return re.sub("|".join(keywords), "***", text)  keywords = ("关键词1", "关键词2", "关键词3")  content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。"  print(check_filter(keywords, content))

我们同样增加性能测试,按照上面的方法进行改造测试,输出结果是0.47878289222717285。通过这样的例子,我们可以发现,这种做法在性能层面变高了很多,这少可以说提升了几倍,如果随着词库的增加,这个倍数会成倍增加。

DFA过滤敏感词

这种方法相对来说效率会更高一些。例如,我们认为坏人,坏孩子,坏蛋是敏感词,则他们的树关系可以表达:

用DFA字典来表示:

{      '坏': {          '蛋': {              'x00': 0          },          '人': {              'x00': 0          },          '孩': {              '子': {                  'x00': 0              }          }      }  }

使用这种树表示问题最大的好处就是可以降低检索次数,提高检索效率,基本代码实现:

import time      class DFAFilter(object):      def __init__(self):          self.keyword_chains = {}  # 关键词链表          self.delimit = 'x00'  # 限定        def add(self, keyword):          keyword = keyword.lower()  # 关键词英文变为小写          chars = keyword.strip()  # 关键字去除首尾空格和换行          if not chars:  # 如果关键词为空直接返回              return          level = self.keyword_chains          # 遍历关键字的每个字          for i in range(len(chars)):              # 如果这个字已经存在字符链的key中就进入其子字典              if chars[i] in level:                  level = level[chars[i]]              else:                  if not isinstance(level, dict):                      break                  for j in range(i, len(chars)):                      level[chars[j]] = {}                      last_level, last_char = level, chars[j]                      level = level[chars[j]]                  last_level[last_char] = {self.delimit: 0}                  break          if i == len(chars) - 1:              level[self.delimit] = 0        def parse(self, path):          with open(path, encoding='utf-8') as f:              for keyword in f:                  self.add(str(keyword).strip())        def filter(self, message, repl="*"):          message = message.lower()          ret = []          start = 0          while start < len(message):              level = self.keyword_chains              step_ins = 0              for char in message[start:]:                  if char in level:                      step_ins += 1                      if self.delimit not in level[char]:                          level = level[char]                      else:                          ret.append(repl * step_ins)                          start += step_ins - 1                          break                  else:                      ret.append(message[start])                      break              else:                  ret.append(message[start])              start += 1            return ''.join(ret)      startTime = time.time()  gfw = DFAFilter()  gfw.parse( "./sensitive_words.txt")  content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" * 10000  result = gfw.filter(content)  print(time.time()-startTime)

这里我们的字典库是:

with open("./sensitive_words", 'w') as f:      f.write("n".join( [ "关键词" + str(i) for i in range(0,10000)]))

执行结果:

4.9114227294921875e-05

可以看到性能进一步提升。

AC自动机过滤敏感词算法

接下来,我们来看一下 AC自动机过滤敏感词算法:

AC自动机:一个常见的例子就是给出n个单词,再给出一段包含m个字符的文章,让你找出有多少个单词在文章里出现过。

简单地讲,AC自动机就是字典树+kmp算法+失配指针

代码实现:

# AC自动机算法  class node(object):      def __init__(self):          self.next = {}          self.fail = None          self.isWord = False          self.word = ""      class ac_automation(object):        def __init__(self):          self.root = node()        # 添加敏感词函数      def addword(self, word):          temp_root = self.root          for char in word:              if char not in temp_root.next:                  temp_root.next[char] = node()              temp_root = temp_root.next[char]          temp_root.isWord = True          temp_root.word = word        # 失败指针函数      def make_fail(self):          temp_que = []          temp_que.append(self.root)          while len(temp_que) != 0:              temp = temp_que.pop(0)              p = None              for key, value in temp.next.item():                  if temp == self.root:                      temp.next[key].fail = self.root                  else:                      p = temp.fail                      while p is not None:                          if key in p.next:                              temp.next[key].fail = p.fail                              break                          p = p.fail                      if p is None:                          temp.next[key].fail = self.root                  temp_que.append(temp.next[key])        # 查找敏感词函数      def search(self, content):          p = self.root          result = []          currentposition = 0            while currentposition < len(content):              word = content[currentposition]              while word in p.next == False and p != self.root:                  p = p.fail                if word in p.next:                  p = p.next[word]              else:                  p = self.root                if p.isWord:                  result.append(p.word)                  p = self.root              currentposition += 1          return result        # 加载敏感词库函数      def parse(self, path):          with open(path, encoding='utf-8') as f:              for keyword in f:                  self.addword(str(keyword).strip())        # 敏感词替换函数      def words_replace(self, text):          """          :param ah: AC自动机          :param text: 文本          :return: 过滤敏感词之后的文本          """          result = list(set(self.search(text)))          for x in result:              m = text.replace(x, '*' * len(x))              text = m          return text    ah = ac_automation()  path = './sensitive_words'  ah.parse(path)  content = "这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。"  print(ah.words_replace(content))

词库同样是:

with open("./sensitive_words", 'w') as f:      f.write("n".join( [ "关键词" + str(i) for i in range(0,10000)]))

使用上面的方法,将content*10000测试结果为0.1727597713470459。

小结

可以看到这个所有算法中,在上述的基本算法中DFA过滤敏感词性能最高,但是实际上,对于后两者算法,并没有谁一定更好,可能某些时候,AC自动机过滤敏感词算法会得到更高的性能,所以在生产生活中,推荐时候用两者,可以根据自己的具体业务需要来做。

如何部署在Serverless架构下

很简单,以AC自动机过滤敏感词算法为例:我们只需要增加是几行代码就好,完整代码如下:

# -*- coding:utf-8 -*-    import json, uuid      # AC自动机算法  class node(object):      def __init__(self):          self.next = {}          self.fail = None          self.isWord = False          self.word = ""      class ac_automation(object):        def __init__(self):          self.root = node()        # 添加敏感词函数      def addword(self, word):          temp_root = self.root          for char in word:              if char not in temp_root.next:                  temp_root.next[char] = node()              temp_root = temp_root.next[char]          temp_root.isWord = True          temp_root.word = word        # 失败指针函数      def make_fail(self):          temp_que = []          temp_que.append(self.root)          while len(temp_que) != 0:              temp = temp_que.pop(0)              p = None              for key, value in temp.next.item():                  if temp == self.root:                      temp.next[key].fail = self.root                  else:                      p = temp.fail                      while p is not None:                          if key in p.next:                              temp.next[key].fail = p.fail                              break                          p = p.fail                      if p is None:                          temp.next[key].fail = self.root                  temp_que.append(temp.next[key])        # 查找敏感词函数      def search(self, content):          p = self.root          result = []          currentposition = 0            while currentposition < len(content):              word = content[currentposition]              while word in p.next == False and p != self.root:                  p = p.fail                if word in p.next:                  p = p.next[word]              else:                  p = self.root                if p.isWord:                  result.append(p.word)                  p = self.root              currentposition += 1          return result        # 加载敏感词库函数      def parse(self, path):          with open(path, encoding='utf-8') as f:              for keyword in f:                  self.addword(str(keyword).strip())        # 敏感词替换函数      def words_replace(self, text):          """          :param ah: AC自动机          :param text: 文本          :return: 过滤敏感词之后的文本          """          result = list(set(self.search(text)))          for x in result:              m = text.replace(x, '*' * len(x))              text = m          return text      def response(msg, error=False):      return_data = {          "uuid": str(uuid.uuid1()),          "error": error,          "message": msg      }      print(return_data)      return return_data      ah = ac_automation()  path = './sensitive_words'  ah.parse(path)      def main_handler(event, context):      try:          sourceContent = json.loads(event["body"])["content"]          return response({              "sourceContent": sourceContent,              "filtedContent": ah.words_replace(sourceContent)          })      except Exception as e:          return response(str(e), True)

最后,为了方便本地测试,我们可以增加:

def test():      event = {          "requestContext": {              "serviceId": "service-f94sy04v",              "path": "/test/{path}",              "httpMethod": "POST",              "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",              "identity": {                  "secretId": "abdcdxxxxxxxsdfs"              },              "sourceIp": "14.17.22.34",              "stage": "release"          },          "headers": {              "Accept-Language": "en-US,en,cn",              "Accept": "text/html,application/xml,application/json",              "Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com",              "User-Agent": "User Agent String"          },          "body": "{"content":"这是一个测试的文本,我也就呵呵了"}",          "pathParameters": {              "path": "value"          },          "queryStringParameters": {              "foo": "bar"          },          "headerParameters": {              "Refer": "10.0.2.14"          },          "stageVariables": {              "stage": "release"          },          "path": "/test/value",          "queryString": {              "foo": "bar",              "bob": "alice"          },          "httpMethod": "POST"      }      print(main_handler(event, None))      if __name__ == "__main__":      test()

完成之后,我们就可以测试运行一下,例如我的字典是:

呵呵  测试

执行之后结果:

{'uuid': '9961ae2a-5cfc-11ea-a7c2-acde48001122', 'error': False, 'message': {'sourceContent': '这是一个测试的文本,我也就呵呵了', 'filtedContent': '这是一个**的文本,我也就**了'}}

接下来,我们将代码部署到云端,新建serverless.yaml:

sensitive_word_filtering:    component: "@serverless/tencent-scf"    inputs:      name: sensitive_word_filtering      codeUri: ./      exclude:        - .gitignore        - .git/**        - .serverless        - .env      handler: index.main_handler      runtime: Python3.6      region: ap-beijing      description: 敏感词过滤      memorySize: 64      timeout: 2      events:        - apigw:            name: serverless            parameters:              environment: release              endpoints:                - path: /sensitive_word_filtering                  description: 敏感词过滤                  method: POST                  enableCORS: true                  param:                    - name: content                      position: BODY                      required: 'FALSE'                      type: string                      desc: 待过滤的句子

然后通过sls --debug进行部署,部署结果:

最后,通过POSTMan进行测试:

额外的话

  • 对于敏感词库去那里获得问题,github上有很多,可以自己搜一下,中文的英文的都有。我这里只是一个例子;
  • 这个API使用场景,完全可以放在我们的社区跟帖系统/留言评论系统/博客发布系统中,防止出现敏感词汇,导致不必要的麻烦。