遗传编程(GA,genetic programming)算法初探,以及用遗传编程自动生成符合题解的正则表达式的实践
- 2019 年 10 月 3 日
- 筆記
1. 遗传编程简介
0x1:什么是遗传编程算法,和传统机器学习算法有什么区别
传统上,我们接触的机器学习算法,都是被设计为解决某一个某一类问题的确定性算法。对于这些机器学习算法来说,唯一的灵活性体现在参数搜索空间上,向算法输入样本,算法借助不同的优化手段,对参数进行调整,以此来得到一个对训练样本和测试样本的最佳适配参数组。
遗传编程算法完全走了另一外一条路,遗传编程算法的目标是编写一个程度,这个程序会尝试自动构造出解决某一问题的最佳程度。从本质上看,遗传编程算法构造的是一个能够构造算法的算法。
另一方面,我们曾经讨论过遗传算法,遗传算法是一种优化技术,就优化技术而言,无论是何种形式的优化,算法或度量都是预先设定好的,而优化算法所做的工作就是尝试为其找到最佳参数。和优化算法一样,遗传编程也需要一种方法来度量题解的优劣程度。但与优化算法不同的是,遗传编程中的题解并不仅仅是一组用于给定算法的参数,相反,在遗传编程中,连同算法本身及其所有参数,都是需要搜索确定的。
从某种程度上来说,遗传编程和遗传算法的区别在于,进化的基本单位不同,
- 遗传优化:进化的基本单位是模型可变参数
- 遗传编程:进化的基本单位是新算法以及新算法的参数
0x2:遗传编程和进化论的关系
遗传算法是受达尔文的进化论的启发,借鉴生物进化过程而提出的一种启发式搜索算法,因此遗传算法 ( GA , Genetic Algorithm ) 也称进化算法 。 因此,在讨论遗传编程的时候,会大量借用进化论中的术语和概念,为了更好地讨论遗传算法,我们先介绍一些基本生物进化概念,
- 基因 ( Gene ):一个遗传因子,种群中的最基本单元。
- 染色体 ( Chromosome ):一组的基因。
- 个体 ( individual ):单个生物。在遗传算法中,个体一般只包含一条染色体。
- 种群 ( Population ):由个体组成的群体。生物的进化以种群的形式进化。
- 适者生存 ( The survival of the fittest ):对环境适应度高的个体参与繁殖的机会比较多,后代就会越来越多。适应度低的个体参与繁殖的机会比较少,后代就会越来越少。
生物所处的环境起到一个提供生存压的作用(反馈),虽然纵观整个地球历史,环境的因素是在不断变化的(有时甚至变化的还很快),但是在某个时间段内(例如5000年内)是基本保持不变的,而物种进化的目的就是通过一代代的繁衍,逐渐适应(拟合)当前的环境,并和其他物种达到最优平衡(纳什均衡)。
遗传编程算法就是模拟了生物进化的过程,简单说来说,
- 生物进化的环境由一个用户定义的任务(user-defined task)所决定,算法由一组初始的题解(程序)开始展开竞争。这里所谓的任务可以是多种形式,
- 一种竞赛(game):各个题解(程序)在竞赛中直接展开竞争
- 个体测试:测出哪个题解(程序)的执行效果更好
- 遗传算法将基因抽象为题解中最小的随机变量因子(例如模型中的可变参数)
- 一个问题的解由很多这样的随机变化因子组成,算法将问题的解编码成个体的染色体(染色体是基因的集合)
- 单个个体包含若干个染色体,个体包含的染色体(题解)越多和越好,则个体的适应度就越好。在实际工程中,为了简化算法,常常假设一个个体只有一条染色体
- 多个个体组成种群,种群中适应度(Fitness)高的个体获得较高概率的繁殖机会,从而导致适应度高的个体会越来越多,经过N代的自然选择后,保存下来的个体都是适应度很高的
- 繁殖过程中,算法会评估并挑选出本轮表现最好的一部分题解题解(程序),并对程序的某些部分以随机(一定概率)的方式进行修改,包括:
- 基因交叉(Acrossover):在最优题解之间,挑选部分随机变量因子进行彼此互换。遗传算法交叉比人体内染色体交叉要简单许多。遗传算法的染色体是单倍体,而人体内的真正的染色体是双倍体。下图是遗传算法中两条染色体在中间进行交叉的示意图,
- 基因突变(Mutation):在最优题解上,直接对某些随机变量因子(基因位)进行随机修改。下图是遗传算法中一条染色体在第二位发生基因变异的示意图,
- 基因交叉(Acrossover):在最优题解之间,挑选部分随机变量因子进行彼此互换。遗传算法交叉比人体内染色体交叉要简单许多。遗传算法的染色体是单倍体,而人体内的真正的染色体是双倍体。下图是遗传算法中两条染色体在中间进行交叉的示意图,
- 经过繁殖过程,新的种群(即新的一组解)产生,称为“下一代”,理论上,这些新的题解基于原来的最优程序,但又不同于它们。这些新产生的题解和旧的最优题解会一起进入下一轮自然选择阶段
- 上述繁殖过程重复多次,直到达到收敛条件,包括,
- 找到了全局最优解
- 找到了表现足够好的解
- 题解在历经数代之后都没有得到任何改善
- 繁衍的代数达到了规定的限制
- 最终,历史上适应度最高个体所包含的解,作为遗传算法的输出
下图是遗传算法的流程图,
0x3:遗传编程的不同类型
从大的方面看,遗传编程的两个重要概念是基因型和表现型,
- 基因型就是种群个体的编码;
- 表现型是种群个体所表示的程序片段;
其实遗传算法领域的研究中,这两个方面的研究都有,但是,因为遗传编程很难直接处理程序片段(表现型)(例如一段C++可执行代码、或者是一段python代码),因为基于随机变异得到的新代码很可能无法通过编译器语法检查。
但是相比之下,遗传算法反而容易处理程序片段的内在结构(基因型)(例如C++代码的AST抽象语法树)。
所以,笔者认为基因型的遗传算法研究才是更有研究价值的一个方向,本文的讨论也会围绕基因型的遗传算法展开。
根据基因型形态的不同,遗传编程方法可以分为三种:
- 线性遗传编程
- 基于树的遗传编程
- 基于图的遗传编程
1. 线性遗传编程
线性遗传编程有广义和狭义之分,
- 广义线性遗传编程将候选程序编码进定长或者变长的字符串,即基因型是线性字符串,包括
- Multi-Expression Programming (MEP)
- Grammatical Evolution (GE)
- Gene Expression Programming (GEP)
- Cartesian Genetic Programming (CGP):该算法是一种很适合电路设计的遗传编程算法,比如我们要用两个加操作两个减操作和两个乘操作得到如下运算,
- 笛卡尔遗传编程将下面的一个候选程序编写进字符串”001 100 131 201 044 254 2573″。字符串中的三位数字“xyz”表示x操作的输入是y和z两个连线,字符串中最后的四位数字”opqr”表示输出opqr四个连线。笛卡尔遗传编程只用变异操作,而不用交叉操作。
- Genetic Algorithm for Deriving Software (GADS)
- 狭义线性遗传编程中的候选程序是汇编语言或者高级编程语言程序(例如C程序)。一个狭义线性遗传编程的个体可以是一段简单 C 语言指令,这些指令作用在一定数量预先定义的变量或者常量上(变量数量一般为指令个数的4倍)。下图是一个狭义线性遗传编程候选程序的示例,
- ,可以看到,变量数量和指令数量都是固定的,通过不同的排列组合方式得到不同的代码表现形式
http://www.doc88.com/p-630428999834.html https://pdfs.semanticscholar.org/958b/f0936eda72c3fc03a09a0e6af16c072449a1.pdf
2. 基于树的遗传编程
基于树的遗传编程的基因型是树结构。基于树的遗传编程是遗传编程最早的形态,也是遗传编程的主流方法。
大多数编程语言,在编译或解释时,首先会被转换成一棵解析树(Lisp编程语言及其变体,本质上就是一种直接访问解析树的方法),例如下图,
树上的节点有可能是枝节点也可能是叶节点,
- 枝节点代表了应用于其子节点之上的某一种操作
- 叶节点代表了某个参数或常量值
例如上图中,圆形节点代表了应用于两个分支(Y变量和常量值5)之上的求和操作。一旦我们求出了此处的计算值,就会将计算结果赋予上方的节点处。相应的,这一计算过程会一直向下传播,直到遍历所有的叶子节点(深度优先递归遍历)。
如果对整棵树进行遍历,我们会发现它相当于下面这个python函数:
在遗传变异方面,基于树的遗传编程的演化操作有两种,变异和交叉,
- 变异:基于树的遗传编程的变异操作有两种(区别在于变异的范围不同),
- 一种是随机变换树中的符号或者操作符
- 另一种是随机变换子树
- ,该图左下角是变换符号或者操作符的结果,右下角是变换子树的结果。
- 交叉:两个颗树之间随机交换子树
- ,两棵树之间的部分节点发生了随机互换
3. 基于图的遗传编程
树是一种特殊的图,因此人们很自然地想到将基于树的遗传编程扩展到基于图的遗传编程。下图就是基于图的遗传编程的基因型的一个示例。
Relevant Link:
《Adaptation in Natural and Artificial Systems》 John Henry Holland 1992 http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%b8%80%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%ae%80%e4%bb%8b 《Evolving Evolutionary Algorithms using Linear Genetic Programming (2005)》 《A comparison of several linear genetic programming techniques》Oltean, Mihai, and Crina Grosan. Complex Systems 14.4 (2003): 285-314. https://www.jianshu.com/p/a953066cb2eb
2. 遗传编程的数学基础
这个章节,我们用数学的形式化视角,来重新审视一下遗传算法。
0x1:基本数学符号定义
I | 种群中的个体 |
m | 所有可能个体的数量 |
n | 种群大小 |
pm | 变异概率 |
pc | 交叉概率 |
f(I) | 个体I的适应度。 |
p(I)t | 第t代种群中,个体I出现的概率 |
第t代种群平均适应度。第t代种群中个体适应度的平均值。 |
因为遗传算法中有各种各样的编码方式、变异操作、交叉操作和选择操作,遗传算法的形态呈现多样性。
为了简化分析,我们这里假设一个典型遗传算法,即,
- 编码方式是二进制编码:基因的取值只能是0或者1
- 变异操作将所有染色体所有基因位以恒定 pm 的概率翻转
- 交叉操作选择选择相邻的个体,以 pc 的概率决定是否需要交叉。如果需要交叉,随机选择一个基因位,并交换这个基因位以及之后的所有基因
- 每一代的新种群选择操作采用轮盘赌算法(依据概率大小):有放回地采样出原种群大小的新一代种群,个体 Ii 的采样概率如下所示,
0x2:模式定理 – 概率视角看基因模式的遗传
模式定理是遗传算法创始人 J.Holland 在其突破性著作《Adaptation in Natural and Artificial Systems》引入的,用于分析遗传算法的工作原理。
模式是指基因编码空间中,由一类相似的基因组抽象得到的pattern,比如 [0,*,*,1] 就是一个模式。染色体[0,1,0,1]和[0,0,0,1]都包含该模式。
在具体讨论模式定理之前,我们先介绍一些符号,
L(H) | 模式的长度。第一固定基因位和最后一个固定基因位的距离,其中L([0,*,*,1])=3。 |
O(H) | 模式的阶。固定基因位的个数,其中O([0,*,*,1])=2。 |
模式平均适应度。种群中包含该模式的个体适应度的平均值。 | |
p(H)t | 在第t代种群中,模式H出现的概率。 |
【模式定理】
在本章定义的典型遗传算法中,下面公式成立:
这个公式看起来有点复杂,其实非常简单,我们逐个部分来分解,
- 选择操作对模式H在下一代出现的影响是固定的,即:
- 某个模式在繁衍中,既有可能发生变异,也有可能发生交叉,所以有后面两个括号相乘
- 某个模式在变异中,变异操作将所有基因位以 pm 的概率翻转,因此模式H不被破坏的概率为(1−pm)O(H)。当0<=x<=1和n=1,…时,不等式(1−pm)O(H) >= 1−O(H)∗pm成立,从而经过变异操作,模式H的出现概率为,
- 某个模式在交叉中,交叉操作选择选择相邻的个体,以 pc 的概率决定是否需要交叉。如果需要交叉,随机选择一个基因位,并交换这个基因位以及之后的所有基因。因此模式H不被破坏的概率为(1−pc)(1−L(H)/L−1) >= 1 − pc∗L(H)/L−1。经过交叉操作,模式H的出现概率为,
总体来说,遗传算法需要在,选择操作引起的收敛性和变异交叉操作引起的多样性之间取得平衡。
模式定理的通俗说法是这样的,低阶、短序以及平均适应度高于种群平均适应度的模式在子代中呈指数增长。
低阶、短长以及平均适应度高于种群平均适应度的模式H,
此时,
即模式H呈现指数增长。
0x3:马尔柯夫链分析 – 遗传编程收敛性分析
这个小节我们来讨论一个有些理论化的问题,即:遗传编程算法经过一定次数的迭代后,是否会收敛到某个稳态?如果会达到稳态,遗传编程的收敛速度是多少?
要解决这个问题,我们需要引入数学工具,马尔柯夫链,有如下定义。
- 用 pt 表示第 t 时刻的不同状态的概率
- P 表示转移概率矩阵,其中 Pi,j表示从第 i 个状态转移到第 j 个状态的概率
- 齐次马尔科夫链的第 t+1 时刻的状态只和第 t 时刻有关,可以用公式 pt+1=ptP 表示
- 若存在一个自然数 k,使得 Pk 中的所有元素大于0,则称 P 为素矩阵。随着 k 趋近于无穷,Pk 收敛于 P∞=1Tp∞, 其中p∞=p0 limk→∞Pk=p0 是和初始状态无关的唯一值,并且所有元素大于0。这其实是由马尔柯夫链稳态定理决定的。
我们把整个种群的状态看成马尔科夫链的一个状态 s,交叉、变异和选择操作则构建了一个概率转移矩阵。一般情况下,0<pm<1,0<=pc<=1,即物种变异一定会发生,但不是必然100%发生。我们来分析一下这时的概率转移矩阵的性质。
- 让 C,M,S 分别表示交叉、变异和选择操作带来的概率转移,整体概率转移矩阵 P=CMS
- 经过变异操作,种群状态 si 转化成种群状态 sj 的概率 Mi,j=(pm)h(1−pm)n∗l-h>0,其中h是两个种群之间不同值的基因位数量。也就是说,M 是素矩阵
- 经过选择操作,种群状态 si 保持不变的概率,也就是说, S 的所有列必定有一元素大于0。我们也可以知道概率转移矩阵 P 是素矩阵
标准的优化算法分析第一个要关心的问题是,优化算法能不能收敛到全局最优点。假设全局最优点的适应度值为maxf,收敛到全局最优点的定义如下,
一言以蔽之,典型遗传算法并不收敛。
根据概率转移矩阵收敛定理,我们可以知道典型遗传算法会收敛到一个所有种群状态概率都大于0的概率分布上(稳态)。因此之后,不包含全局最优解的种群一定会不停出现,从而导致上面的公式不成立。
但是笔者这里要强调的是,这章讨论的典型遗传算法在实际工程中是几乎不存在的,实际上,几乎所有遗传算法代码都会将保持已发现最优解。加了这个变化之后的遗传算法是收敛的。
还是根据上述概率转移矩阵收敛定理,我们可以知道遗传算法会收敛到一个所有种群状态概率都大于0的概率分布上,那么包含全局最优解的种群一定会不停出现,保持已发现最优解的做法会使得上面的公式成立。
Relevant Link:
Adaptation in Natural and Artificial Systems: An Introductory Analysis with Applications to Biology, Control, and Artificial Intelligence Rudolph, Günter. “Convergence analysis of canonical genetic algorithms.” Neural Networks, IEEE Transactions on 5.1 (1994): 96-101. http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%b8%89%e6%95%b0%e5%ad%a6%e6%91%86%e6%91%86%e6%89%8b%ef%bc%8c%e5%be%88%e6%83%ad%e6%84%a7%ef%bc%8c%e5%8f%aa%e5%81%9a%e4%ba%86
3. 典型遗传算法的一些变种衍生算法
自 John Henry Holland 在1992年提出《Adaptation in Natural and Artificial Systems》论文后,遗传编程又得到了大量研究者的关注和发展,提出了很多改进型的衍生算法。虽然这些算法在工业场景中不一定都适用,但是笔者觉得我们有必要了解和学习一下它们各自的算法思想,有利于我们在遇到各自的实际问题的时候,举一反三。
0x1:交叉变种
典型交叉变异随机选择两条染色体,按照pc的概率决定是否交叉,如果选择交叉则随机选择一点并将这点之后的基因交换。这种交叉方式被称为单点杂交。
1. 多点杂交
多点杂交指定了多个交换点用于父本的基因交换重组,具体的执行过程如下图所示,
多点杂交改进的是突变率。
2. 均匀杂交
单点和多点杂交算法存在一个问题,杂交的染色体中某些部分的基因会被过早地舍弃,这是由于在交换前它们必须确定交换父本染色体交换位前面还是后面的基因,从而对于那些无关的基因段在交换前就已经收敛了。
均匀杂交算法(Uniform Crossover)就可以解决上述算法的这种局限性,该算法的主要过程如下:
- 首先随机选择染色体上的交换位
- 然后随机确定交换的基因是父本染色体上交换位的前部分基因,还是后部分基因(随机过程)
- 最后对父本染色体的基因进行重组从而产生新的下一代个体
3. 洗牌杂交
洗牌杂交的最大特点是通常将染色体的中点作为基因的交换点,即从每个父本中取它们一半的基因重组成新的个体。
另外针对于实值编码方式,还有离散杂交、中间杂交、线性杂交和扩展线性杂交等算法。
0x2:选择策略变种
精英保留策略是一种典型的选择策略。精英保留策略是指每次迭代都保留已发现的最优解。这个策略是显而易见的,我们不可能舍弃已发现的最优解,而只使用最后一代种群的最优解。同时,采用精英保留策略的典型遗传算法是保证收敛到全局最优解的。
1. 轮盘赌选择策略
轮盘赌选择策略是基于概率进行选择策略。轮盘赌算法有放回地采样出原种群大小的新一代种群,个体 Ii 的采样概率如下所示,
从概率上看,在某一轮中,即使是适应度最差的个体,也存在一定的几率能进入到下一轮,这种策略提高了多样性,但减缓了收敛性。
2. 锦标赛选择策略
锦标赛法从大小为 n 的种群随机选择 k(k小于n) 个个体,然后在 k 个个体中选择适应度最大的个体作为下一代种群的一个个体。反复多次,直到下一代种群有 n 个个体。
0x3:种群繁衍策略变种 – 多种群并行
在大自然,物种的进化是以多种群的形式并发进行的。一般来说,一个物种只有一个种群了,意味着这个物种有灭亡的危险(例如恐龙)。
受此启发,人们提出了多种群遗传算法。多种群遗传算法保持多个种群同时进化,具体流程如下图所示,
多种群遗传算法和遗传算法执行多次的区别在于移民,种群之间会通过移民的方式交换基因。这种移民操作会带来更多的多样性。
0x4:自适应遗传算法
遗传算法中,决定个体变异长度的主要因素有两个:交叉概率pc,和变异概率pm。
在实际工程问题中,需要针对不同的优化问题和目标,反复实验来确定pc和pm,调参成本很高。
而且在遗传算法训练的不同阶段,我们需要不同的pc和pm,
- 当种群中各个个体适应度趋于一致或者趋于局部最优时,使pc和pm增加,增加扰动。使得种群具有更大的多样性,跳出潜在的局部最优陷阱
- 当群体适应度比较分散时,使pc和pm减少。使得适应度高的个体和适应度低的个体保持分开,加快种群收敛速度
- 不同个体也应该有不同的pc和pm:
- 对于适应度高的个体,我们应该减少pc和pm以保护他进入下一代
- 反之对适应度低的个体,我们应该增加pc和pm以增加扰动,提高个体多样性
Srinivas.M and Patnaik.L.M (1994) 为了让遗传算法具备更好的自适应性,提出来自适应遗传算法。在论文中,pc和pm的计算公式如下:
0x5:混合遗传算法
遗传算法的全局搜索能力强,但局部搜索能力较弱。这句话怎么理解呢?
比如对于一条染色体,遗传算法并不会去看看这条染色体周围局部的染色体适应度怎么样,是否比这条染色体好。遗传算法会通过变异和交叉产生新的染色体,但新产生的染色体可能和旧染色差的很远。因此遗传算法的局部搜索能力差。
相对的,梯度法、爬山法和贪心法等算法的局部搜索能力强,运算效率也高。
受此启发,人们提出了混合遗传算法,将遗传算法和这些算法结合起来。混合遗传算法的框架是遗传算法的,只是生成新一代种群之后,对每个个体使用局部搜索算法寻找个体周围的局部最优点。
总体来说,遗传算法和梯度法分别代表了随机多样性优化和渐进定向收敛性优化的两种思潮,取各自的优点是一种非常好的思路。
Relevant Link:
Srinivas M, Patnaik L M. Adaptive probabilities of crossover and mutation in genetic algorithms[J]. Systems, Man and Cybernetics, IEEE Transactions on, 1994, 24(4): 656-667. http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e5%9b%9b%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%9a%84%e5%8f%98%e7%a7%8d
4. 用遗传编程自动生成一个能够拟合特定数据集的函数
0x1:用多项式回归拟合一个数据集
这个章节,我们来完成一个小实验,我们现在有一个数据集,数据集的生成算法如下:
# -*- coding: utf-8 -*- from random import random,randint,choice def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): rows = [] for i in range(200): x=randint(0, 40) y=randint(0, 40) rows.append([x, y, hiddenfunction(x, y)]) return rows if __name__ == '__main__': print buildhiddenset()
部分数据例如下图:
可视化如下,
# -*- coding: utf-8 -*- import matplotlib.pyplot as plt import numpy as np from sklearn.preprocessing import PolynomialFeatures from sklearn.linear_model import LinearRegression from random import random,randint,choice from mpl_toolkits.mplot3d import axes3d def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): X = [] y = [] for i in range(200): x_ = randint(0, 40) y_ = randint(0, 40) X.append([x_, y_]) y.append(hiddenfunction(x_, y_)) return np.array(X), np.array(y) if __name__ == '__main__': # generate a dataset X, y = buildhiddenset() print "X:", X print "y:", y fig = plt.figure() ax = fig.gca(projection='3d') ax.set_title("3D_Curve") ax.set_xlabel("x") ax.set_ylabel("y") ax.set_zlabel("z") # draw the figure, the color is r = read figure = ax.plot(X[:, 0], X[:, 1], y, c='r') plt.show()
很显然,必定存在一些函数,可以将(X,Y)映射为结果栏对应的数字,现在问题是这个(些)函数到底是什么?
从数据分析和数理统计分析的角度来看,这个问题似乎也不是很复杂。我们可以用多元线性回归来尝试拟合数据集。
# -*- coding: utf-8 -*- import matplotlib.pyplot as plt import numpy as np from sklearn.preprocessing import PolynomialFeatures from sklearn.linear_model import LinearRegression from random import random,randint,choice from mpl_toolkits.mplot3d import axes3d from sklearn.metrics import accuracy_score from sklearn.metrics import classification_report, confusion_matrix import numpy as np np.set_printoptions(threshold=np.inf) def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): X = [] y = [] for i in range(100): x_ = randint(0, 40) y_ = randint(0, 40) X.append([x_, y_]) y.append(hiddenfunction(x_, y_)) return np.array(X), np.array(y) if __name__ == '__main__': # generate a dataset X, y = buildhiddenset() print "X:", X print "y:", y fig = plt.figure() ax = fig.gca(projection='3d') ax.set_title("3D_Curve") ax.set_xlabel("x") ax.set_ylabel("y") ax.set_zlabel("z") # draw the figure, the color is r = read #figure = ax.plot(X[:, 0], X[:, 1], y, c='r') #plt.show() # use Scikit-Learn PolynomialFeature class to constructing parameter terms. # a,b,degree=2: [a, b, a^2, ab, b^2] # a,b,degree=3: [a, b, a^2, ab, b^2, a^3, a^2b, ab^2, b^3] # a,b,c,degree=3: [a, b, c, a^2, ab, ac, b^2, bc, c^2, a^3, a^2b, a^2c, ab^2, ac^2, abc, b^3, b^2c, bc^2, c^3] poly_features = PolynomialFeatures(degree=2, include_bias=False) # fit the dataset with Polynomial Regression Function, and X_poly is the fitting X result X_poly = poly_features.fit_transform(X) lin_reg = LinearRegression() lin_reg.fit(X_poly, y) print(lin_reg.intercept_, lin_reg.coef_) y_predict = lin_reg.predict(X_poly) y_predict = [int(i) for i in y_predict] print "y_predict: ", y_predict print "y: ", y print 'accuracy for LinearRegression is: {0}'.format(accuracy_score(y, y_predict)) print 'error for LinearRegression is: {0}'.format(confusion_matrix(y, y_predict)) # draw the prediction curve X_new_1 = np.linspace(0, 40, 100).reshape(100, 1) X_new_2 = np.linspace(0, 40, 100).reshape(100, 1) X_new = np.hstack((X_new_1, X_new_2)) # fit the X_new dataset with Polynomial Regression Function, and X_new_poly is the fitting X result X_new_poly = poly_features.transform(X_new) y_new = lin_reg.predict(X_new_poly) y_new = [int(i) for i in y_new] #print "X_new: ", X_new #print "y_new: ", y_new #print "y: ", y # draw the prediction line figure = ax.plot(X[:, 0], X[:, 1], y, c='r') figure = ax.plot(X_new[:, 0], X_new[:, 1], y_new, c='b', linewidth=2, label="Predictions") plt.show()
红色是数据集,蓝色线是多项式拟合结果
多项式回归的拟合结果如下:
- mean_squared_error for LinearRegression is: 0.55
- accuracy for LinearRegression is: 0.45
因为这是一个回归分析问题,因此精度的要求非常高,虽然实际的误差并不是很高(0.55),使用多项式拟合获得了0.45的拟合精确度,不算特别高,出错的点还挺多的。
这么一看,好像多项式回归的效果很差了?并不是这样的。
我们来对比下模型学习到的多项式参数翻译成多项式函数,和原始数据集背后的目标函数形式的差距:
print(lin_reg.intercept_, lin_reg.coef_) (5.0000000000009095, array([ 3.00000000e+00, 2.00000000e+00, 1.00000000e+00, , ])) a,b,degree=2: [a, b, a^2, ab, b^2] 模型学习到的:H = 3*X + 2*Y + X^2 + 4.70974246e-17*X*Y -7.81622966e-16*Y^2 + 5 原始数据集目标函数:H = 3*X + 2*Y + X^2 + 5
可以看到,多项式回归出现了一些过拟合的现象,多出了两项:X*Y、和Y^2,如果没有这两项,那么多项式回归得到的方程就是完美的原始方程。
从上面的实验中,我们可以得到几点启发:
- 过拟合问题普遍存在:即使在一个简单的数据集上,用这么简单的多项式回归参数,都还是出现了过拟合问题。可见过拟合问题在实际机器学习工程项目中,的确是可能大量存在的。
- 冗余结构普遍存在:在上面的例子中,X*Y、和Y^2这两项是多出来的项,模型给这两个项分配的参数都是一个非常小的数字,小到几乎可以忽略(分别是4.70974246e-17和7.81622966e-16)。基本上来说,这两项对最终模型的预测结果的影响是很微小的,如果我们的场景是一个分类任务,那么这么小的权重几乎不会影响到最终的分类预测结果。因此,我们将这种被模型分配权重极小的项,称为冗余结构,冗余结构对分类任务的影响往往非常小。某种程度上来说,冗余结构缓解了过拟合问题
从这个问题,继续延伸思考,相信读者朋友在研究和工程项目中常常会遇到一个有趣的现象:针对某个固定的数据集,我们设计出一个非常精巧的神经网络,拥有上千万的参数。和另一个人用一个只有十几万参数的简单经典神经网络进行benchmark对比,发现性能并没有太大的提升,甚至没有提升,当然,性能也没有下降。咋一看,复杂模型和简单的效果一样好。
对这个问题的研究和解释,已经开始有非常多的学者和科研机构在进行,目前提出了很多的解释框架(例如teacher-student模型),笔者自己在项目中也同样遇到了这个问题。一个初步的看法是,对于一个特定场景的数据集来说,是存在一个确定的复杂度上限的。现在如果有两个模型,一个模型刚刚好达到这个复杂度上限,另一个模型远远超过了这个复杂度上限,那么后面那个模型就会出现所谓的过拟合现象,但是,模型会通过权重分配,将高出复杂度上限之外的多余项(神经元)都分配0或者非常低的权重,让这部分神经元称为冗余项。最终,不管我们用多么复杂的模型,起作用的永远只有那一小部分神经元在发挥作用,预测的效果也一样好,过拟合问题并没有影响最终的分类预测结果。
笔者小节:
使用多项式回归对一个数据集进行拟合这种做法,本质上是一种先天经验主义思维,它的科学假设是”复杂可约性“。所谓复杂可约性是复杂理论中的一个概念,指的是一个复杂的系统可以通过一个简化的模型进行约简概括,通过历史的样本学习这个约简系统的结构参数,同时假设这个约简系统能够比真实的复杂系统推算的更快,从而借助约简系统对复杂系统进行未来预测。
上面的话说的有一些绕,简单来说就是,不管数据集背后的真实目标函数是什么,我们都可以用像低阶多项式函数这种简单模型来进行拟合学习,并利用学习到的模型对未来可能出现的新数据集进行预测。
这种基于先验的简化建模思想在今天的机器学习学术界和工业界应用非常广泛,也发挥了很好的效果。但其实我们还有另一种看世界的角度,那就是随机过程。随机过程事先不做任何假设,而是基于随机或者遵循某种策略下的随机,不断进行自我迭代与优化,从一种混沌状态逐渐收敛到目标状态。
0x2:用遗传编程拟合同一个数据集
在这个章节,我们要讨论遗传编程,还是上面那个问题,我们现在换一种思维来想:我们所见的数据集背后无非是一个函数公式来决定的,而函数公式又是由一些基本的”数学符号“以及”数学运算符“号组合而成的,数学符号和数学运算符的组合方式我们将其视为一个符号搜索空间,我们直接去搜索这个符号搜索空间即可,通过数据集作为反馈,直到收敛为止,即找到了完美的目标函数。
接下来我们逐个小节来分解任务,一步步达到我们的目标。
1. 构造树状程序(tree programs)基础数据结构及操作函数
我们可以用python构造出树状程序的基础数据结构。这棵树由若干节点组成,根据与之关联的函数的不同,这些节点又可以拥有一定数量的子节点。
有些节点将会返回传递给程序的参数;另一些则会返回常量;还有一些则会返回应用于其子节点之上的操作。
1)fwrapper
一个封装类,对应于”函数型“节点上的函数。其成员变量包括了函数名称、函数本身、以及该函数接受的参数个数(子节点)。
class fwrapper: def __init__(self,function,params,name): self.function=function self.childcount=param self.name=name
2)node
对应于函数型节点(带子节点的节点)。我们以一个fwrapper类对其进行初始化。当evaluate被调用时,我们会对各个子节点进行求值运算,然后再将函数本身应用于求得的结果。
class node: def __init__(self,fw,children): self.function=fw.function self.name=fw.name self.children=children def evaluate(self,inp): results=[n.evaluate(inp) for n in self.children] return self.function(results) def display(self,indent=0): print (' '*indent)+self.name for c in self.children: c.display(indent+1)
3)paramnode
这个类对应的节点只返回传递给程序的某个参数。其evaluate方法返回的是由idx指定的参数。
class paramnode: def __init__(self,idx): self.idx=idx def evaluate(self,inp): return inp[self.idx] def display(self,indent=0): print '%sp%d' % (' '*indent,self.idx)
4)constnode
返回常量值的节点。其evaluate方法仅返回该类被初始化时所传入的值。
class constnode: def __init__(self,v): self.v=v def evaluate(self,inp): return self.v def display(self,indent=0): print '%s%d' % (' '*indent,self.v)
5)节点操作函数
除了基础数学符号数据结构之外,我们还需要定义一些针对节点的操作函数。
一些简单的符号运算符(例如add、subtract),可以用lambda内联方式定义,另外一些稍微复杂的运算符则需要在单独的语句块中定义,不论哪种情况,都被会封装在一个fwrapper类中。
addw=fwrapper(lambda l:l[0]+l[1],2,'add') subw=fwrapper(lambda l:l[0]-l[1],2,'subtract') mulw=fwrapper(lambda l:l[0]*l[1],2,'multiply') def iffunc(l): if l[0]>0: return l[1] else: return l[2] ifw=fwrapper(iffunc,3,'if') def isgreater(l): if l[0]>l[1]: return 1 else: return 0 gtw=fwrapper(isgreater,2,'isgreater') flist=[addw,mulw,ifw,gtw,subw]
现在,我们可以利用前面创建的节点类来构造一个程序树了,我们来尝试写一个符号推理程序,
# -*- coding: utf-8 -*-
import gp
def exampletree():
# if arg[0] > 3:
# return arg[1] + 5
# else:
# return arg[1] - 2
return gp.node(
gp.ifw, [
gp.node(gp.gtw, [gp.paramnode(0), gp.constnode(3)]),
gp.node(gp.addw, [gp.paramnode(1), gp.constnode(5)]),
gp.node(gp.subw, [gp.paramnode(1), gp.constnode(2)])
]
)
if __name__ == '__main__':
exampletree = exampletree()
# expected result = 1
print exampletree.evaluate([2, 3])
# expected result = 8
print exampletree.evaluate([5, 3])
至此,我们已经成功在python中构造出了一个以树为基础的语言和解释器。
2. 初始化一个随机种群(函数初始化)
现在我们已经有能力进行形式化符号编程了,回到我们的目标,生成一个能够拟合数据集的函数。首先第一步是需要随机初始化一个符号函数,即初始化种群。
创建一个随机程序的步骤包括:
- 创建根节点并为其随机指定一个关联函数,然后再随机创建尽可能多的子节点
- 递归地,父节点创建的子节点也可能会有它们自己的随机关联子节点
def makerandomtree(pc,maxdepth=4,fpr=0.5,ppr=0.6): if random()<fpr and maxdepth>0: f=choice(flist) children=[makerandomtree(pc,maxdepth-1,fpr,ppr) for i in range(f.childcount)] return node(f,children) elif random()<ppr: return paramnode(randint(0,pc-1)) else: return constnode(randint(0,10))
该函数首先创建了一个节点并为其随机选了一个函数,然后它遍历了随机选中的函数所需的子节点,针对每一个子节点,函数通过递归调用makerandomtree来创建新的节点。通过这样的方式,一颗完整的树就被构造出来了。
仅当被随机选中的函数不再要求新的子节点时(即如果函数返回的是一个常量或输入参数时),向下创建分支的过程才会结束。
3. 衡量种群个体的好坏
按照遗传编程算法的要求,每一轮迭代中都要对种群个体进行定量评估,得到一个个体适应性的排序。
与优化技术一样,我摩恩必须找到一种衡量题解优劣程度的方法,很多场景下,优劣程度并不容易定量评估(例如网络安全中常常是非黑即白的二分类)。但是在本例中,我们是在一个数值型结果的基础上对程序进行测试,因此可以很容易通过绝对值误差进行评估。
def scorefunction(tree,s): dif=0 for data in s: v=tree.evaluate([data[0],data[1]]) dif+=abs(v-data[2]) return dif
我们来试一试初始化的随机种群的适应性评估结果,
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': hiddenset = gp.buildhiddenset() random1 = gp.makerandomtree(2) random2 = gp.makerandomtree(2) print gp.scorefunction(random1, hiddenset) print gp.scorefunction(random2, hiddenset)
随机初始化的函数种群的适应性并不是很好,这符合我们的预期。
4. 对程序进行变异
当表现最好的程序被选定之后,它们就会被复制并修改以进入到下一代。前面说到,遗传变异有两种方式,mutation和crossover,
1)mutation
变异的做法是对某个程序进行少量的修改,一个树状程序可以有多种修改方式,包括:
- 改变节点上的函数
- 改变节点的分支
- 改变节点所需子节点数目
- 删除旧分支
- 增加新的分支
- 用全新的树来替换某一子树
需要注意的是,变异的次数不宜过多(基因突变不能太频繁)。例如,我们不宜对整棵树上的大多数节点都实施变异,相反,我们可以位任何需要进行修改的节点定义一个相对较小的概率。从树的根节点开始,如果每次生成的随机数小于该概率值,就以如上所述的某种方式对节点进行变异。
def mutate(t,pc,probchange=0.1): if random()<probchange: return makerandomtree(pc) else: result=deepcopy(t) if hasattr(t,"children"): result.children=[mutate(c,pc,probchange) for c in t.children] return result
2)crossover
除了变异,另一种修改程序的方法被称为交叉或配对,即:从本轮种群中的优秀适应着中,选出两个将其进行部分子树交换。执行交叉操作的函数以两棵树作为输入,并同时开始向下遍历,当到达某个随机选定的阈值时,该函数便会返回前一棵树的一份拷贝,树上的某个分支会被后一棵树上的一个分支所取代。通过同时对两棵树的即时遍历,函数会在每棵树上大致位于相同层次的节点处实施交叉操作。
def crossover(t1,t2,probswap=0.7,top=1): if random()<probswap and not top: return deepcopy(t2) else: result=deepcopy(t1) if hasattr(t1,'children') and hasattr(t2,'children'): result.children=[crossover(c,choice(t2.children),probswap,0) for c in t1.children] return result
读者朋友可能会注意到,对于某次具体的变异或者交叉来说,新的种群个体并不一定会带来更好的性能,实际上,新种群个体的性能几乎完全是随机的。从生物进化论的角度来说,遗传变异是无方向的,随机的,遗传变异的目标仅仅是引入多样性,造成演化的是环境选择压(数据集的误差反馈)。
5. 持续迭代演化
现在,我们将上面的步骤串起来,让遗传演化不断的循环进行。本质上,我们的思路是要生成一组随机程序并择优复制和修改,然后一直重复这一过程直到终止条件满足为止。
def getrankfunction(dataset):
def rankfunction(population):
scores=[(scorefunction(t,dataset),t) for t in population]
scores.sort()
return scores
return rankfunction
def evolve(pc,popsize,rankfunction,maxgen=500,
mutationrate=0.1,breedingrate=0.4,pexp=0.7,pnew=0.05):
# Returns a random number, tending towards lower numbers. The lower pexp
# is, more lower numbers you will get
def selectindex():
return int(log(random())/log(pexp))
# Create a random initial population
population=[makerandomtree(pc) for i in range(popsize)]
for i in range(maxgen):
scores=rankfunction(population)
print "function score: ", scores[0][0]
if scores[0][0]==0: break
# The two best always make it
newpop=[scores[0][1],scores[1][1]]
# Build the next generation
while len(newpop)<popsize:
if random()>pnew:
newpop.append(mutate(
crossover(scores[selectindex()][1],
scores[selectindex()][1],
probswap=breedingrate),
pc,probchange=mutationrate))
else:
# Add a random node to mix things up
newpop.append(makerandomtree(pc))
population=newpop
scores[0][1].display()
return scores[0][1]
上述函数首先创建一个随机种群,然后循环至多maxgen次,每次循环都会调用rankfunction对程序按表现从优到劣的顺序进行排列。表现优者会不加修改地自动进入到下一代,我们称这样的方法为精英选拔发(elitism)。
至于下一代中的其他程序,则是通过随机选择排名靠前者,再经过交叉和变异之后得到的。
这个过程是一直重复下去,知道某个程序达到了完美的拟合适配(损失为0),或者重复次数达到了maxgen次为止。
evolve函数有多个参数,用以从不同方面对竞争环境加以控制,说明如下:
- rankfunction:对应一个函数,将一组程序从优到劣的顺序进行排列
- mutationrate:代表发生变异的概率
- breedingrate:代表发生交叉的概率
- popsize:初始种群的大小
- probexp:表示在构造新种群时,”选择评价较低的程序“这一概率的递减比例。该值越大,相应的筛选过程就越严格,即只选择评价最高的多少比例的个体作为复制对象
- probnew:表示在构造新种群时,”引入一个全新的随机程序“的概率,该参数和probexp是”种群多样性“的重要决定参数
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': rf = gp.getrankfunction(gp.buildhiddenset()) gp.evolve(2, 500, rf, mutationrate=0.2, breedingrate=0.1)
程序运行的非常慢,在笔者的mac上运行了15min才最终收敛到0。有意思的是,尽管这里给出的解是完全正确的,但是它明显比我们数据集背后的真实目标函数要复杂得多,也就是说发生了过拟合。
但是,我们如果运用一些代数知识,将遗传编程得到函数进行约简,会发现它和目标函数其实是等价的(p0为X,p1为Y)。
((X+6)+Y)+X + (if( (X*Y)>0 ){X}else{X} + X*X) + (X + (Y - (X + if(6>0){1}else{0})) ) # X*Y恒大于0 2*X + Y + 6 + X + X**2 + (X + (Y - (X + if(6>0){1}else{0})) ) # 6恒大于0 2*X + Y + 6 + X + X**2 + X + Y - X + 1 X**2 + 3*X + 2*Y + 5
可以看到,遗传编程这种基于内显性的构造方式,可以在形式上得到一个全局最优解,这点上比基于优化算法的逼近方法要好。
同时,上述例子告诉我们遗传编程的一个重要特征:遗传算法找到的题解也许是完全正确的,亦或是非常不错的。但是通常这些题解远比真实的目标函数要复杂得多。在遗传编程得到的题解中,我们发现有很多部分是不做任何工作的,或者对应的是形式复杂,但始终都只返回同一结果的公式,例如”if(6>0){1}else{0})”,只是1的一种多余的表达方式而已。
0x3:从遗传编程优化结果看过拟合问题的本质
从上一章的遗传优化结果我们可以看到,在数据集是充分典型集的情况下,过拟合是不影响模型的收敛的。
遗传编程的这种冗余性和优化算法和深度神经网络中的冗余结构本质上是一致的,这是一个冗余的过拟合现象,即程序的题解非常复杂,但是却对最终的决策没有影响,唯一的缺点就是浪费了很多cpu时钟。
但是另一方面,这种冗余过拟合带来的一个额外的风险是,”如果数据集是非典型的,那么过拟合就会导致严重的后果“。
我们需要明白的是,过拟合的罪魁祸首不是模型和优化算法,而恰恰是数据集本身。在本例中我们清楚地看到,当我们能够得到一个完整的典型集训练数据时,过拟合问题就会退化成一个冗余鲁棒可约结构。
但是反之,如果我们的数据集因为系统噪声或采样不完全等原因,没有拿到目标函数的典型集,那么由于复杂模型带来的过拟合问题就会引发很严重的预测偏差。我们来稍微修改一下代码,将原始数据集中随机剔除1/10的数据,使数据的充分性和典型性下降,然后再来看遗传编程最后的函数优化结果,
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': hiddenset = gp.buildhiddenset() # 按照 5% 模来采样,即剔除1/10的数据,模拟采样不完全的情况 cnt = 0 for i in hiddenset: if cnt % 10 == 0: hiddenset.remove(i) cnt += 1 print hiddenset rf = gp.getrankfunction(hiddenset) gp.evolve(2, 500, rf, mutationrate=0.2, breedingrate=0.1)
得到的函数约简结果为:
if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} + (Y+4) + X^2 + (Y + (X + (X+X))) # if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} 不可约 if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} + X**2 + 3*X + 2*Y + 4 # 真实的目标函数为: X**2 + 3*X + 2*Y + 5
可以看到,在数据集不完整的情况下,遗传算法就算完美拟合了训练集,但是也无法真正逼近目标函数,但这不是遗传算法的问题,而是受到数据集的制约。
更重要的是,因为数据集的非典型性(数据概率分布缺失),导致模型引入了真正的”过拟合复杂结构“,即”if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0}“,这是一个区间函数。要知道,这仅仅是一个非常小的例子,尚且引入了如此的不确定性,在更复杂和更复杂的问题中,数据集的概率分布缺失会引发更大的”多余过拟合复杂结构问题“,影响程度的多少,根据数据缺失的程度而定。
这反过来提醒了我们这些数据科学工作者,在解决一个实际问题的时候,不要过分纠结你的模型是不是足够好,层数是不是足够深,而要更多地关注你的数据集,数据集的质量直接决定了最终的模型效果。更进一步地说,如果你能找到一种方法,能100%拿到目标函数的数据全集,那么恭喜你,随便用一个机器学习模型都可以取得很好的效果。
0x4:多样性的重要性
对于遗传编程,我们还需要再谈一个关键问题,即多样性问题。
我们看到evolve函数中,会将最优的个体直接进入下一代,除此之外,对排名之后的个体也会按照比例和概率选择性地进行复制和修改以形成新的种群,这种做法有什么意义呢?
最大的问题在于,仅仅选择表现最优异的少数个体,很快就会使种群变得极端同质化(homogeneous),或称为近亲交配。尽管种群中所包含的题解,表现都非常不错,但是它们彼此间不会有太大的差异,因为在这些题解间进行的交叉操作最终会导致群内的题解变得越来越相似。我们称这一现象为达到局部最大化(local maxima)。
对于种群而言,局部最大化是一种不错的状态(即收敛了),但还称不上最佳的状态。因为处于这种状态的种群里,任何细小的变化都不会对最终的结果产生太大的变化。这就是一个哲学上的矛盾与对立,收敛稳定与发散变化是彼此对立又统一的,完全偏向任何一方都是不对的。
事实表明,将表现极为优异的题解和大量成绩尚可的题解组合在一起,往往能够得到更好的结果。基于这个原因,evolve提供了两个额外的参数,允许我们对筛选进程中的多样性进行调整。
- 通过降低probexp的值,我们允许表现较差的题解进入最终的种群之中,从而将”适者生存(survival of fittest)“的筛选过程调整为”最适应者及其最幸运者生存(survival of the fittest and luckiest)“
- 通过增加probnew的值,我们还允许全新的程序被随机地加入到种群中
这两个参数都会有效地增加进化过程中的多样性,同时又不会对进程有过多的扰乱,因为,表现最差的程序最终总是会被剔除掉的(遗传编程的马尔科夫收敛性)。
Relevant Link:
《集体智慧编程》
5. 用遗传编程自动得到正则表达式生成器 – Regex Golf Problem
0x1:问题描述
我们需要生成一段正则表达式,这个正则表达式需要能够匹配到所有的M数据集,同时不匹配所有的U数据集,且同时还要尽量短,即不能是简单的M数据集的并集拼接。
定义一个目标(损失)函数来评估每次题解的好坏,
,其中nM代表匹配M的个数,nU代表匹配U的个数,wI代表奖励权重,r代表该正则表达式的长度
算法优化的目标是使上式尽量大。
0x2:从一个贪婪算法说起
在讨论遗传编程之前,我们先从常规思路,用一种贪婪迭代算法来解决这个问题。我们的数据集如下,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools def words(text): return set(text.split()) if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') print M & U
首先,确认了U和M之间不存在交集,这道题理论上是有解的,否则无解。
1. 定义可行解判断条件
我们先准确定义出什么时候意味着得到了一个可行解,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') some_answer = "a*" print mistakes(some_answer, M, U)
可以看到,当我们输入正则”a*“的时候出现了很多错误,显然”a*“不是我们要的答案。读者朋友可以试着输入”foo“试试。
2. 寻找可行解的策略
- 对M中的每个词(短语)都进行一次正则候选集构造,包括以下步骤:
- 遍历M中的每一个词的每一次字符,并过滤掉特殊字符(*+?^$.[](){}|\),然后在中间遍历插入”*+?“,这是字符级别的混合交叉
- 对M中的每一个词都加上首尾定界符,例如”^it$“,得到一个wholes词集
- 对wholes词集进行ngram切分,得到一个ngram词集,例如对于词”^it$“来说,可以得到{‘^’, ‘i’, ‘t’, ‘$’, ‘^i’, ‘it’, ‘t$’, ‘^it’, ‘it$’, ‘^it$’},作为一个正则串池。可以这么理解,这个池中的每个正则串都至少能命中一个M中的元素
- 遍历上一步得到的正则串池中所有元素,逐字符用”.“字符进行替换,例如对于”^it$”来说,可以得到{‘^it$’, ‘^i.$’, ‘^.t$’, ‘^..$’}
- 遍历上一步dotify的词集,逐字符遍历插入”*+?“这种repetition控制符,例如对于”a.c“来说,可以得到{‘a+.c’, ‘a*.c’, ‘a?.c’,’a.c+’, ‘a.*c’, ‘a.?c’,’a.+c’, ‘a.c*’, ‘a.c?’},需要注意的是,在首位定界符前后不要加repetition控制符,同时不要同时加入2个repetition控制符
- 从题解候选集中,筛选出至少能够匹配一个以上M,但是不匹配U的正则子串,这一步得到一个题解正则候选子集。这是一个贪婪迭代式的思想,它不求一步得到一条能够匹配所有M的正则,而是寻找一些能够解决一部分问题的正则子串,将困难问题分而治之
- 使用OR拼接将题解正则候选子集拼接起来,例如”ab | cd“
上面构造正则候选集的过程说的可能有些抽象,这里通过代码示例来说明一下,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools OR = '|'.join # Join a sequence of strings with '|' between them cat = ''.join # Join a sequence of strings with nothing between them Set = frozenset # Data will be frozensets, so they can't be mutated. def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=4): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def dotify(part): "Return all ways to replace a subset of chars in part with '.'." choices = map(replacements, part) return {cat(chars) for chars in itertools.product(*choices)} def replacements(c): return c if c in '^$' else c + '.' def regex_covers(M, U): """Generate regex components and return a dict of {regex: {winner...}}. Each regex matches at least one winner and no loser.""" losers_str = 'n'.join(U) wholes = {'^'+winner+'$' for winner in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} reps = {r for p in parts for r in repetitions(p)} pool = wholes | parts | pairs(M) | reps searchers = {p:re.compile(p, re.MULTILINE).search for p in pool} return {p: Set(filter(searchers[p], M)) for p in pool if not searchers[p](losers_str)} def pairs(winners, special_chars=Set('*+?^$.[](){}|\')): chars = Set(cat(winners)) - special_chars return {A+'.'+q+B for A in chars for B in chars for q in '*+?'} def repetitions(part): """Return a set of strings derived by inserting a single repetition character ('+' or '*' or '?'), after each non-special character. Avoid redundant repetition of dots.""" splits = [(part[:i], part[i:]) for i in range(1, len(part)+1)] return {A + q + B for (A, B) in splits # Don't allow '^*' nor '$*' nor '..*' nor '.*.' if not (A[-1] in '^$') if not A.endswith('..') if not (A.endswith('.') and B.startswith('.')) for q in '*+?'} def tests(): assert subparts('^it$') == {'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'} assert subparts('this') == {'t', 'h', 'i', 's', 'th', 'hi', 'is', 'thi', 'his', 'this'} assert dotify('it') == {'it', 'i.', '.t', '..'} assert dotify('^it$') == {'^it$', '^i.$', '^.t$', '^..$'} assert dotify('this') == {'this', 'thi.', 'th.s', 'th..', 't.is', 't.i.', 't..s', 't...', '.his', '.hi.', '.h.s', '.h..', '..is', '..i.', '...s', '....'} assert repetitions('a') == {'a+', 'a*', 'a?'} assert repetitions('ab') == {'a+b', 'a*b', 'a?b', 'ab+', 'ab*', 'ab?'} assert repetitions('a.c') == {'a+.c', 'a*.c', 'a?.c', 'a.c+', 'a.*c', 'a.?c', 'a.+c', 'a.c*', 'a.c?'} assert repetitions('^a..d$') == {'^a+..d$', '^a*..d$', '^a?..d$', '^a..d+$', '^a..d*$', '^a..d?$'} assert pairs({'ab', 'c'}) == { 'a.*a', 'a.*b', 'a.*c', 'a.+a', 'a.+b', 'a.+c', 'a.?a', 'a.?b', 'a.?c', 'b.*a', 'b.*b', 'b.*c', 'b.+a', 'b.+b', 'b.+c', 'b.?a', 'b.?b', 'b.?c', 'c.*a', 'c.*b', 'c.*c', 'c.+a', 'c.+b', 'c.+c', 'c.?a', 'c.?b','c.?c'} assert len(pairs({'1...2...3', '($2.34)', '42', '56', '7-11'})) == 8 * 8 * 3 return 'tests pass' if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') some_answer = "a*" # print mistakes(some_answer, M, U) print tests()
笔者思考:
集合覆盖问题(set cover problem)是一个NP问题,几乎没有办法直接得到全局最优解。对这类复杂问题,一个有效的优化逼近方式就是贪婪迭代逼近,每次都求解一个局部最优值(例如每次生成一个能够覆盖最大M集合,但是不匹配U集合的正则子串),最后通过将所有局部最优解Ensemble起来得到一个最终题解(集成学习思想)。
3. 穷举得到最终题解
我们已经有了生成题解候选集的函数,也有了评估题解是否正确的损失函数,我们现在可以来将他们组合起来,用于生成我们的目标题解。
前面说过,我们的算法是一个迭代式的贪婪算法,因此,我们每次寻找一个能够最大程度匹配尽量多M的正则子串,然后将本轮已经匹配到的M子串删除,并对余下的M子串继续搜索答案,直到所有的M子串都被成功匹配为止。
# -*- coding: utf-8 -*- from __future__ import division import re import itertools OR = '|'.join # Join a sequence of strings with '|' between them cat = ''.join # Join a sequence of strings with nothing between them Set = frozenset # Data will be frozensets, so they can't be mutated. def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=4): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def dotify(part): "Return all ways to replace a subset of chars in part with '.'." choices = map(replacements, part) return {cat(chars) for chars in itertools.product(*choices)} def replacements(c): return c if c in '^$' else c + '.' def regex_covers(M, U): """Generate regex components and return a dict of {regex: {winner...}}. Each regex matches at least one winner and no loser.""" losers_str = 'n'.join(U) wholes = {'^'+winner+'$' for winner in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} reps = {r for p in parts for r in repetitions(p)} pool = wholes | parts | pairs(M) | reps searchers = {p:re.compile(p, re.MULTILINE).search for p in pool} return {p: Set(filter(searchers[p], M)) for p in pool if not searchers[p](losers_str)} def pairs(winners, special_chars=Set('*+?^$.[](){}|\')): chars = Set(cat(winners)) - special_chars return {A+'.'+q+B for A in chars for B in chars for q in '*+?'} def repetitions(part): """Return a set of strings derived by inserting a single repetition character ('+' or '*' or '?'), after each non-special character. Avoid redundant repetition of dots.""" splits = [(part[:i], part[i:]) for i in range(1, len(part)+1)] return {A + q + B for (A, B) in splits # Don't allow '^*' nor '$*' nor '..*' nor '.*.' if not (A[-1] in '^$') if not A.endswith('..') if not (A.endswith('.') and B.startswith('.')) for q in '*+?'} def tests(): assert subparts('^it$') == {'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'} assert subparts('this') == {'t', 'h', 'i', 's', 'th', 'hi', 'is', 'thi', 'his', 'this'} assert dotify('it') == {'it', 'i.', '.t', '..'} assert dotify('^it$') == {'^it$', '^i.$', '^.t$', '^..$'} assert dotify('this') == {'this', 'thi.', 'th.s', 'th..', 't.is', 't.i.', 't..s', 't...', '.his', '.hi.', '.h.s', '.h..', '..is', '..i.', '...s', '....'} assert repetitions('a') == {'a+', 'a*', 'a?'} assert repetitions('ab') == {'a+b', 'a*b', 'a?b', 'ab+', 'ab*', 'ab?'} assert repetitions('a.c') == {'a+.c', 'a*.c', 'a?.c', 'a.c+', 'a.*c', 'a.?c', 'a.+c', 'a.c*', 'a.c?'} assert repetitions('^a..d$') == {'^a+..d$', '^a*..d$', '^a?..d$', '^a..d+$', '^a..d*$', '^a..d?$'} assert pairs({'ab', 'c'}) == { 'a.*a', 'a.*b', 'a.*c', 'a.+a', 'a.+b', 'a.+c', 'a.?a', 'a.?b', 'a.?c', 'b.*a', 'b.*b', 'b.*c', 'b.+a', 'b.+b', 'b.+c', 'b.?a', 'b.?b', 'b.?c', 'c.*a', 'c.*b', 'c.*c', 'c.+a', 'c.+b', 'c.+c', 'c.?a', 'c.?b','c.?c'} assert len(pairs({'1...2...3', '($2.34)', '42', '56', '7-11'})) == 8 * 8 * 3 return 'tests pass' def findregex(winners, losers, k=4, addRepetition=False): "Find a regex that matches all winners but no losers (sets of strings)." # Make a pool of regex parts, then pick from them to cover winners. # On each iteration, add the 'best' part to 'solution', # remove winners covered by best, and keep in 'pool' only parts # that still match some winner. if addRepetition: pool = regex_covers(winners, losers) else: pool = regex_parts(winners, losers) solution = [] def score(part): return k * len(matches(part, winners)) - len(part) while winners: best = max(pool, key=score) solution.append(best) winners = winners - matches(best, winners) pool = {r for r in pool if matches(r, winners)} return OR(solution) if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') solution = findregex(M, U, addRepetition=True) if verify(solution, M, U): print len(solution), solution solution = findregex(M, U, addRepetition=False) if verify(solution, M, U): print len(solution), solution
4. 尝试生成一段描述恶意webshell样本的零误报正则
我们来做一个和网络安全相关的实验,我们现在有黑白两份样本,分别代表M和U,我们现在尝试用本节讨论的算法来生成一段正则。
但是笔者在实际操作中发现,用regex golf这种问题的搜索空间是非常巨大的,当M和U的规模扩大时(例如大量webshell文件),所产生的正则子串候选集会是一个巨量的天文数字,该算法本质上还是相当于在进行穷举搜索,搜索效率十分低下。
更进一步地,笔者尝试继续扩大黑白样本量(超过10的时候),算法已经无法搜索出有效的正则题解,这说明,当黑白样本超过一定数量的时候,alpha字符空间中黑白样本已经存在交叉,全局解不存在。
0x3:用遗传编程来自动搜索最优正则题解
还是上一小节的Regex golf问题,现在我们来尝试用遗传编程来优化搜索效率。
1. 解题策略分析
原论文的策略是基于遗传算法生成一个”xx | xx“的双正则子串,即每次得到的个体最多有两个子串,然后按照上一小节中类似的贪婪策略进行逐步OR拼接。
笔者这里决定修改一下思路,直接基于遗传编程对整个题解空间进行搜索,即构造一个完整题解的regex tree,这是一种全局最优解搜索的优化思路。
2. 基础数据结构定义
我们的总体策略依然是贪婪分段策略,也就说,我们要寻找的最终正则题解是由很多个”|“组成的分段正则表达式。现在我们来定义我们的题解中可能出现的基本元素,这里,我们依然采用树结构作为基础数据结构的承载:
- ”ROOT“:根节点,一棵树有且只有一个根节点,根节点一定是一个”|“分裂节点,即一个题解必须包含2个及2个以上的正则子串
- ”|“:代表一个分裂符号,树结构从这里分裂一次,分裂符号的参数分别是两个placeholder占位符
- dot(”.“):代表一个占位符,用于保存子节点信息,每个占位符解析完毕后都会在头尾加入定界符”^“和”$“,例如”^ab$ | ^cd$“
- 字符串:由M序列的ngram序列组成的集合(2 <= n <= 4),例如”foo“
- 修饰符:包括
- ”.*+“
- ”.++“
- ”.?+“
- ”.{.,.}+“:花括号内部包含两个占位符,定义域为正整数,且参数2大于等于参数1
- ”(.)“:组,括号内部包含一个占位符
- ”[.]“:中括号内部包含一个占位符
- ”[^.]“:取非的字符类
- ”..“:连接符,代表一种操作,将两个子节点拼接起来
基于上述基本元素定义,我们可以将题解正则表达式抽象为一个树结构(regex tree),如下图,
(foo) | (ba++r)
该树结构可以通过深度优先遍历,打印出最终的题解正则串,如上图的标题所示。
# -*- coding: utf-8 -*-
from random import random, randint, choice
import re
import itertools
# "ROOT"
class rootnode:
def __init__(self, left_child_node, right_child_node):
if left_child_node and right_child_node:
self.left_child_node = left_child_node
self.right_child_node = right_child_node
else:
self.left_child_node = node
self.right_child_node = node
def display(self):
return "|"
# universal child node
class node:
def __init__(self, node):
self.node = node
# "|"
class spliternode:
def __init__(self, left_child_dotplaceholder, right_child_dotplaceholder):
if left_child_dotplaceholder and right_child_dotplaceholder:
self.left_child_dotplaceholder = left_child_dotplaceholder
self.right_child_dotplaceholder = right_child_dotplaceholder
else:
self.left_child_dotplaceholder = node
self.right_child_dotplaceholder = node
def display(self):
return "|"
# "(.)"
class dotplaceholdernode:
def __init__(self, childnode=None):
if childnode:
self.childnode = childnode
else:
self.childnode = node
# "foo"
class charnode:
def __init__(self, charstring):
if charstring:
self.charstring = charstring
else:
self.charstring = node
def display(self):
return self.charstring
# ".."
class concat_node:
def __init__(self, left_child_node, right_child_node):
if left_child_node and right_child_node:
self.left_child_node = left_child_node
self.right_child_node = right_child_node
else:
self.left_child_node = node
self.right_child_node = node
# "++"
class qualifiernode:
def __init__(self, qualifierstrig):
if qualifierstrig:
self.qualifierstrig = qualifierstrig
else:
self.qualifierstrig = node
def display(self):
return self.qualifierstrig
def exampletree():
return rootnode(
dotplaceholdernode(
charnode("foo")
),
dotplaceholdernode(
concat_node(
concat_node(
charnode("ba"),
qualifiernode("++")
),
charnode("r")
)
)
)
# left child deep first travel
def printregextree(rootnode_i):
if rootnode_i is None:
return ""
if isinstance(rootnode_i, rootnode):
# concat the finnal regex str
finnal_regexstr = ""
finnal_regexstr += printregextree(rootnode_i.left_child_node)
finnal_regexstr += rootnode_i.display()
finnal_regexstr += printregextree(rootnode_i.right_child_node)
return finnal_regexstr
if isinstance(rootnode_i, spliternode):
# concat the finnal regex str
split_regexstr = ""
split_regexstr += printregextree(rootnode_i.left_child_dotplaceholder)
split_regexstr += rootnode_i.display()
split_regexstr += printregextree(rootnode_i.right_child_dotplaceholder)
return split_regexstr
if isinstance(rootnode_i, dotplaceholdernode):
return printregextree(rootnode_i.childnode)
if isinstance(rootnode_i, charnode):
return rootnode_i.display()
if isinstance(rootnode_i, concat_node):
concat_str = ""
concat_str += printregextree(rootnode_i.left_child_node)
concat_str += printregextree(rootnode_i.right_child_node)
return concat_str
if isinstance(rootnode_i, qualifiernode):
return rootnode_i.display()
def matches(regex, strings):
"Return a set of all the strings that are matched by regex."
return {s for s in strings if re.search(regex, s)}
def regex_parts(M, U):
"Return parts that match at least one winner, but no loser."
wholes = {'^' + w + '$' for w in M}
parts = {d for w in wholes for p in subparts(w) for d in p}
return wholes | {p for p in parts if not matches(p, U)}
def subparts(word, N=5):
"Return a set of subparts of word: consecutive characters up to length N (default 4)."
return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N))
def words(text):
return set(text.split())
def makerandomtree(M, U, parentnode=None, splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0):
if curren_level > maxdepth:
print "curren_level > maxdepth: ", curren_level
return
# ROOT node
if isinstance(parentnode, rootnode):
curren_level = 0
print "curren_level: ", curren_level
# init root node
print "init rootnode: ", curren_level
rootnode_i = rootnode(
dotplaceholdernode(None),
dotplaceholdernode(None)
)
# create left child node
print "new dotplaceholdernode"
rootnode_i.left_child_node = makerandomtree(M, U, rootnode_i.left_child_node, splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
print "new dotplaceholdernode"
# create right child node
rootnode_i.right_child_node = makerandomtree(M, U, rootnode_i.right_child_node, splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
return rootnode_i
# ".." dot placeholder node
if isinstance(parentnode, dotplaceholdernode):
curren_level += 1
print "curren_level: ", curren_level
# "|"
if random() < splitrate:
print "new spliternode"
return makerandomtree(M, U, spliternode(None, None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# ".."
elif random() < concatrate:
print "new concat_node"
return makerandomtree(M, U, concat_node(None, None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "foo"
elif random() < charrate:
print "new charnode"
return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "|" split node
if isinstance(parentnode, spliternode):
curren_level += 1
print "curren_level: ", curren_level
print "init spliternode"
splitnode_i = spliternode(
dotplaceholdernode(None),
dotplaceholdernode(None)
)
print "new dotplaceholdernode"
splitnode_i.left_child_dotplaceholder = makerandomtree(M, U, splitnode_i.left_child_dotplaceholder,
splitrate, concatrate, charrate, qualifierate,
maxdepth, curren_level)
print "new dotplaceholdernode"
splitnode_i.right_child_dotplaceholder = makerandomtree(M, U, splitnode_i.right_child_dotplaceholder,
splitrate, concatrate, charrate, qualifierate,
maxdepth, curren_level)
return splitnode_i
# ".." concat node
if isinstance(parentnode, concat_node):
curren_level += 1
print "curren_level: ", curren_level
# "foo"
if random() < charrate:
print "new charnode"
return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "++"
if random() < qualifierate:
print "new qualifiernode"
return makerandomtree(M, U, qualifiernode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "foo" char node
if isinstance(parentnode, charnode):
curren_level += 1
print "curren_level: ", curren_level
charnode_str = choice(list(regex_parts(M, U)))
print "charnode_str: ", charnode_str
print "new charnode"
charnode_i = charnode(charnode_str)
return charnode_i
# "++" qualifierate node
if isinstance(parentnode, qualifiernode):
curren_level += 1
print "curren_level: ", curren_level
qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?'])
print "qualifiernode_str: ", qualifiernode_str
print "new qualifiernode"
qualifiernode_i = qualifiernode(qualifiernode_str)
return qualifiernode_i
if __name__ == '__main__':
exampletree = exampletree()
print type(exampletree), exampletree
print printregextree(exampletree)
3. Regex Tree生长策略
有了基本的数据结构,现在定义一下regex tree的生长准则,
- 每棵树都从ROOT根节点开始生长,根节点就是一个“|”节点
- ”|“的左右子节点必须是”.“dot placeholder节点
- 字符串节点
- ”..“:concat节点
- ”|“:新的分裂节点
- 字符串节点从M的ngram词汇表中随机选取,ngram list生成原理参考上一小节
- “..”拼接节点的左右子节点可以是以下几种节点类型:
- 字符串节点
- 修饰符节点
”.“dot placeholder节点的子节点可以是以下几种节点类型:
4. 损失函数定义
这里需要用到代价敏感学习的训练思路,如果直接按照下面公式进行损失训练,
那么很快就会收敛到最优解:”|“上,原因很显然,全字符匹配中,nm和nu都是相等的,相减为0,然后减去字符串长度1,就是-1,这是算法能找到的最好答案了。
为了解决这个问题,我们需要对TP和FP采取不同的惩罚力度,
def scorefunction(tree, M, U, w=1):
dif = 0
regex_str = printregextree(tree)
M_cn, U_cn = 0, 0
for s in list(M):
try:
if re.search(regex_str, s):
M_cn += 1
except Exception, e:
print e.message, "regex_str: ", regex_str
# this regex tree is illegal, low socre!!
return -8
for u in list(U):
if re.search(regex_str, u):
U_cn += 1
# print "M_cn: ", M_cn
# print "U_cn: ", U_cn
dif = w * (M_cn - U_cn) - len(regex_str)
return dif
上面代码中有一点值得注意,由于regex tree的生成具有一定的随机性,因此很可能产生不合法的正则串,因此对不合法的正则串给予较低的分值,驱使它淘汰。
有了损失函数的定义,就可以很容易算出一个种群中所有个体的适应度排名。
def rankfunction(M, U, population):
scores = [(scorefunction(t, M, U), t) for t in population]
scores.sort()
return scores
5. 随机初始化regex tree
按照遗传编程的定义,我们先随机初始化一棵符合题解规约的regex tree,
# -*- coding: utf-8 -*- from random import random, randint, choice import re import itertools # "ROOT" class rootnode: def __init__(self, left_child_node, right_child_node): if left_child_node and right_child_node: self.left_child_node = left_child_node self.right_child_node = right_child_node else: self.left_child_node = node self.right_child_node = node def display(self): return "|" # universal child node class node: def __init__(self, node): self.node = node # "|" class spliternode: def __init__(self, left_child_dotplaceholder, right_child_dotplaceholder): if left_child_dotplaceholder and right_child_dotplaceholder: self.left_child_dotplaceholder = left_child_dotplaceholder self.right_child_dotplaceholder = right_child_dotplaceholder else: self.left_child_dotplaceholder = node self.right_child_dotplaceholder = node def display(self): return "|" # "(.)" class dotplaceholdernode: def __init__(self, childnode=None): if childnode: self.childnode = childnode else: self.childnode = node # "foo" class charnode: def __init__(self, charstring): if charstring: self.charstring = charstring else: self.charstring = node def display(self): return self.charstring # ".." class concat_node: def __init__(self, left_child_node, right_child_node): if left_child_node and right_child_node: self.left_child_node = left_child_node self.right_child_node = right_child_node else: self.left_child_node = node self.right_child_node = node # "++" class qualifiernode: def __init__(self, qualifierstrig): if qualifierstrig: self.qualifierstrig = qualifierstrig else: self.qualifierstrig = node def display(self): return self.qualifierstrig def exampletree(): return rootnode( dotplaceholdernode( charnode("foo") ), dotplaceholdernode( concat_node( concat_node( charnode("ba"), qualifiernode("++") ), charnode("r") ) ) ) # left child deep first travel def printregextree(rootnode_i): if rootnode_i is None: return "" if isinstance(rootnode_i, rootnode): # concat the finnal regex str finnal_regexstr = "" finnal_regexstr += printregextree(rootnode_i.left_child_node) finnal_regexstr += rootnode_i.display() finnal_regexstr += printregextree(rootnode_i.right_child_node) return finnal_regexstr if isinstance(rootnode_i, spliternode): # concat the finnal regex str split_regexstr = "" split_regexstr += printregextree(rootnode_i.left_child_dotplaceholder) split_regexstr += rootnode_i.display() split_regexstr += printregextree(rootnode_i.right_child_dotplaceholder) return split_regexstr if isinstance(rootnode_i, dotplaceholdernode): return printregextree(rootnode_i.childnode) if isinstance(rootnode_i, charnode): return rootnode_i.display() if isinstance(rootnode_i, concat_node): concat_str = "" concat_str += printregextree(rootnode_i.left_child_node) concat_str += printregextree(rootnode_i.right_child_node) return concat_str if isinstance(rootnode_i, qualifiernode): return rootnode_i.display() def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in p} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=5): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def words(text): return set(text.split()) def makerandomtree(M, U, parentnode=None, splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0): if curren_level > maxdepth: print "curren_level > maxdepth: ", curren_level return # ROOT node if isinstance(parentnode, rootnode): curren_level = 0 print "curren_level: ", curren_level # init root node print "init rootnode: ", curren_level rootnode_i = rootnode( dotplaceholdernode(None), dotplaceholdernode(None) ) # create left child node print "new dotplaceholdernode" rootnode_i.left_child_node = makerandomtree(M, U, rootnode_i.left_child_node, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) print "new dotplaceholdernode" # create right child node rootnode_i.right_child_node = makerandomtree(M, U, rootnode_i.right_child_node, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return rootnode_i # ".." dot placeholder node if isinstance(parentnode, dotplaceholdernode): curren_level += 1 print "curren_level: ", curren_level # "|" if random() < splitrate: print "new spliternode" return makerandomtree(M, U, spliternode(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # ".." elif random() < concatrate: print "new concat_node" return makerandomtree(M, U, concat_node(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" elif random() < charrate: print "new charnode" return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "|" split node if isinstance(parentnode, spliternode): curren_level += 1 print "curren_level: ", curren_level print "init spliternode" splitnode_i = spliternode( dotplaceholdernode(None), dotplaceholdernode(None) ) print "new dotplaceholdernode" splitnode_i.left_child_dotplaceholder = makerandomtree(M, U, splitnode_i.left_child_dotplaceholder, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) print "new dotplaceholdernode" splitnode_i.right_child_dotplaceholder = makerandomtree(M, U, splitnode_i.right_child_dotplaceholder, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return splitnode_i # ".." concat node if isinstance(parentnode, concat_node): curren_level += 1 print "curren_level: ", curren_level # "foo" if random() < charrate: print "new charnode" return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "++" if random() < qualifierate: print "new qualifiernode" return makerandomtree(M, U, qualifiernode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" char node if isinstance(parentnode, charnode): curren_level += 1 print "curren_level: ", curren_level charnode_str = choice(list(regex_parts(M, U))) print "charnode_str: ", charnode_str print "new charnode" charnode_i = charnode(charnode_str) return charnode_i # "++" qualifierate node if isinstance(parentnode, qualifiernode): curren_level += 1 print "curren_level: ", curren_level qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?']) print "qualifiernode_str: ", qualifiernode_str print "new qualifiernode" qualifiernode_i = qualifiernode(qualifiernode_str) return qualifiernode_i if __name__ == '__main__': exampletree = exampletree() print type(exampletree), exampletree print printregextree(exampletree) M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') print regex_parts(M, U) rd1 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) print rd1 print printregextree(rd1)
可以看到,随机产生的题解并不是一个好的题解,但是它却是一个有效的题解。现在我们有了一个好的开始,接下来我们可以着手进行遗传变异了。
6. 遗传变异
遗传编程的遗传变异分为两种:1)mutate;2)crossover,我们分别来定义。
1)mutate
mutate变异遍历整个regex tree,针对不同节点采取不同的变异策略:
- ”concat node“:根据一定的概率决定是否随机用一颗新树代替两个子节点
- ”char node“节点:根据一定的概率决定是否随机从ngram候选列表中选一个新的char sring代替
- “qualifiernode node”节点:根据一定的概率决定是否随机从修饰符候选集中选一个新的qualifiernode string代替
def mutate(M, U, t, probchange=0.2):
if random() < probchange:
return makerandomtree(M, U)
else:
result = deepcopy(t)
if hasattr(t, "left_concatchildnode"):
result.left_concatchildnode = mutate(M, U, t.left_concatchildnode, probchange)
if hasattr(t, "right_concatchildnode"):
result.right_concatchildnode = mutate(M, U, t.right_concatchildnode, probchange)
if hasattr(t, "childnode"):
result.childnode = mutate(M, U, t.childnode, probchange)
if hasattr(t, "qualifierstrig"):
result.qualifierstrig = qualifiernode(choice(['.', '+', '?', '*', '.*', '.+', '.*?']))
if hasattr(t, "charstring"):
result.charstring = charnode(choice(list(regex_parts(M, U))))
return result
总体来说,mutate的作用在于向种群中引入更多的多样性,随机性和多样性是物种进化的原动力。
2)crossover
crossover交叉是同序遍历两棵regex tree,按照一定的概率决定是否要将各自的节点进行互换。
def crossover(t1, t2, probswap=0.7): if random() < probswap: return deepcopy(t2) else: result = deepcopy(t1) if hasattr(t1, 'left_childnode') and hasattr(t2, 'left_childnode'): result.left_childnode = crossover(t1.left_childnode, t2.left_childnode, probswap) if hasattr(t1, 'right_childnode') and hasattr(t2, 'right_childnode'): result.right_childnode = crossover(t1.right_childnode, t2.right_childnode, probswap) if hasattr(t1, 'childnode') and hasattr(t2, 'childnode'): result.childnode = crossover(t1.childnode, t2.childnode, probswap) if hasattr(t1, 'qualifierstrig') and hasattr(t2, 'qualifierstrig'): result.qualifierstrig = t2.qualifierstrig if hasattr(t1, 'charstring') and hasattr(t2, 'charstring'): result.charstring = t2.charstring return result
总体来说,crossover的作用在于加速优秀基因和保留,和劣质基因的淘汰。因为可以这么理解,因为crossover的存在,同一个基因模式在种群中会有扩大的趋势,而如果是优秀的基因则会不断被保留。
7. 自动遗传迭代进化
至此,regex tree遗传进化的所有元素都已经准备妥当了,我们现在可以开始编写遗传进化算法主程序了,让程序自动生成一段符合题解的正则。
# -*- coding: utf-8 -*- from random import random, randint, choice import re from copy import deepcopy import itertools from math import log import numpy as np import os # "ROOT" class rootnode: def __init__(self, left_childnode, right_childnode): if left_childnode and right_childnode: self.left_childnode = left_childnode self.right_childnode = right_childnode else: self.left_childnode = node self.right_childnode = node def display(self): return "|" # universal child node class node: def __init__(self, node): self.node = node # "|" class spliternode: def __init__(self, left_childnode, right_childnode): if left_childnode and right_childnode: self.left_childnode = left_childnode self.right_childnode = right_childnode else: self.left_childnode = node self.right_childnode = node def display(self): return "|" # "(.)" class dotplaceholdernode: def __init__(self, childnode=None): if childnode: self.childnode = childnode else: self.childnode = node # "foo" class charnode: def __init__(self, charstring): if charstring: self.charstring = charstring else: self.charstring = node def display(self): return self.charstring # ".." class concat_node: def __init__(self, left_concatchildnode, right_concatchildnode): if left_concatchildnode and right_concatchildnode: self.left_concatchildnode = left_concatchildnode self.right_concatchildnode = right_concatchildnode else: self.left_concatchildnode = node self.right_concatchildnode = node # "++" class qualifiernode: def __init__(self, qualifierstrig): if qualifierstrig: self.qualifierstrig = qualifierstrig else: self.qualifierstrig = node def display(self): return self.qualifierstrig def exampletree(): return rootnode( dotplaceholdernode( charnode("foo") ), dotplaceholdernode( concat_node( concat_node( charnode("ba"), qualifiernode("++") ), charnode("r") ) ) ) # left child deep first travel def printregextree(rootnode_i): if rootnode_i is None: return "" if isinstance(rootnode_i, rootnode): # concat the finnal regex str finnal_regexstr = "" finnal_regexstr += printregextree(rootnode_i.left_childnode) finnal_regexstr += rootnode_i.display() finnal_regexstr += printregextree(rootnode_i.right_childnode) return finnal_regexstr if isinstance(rootnode_i, spliternode): # concat the finnal regex str split_regexstr = "" split_regexstr += printregextree(rootnode_i.left_childnode) split_regexstr += rootnode_i.display() split_regexstr += printregextree(rootnode_i.right_childnode) return split_regexstr if isinstance(rootnode_i, dotplaceholdernode): return printregextree(rootnode_i.childnode) if isinstance(rootnode_i, charnode): return rootnode_i.display() if isinstance(rootnode_i, concat_node): concat_str = "" concat_str += printregextree(rootnode_i.left_concatchildnode) concat_str += printregextree(rootnode_i.right_concatchildnode) return concat_str if isinstance(rootnode_i, qualifiernode): return rootnode_i.display() def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in p} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=5): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def words(text): return set(text.split()) def makerandomtree(M, U, charnode_pool, parentnode=rootnode(None, None), splitrate=0.7, concatrate=0.7, charrate=0.7, qualifierate=0.7, maxdepth=6, curren_level=0, stopearly=0.1): if curren_level > maxdepth: #print "curren_level > maxdepth: ", curren_level return if random() < stopearly: return # ROOT node if isinstance(parentnode, rootnode): curren_level = 0 #print "curren_level: ", curren_level # init root node #print "init rootnode: ", curren_level rootnode_i = rootnode( dotplaceholdernode(None), dotplaceholdernode(None) ) # create left child node #print "new dotplaceholdernode" rootnode_i.left_childnode = makerandomtree(M, U, charnode_pool, rootnode_i.left_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) #print "new dotplaceholdernode" # create right child node rootnode_i.right_childnode = makerandomtree(M, U, charnode_pool, rootnode_i.right_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return rootnode_i # ".." dot placeholder node if isinstance(parentnode, dotplaceholdernode): curren_level += 1 #print "curren_level: ", curren_level # "|" if random() < splitrate: #print "new spliternode" return makerandomtree(M, U, charnode_pool, spliternode(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # ".." elif random() < concatrate: #print "new concat_node" return makerandomtree(M, U, charnode_pool, concat_node(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" elif random() < charrate: #print "new charnode" return makerandomtree(M, U, charnode_pool, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "|" split node if isinstance(parentnode, spliternode): curren_level += 1 #print "curren_level: ", curren_level #print "init spliternode" splitnode_i = spliternode( dotplaceholdernode(None), dotplaceholdernode(None) ) #print "new dotplaceholdernode" splitnode_i.left_childnode = makerandomtree(M, U, charnode_pool, splitnode_i.left_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) #print "new dotplaceholdernode" splitnode_i.right_childnode = makerandomtree(M, U, charnode_pool, splitnode_i.right_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return splitnode_i # ".." concat node if isinstance(parentnode, concat_node): curren_level += 1 #print "curren_level: ", curren_level # "foo" if random() < charrate: #print "new charnode" return makerandomtree(M, U, charnode_pool, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "++" if random() < qualifierate: #print "new qualifiernode" return makerandomtree(M, U, charnode_pool, qualifiernode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" char node if isinstance(parentnode, charnode): curren_level += 1 #print "curren_level: ", curren_level charnode_str = choice(charnode_pool) #print "charnode_str: ", charnode_str #print "new charnode" charnode_i = charnode(charnode_str) return charnode_i # "++" qualifierate node if isinstance(parentnode, qualifiernode): curren_level += 1 #print "curren_level: ", curren_level qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?']) #print "qualifiernode_str: ", qualifiernode_str #print "new qualifiernode" qualifiernode_i = qualifiernode(qualifiernode_str) return qualifiernode_i def scorefunction(tree, M, U, w=1): dif = 0 regex_str = printregextree(tree) M_cn, U_cn = 0, 0 for s in list(M): try: if re.search(regex_str, s): M_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! return -32 for u in list(U): if re.search(regex_str, u): U_cn += 1 # print "M_cn: ", M_cn # print "U_cn: ", U_cn dif = w * (M_cn - 2*U_cn) - len(regex_str) return dif def rankfunction_(M, U, population): scores = [(scorefunction(t, M, U), t) for t in population] # remove illegal regex scores_ = [] for i in scores: if i[1]: scores_.append(i) scores_.sort(reverse=True) return scores_ def mutate(M, U, charnode_pool, t, probchange=0.4): if random() < probchange: return makerandomtree(M, U, charnode_pool) else: result = deepcopy(t) if hasattr(t, "left_concatchildnode"): result.left_concatchildnode = mutate(M, U, charnode_pool, t.left_concatchildnode, probchange) if hasattr(t, "right_concatchildnode"): result.right_concatchildnode = mutate(M, U, charnode_pool, t.right_concatchildnode, probchange) if hasattr(t, "childnode"): result.childnode = mutate(M, U, charnode_pool, t.childnode, probchange) if hasattr(t, "qualifierstrig"): result.qualifierstrig = qualifiernode(choice(['.', '+', '?', '*', '.*', '.+', '.*?'])) if hasattr(t, "charstring"): result.charstring = charnode(choice(charnode_pool)) return result def crossover(t1, t2, probswap=0.5): if random() < probswap: return deepcopy(t2) else: result = deepcopy(t1) if hasattr(t1, 'left_childnode') and hasattr(t2, 'left_childnode'): result.left_childnode = crossover(t1.left_childnode, t2.left_childnode, probswap) if hasattr(t1, 'right_childnode') and hasattr(t2, 'right_childnode'): result.right_childnode = crossover(t1.right_childnode, t2.right_childnode, probswap) if hasattr(t1, 'childnode') and hasattr(t2, 'childnode'): result.childnode = crossover(t1.childnode, t2.childnode, probswap) if hasattr(t1, 'qualifierstrig') and hasattr(t2, 'qualifierstrig'): result.qualifierstrig = t2.qualifierstrig if hasattr(t1, 'charstring') and hasattr(t2, 'charstring'): result.charstring = t2.charstring return result def evolve(M, U, charnode_pool, popsize=128, rankfunction=rankfunction_, maxgen=500, mutationrate=0.6, probswap=0.5, pexp=0.3, pnew=0.8): # Returns a random number, tending towards lower numbers. # The lower pexp is, more lower numbers you will get # probexp:表示在构造新种群时,”选择评价较低的程序“这一概率的递减比例。该值越大,相应的筛选过程就越严格,即只选择评价最高的多少比例的个体作为复制对象 def selectindex(): return int(log(random()) / log(pexp)) # Create a random initial population population = [makerandomtree(M, U, charnode_pool) for i in range(popsize)] scores = [] for i in range(maxgen): scores = rankfunction(M, U, population) print scores[0] print "evole round: {0}, top score: {1}, regex_str: {2}".format(i, scores[0][0], printregextree(scores[0][1])) if scores[0][0] > 0: print "found good solution: {0}".format(printregextree(scores[0][1])) break # The top 20% always make it # newpop = np.array(scores)[:int(len(scores) * 0.2), 1].tolist() newpop = [scores[0][1], scores[1][1]] # Build the next generation # probnew:表示在构造新种群时,”引入一个全新的随机程序“的概率,该参数和probexp是”种群多样性“的重要决定参数 while len(newpop) < popsize: if random() < pnew: newpop.append( mutate( M, U, charnode_pool, crossover( scores[selectindex()][1], scores[selectindex()][1], probswap ), mutationrate ) ) else: # Add a random node to mix things up new_tree = makerandomtree(M, U, charnode_pool) # print "evole round: {0}, add new tree: {1}".format(i, printregextree(new_tree)) newpop.append(new_tree) population = newpop # return the evolutionary results return scores[0][1] def test_regex(M, U, regex_str): dif = 0 M_cn, U_cn = 0, 0 for s in list(M): try: if re.search(regex_str, s): M_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! dif = -32 for u in list(U): try: if re.search(regex_str, u): U_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! dif = -32 print "M_cn: ", M_cn print "U_cn: ", U_cn dif = 1 * (M_cn - 4 * U_cn) - 4 * len(regex_str) print "dif: ", dif def test_regex_golf(): # exampletree = exampletree() # print type(exampletree), exampletree # print printregextree(exampletree) M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') charnode_pool = list(regex_parts(M, U)) print charnode_pool # rd1 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) # rd2 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) # print "rd1: " # print printregextree(rd1) # print "rd2: " # print printregextree(rd2) # dif = scorefunction(tree=rd1, M=M, U=U, w=1) # print "dif: ", dif # population = [makerandomtree(M, U) for i in range(10)] # for i in population: # print printregextree(i) # scores = rankfunction_(M, U, population) # print "function score: ", scores # print np.array(scores)[:int(len(scores) * 0.2), 1].tolist() # rd1_mutate = mutate(M, U, rd1, probchange=0.2) # print "rd1_mutate: " # print printregextree(rd1_mutate) # rd1_rd2_crossover = crossover(rd1, rd2, probswap=0.7) # print "rd1_rd2_crossover: " # print printregextree(rd1_rd2_crossover) evolutionary_regex_str = evolve(M, U, charnode_pool) print printregextree(evolutionary_regex_str) def load_data(): M, U = [], [] rootDir = "./blacksamples" for lists in os.listdir(rootDir): if lists == '.DS_Store': continue filepath = os.path.join(rootDir, lists) filecontent = open(filepath, 'r').read() # only remain English word cop = re.compile("[^^a-z^A-Z^0-9^s]") # remove space filecontent = re.sub(r's+', '', filecontent).strip() filecontent = cop.sub('', filecontent) M.append(filecontent) rootDir = "./whitesamples" for lists in os.listdir(rootDir): if lists == '.DS_Store': continue filepath = os.path.join(rootDir, lists) filecontent = open(filepath, 'r').read() # only remain English word cop = re.compile("[^^a-z^A-Z^0-9^s]") filecontent = cop.sub('', filecontent) # remove space filecontent = re.sub(r's+', '', filecontent).strip() U.append(filecontent) M = set(M) U = set(U) return M, U def test_webshell(): M, U = load_data() # print M charnode_pool = list(regex_parts(M, U)) print charnode_pool print len(charnode_pool) evolutionary_regex_str = evolve(M, U, charnode_pool) print printregextree(evolutionary_regex_str) print "test_regex: " test_regex(M, U, charnode_pool) if __name__ == '__main__': test_webshell() # test_regex_golf()
代码中的blacksamples、whitesamples请读者朋友自行准备。
Relevant Link:
http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%ba%8c%e6%84%9a%e5%bc%84%e6%b7%b1%e5%ba%a6%e5%ad%a6%e4%b9%a0%e7%9a%84%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95 Bartoli, Alberto, et al. “Playing regex golf with genetic programming.” Proceedings of the 2014 conference on Genetic and evolutionary computation. ACM, 2014. https://alf.nu/RegexGolf http://www.doc88.com/p-0387699026353.html http://regex.inginf.units.it/golf/# https://github.com/norvig/pytudes https://github.com/norvig/pytudes/blob/master/ipynb/xkcd1313.ipynb https://github.com/norvig/pytudes/blob/master/ipynb/xkcd1313-part2.ipynb
6. 遗传编程能够应用在网络安全攻防上?
我们来回顾一下要应用遗传编程在某个具体场景中,需要的两个必要条件:
- 能够明确定义出可数值化的损失函数:针对每一次变种后的结果都能够实时计算出所有个体对当前环境的适应度(被判黑的程度)
- 有明确生成外部表象的内显子算法:例如PHP Token Tree、Regex Tree、Four fundamental operations of arithmeticTree,能够按照某种深度优先遍历算法,对整个AST Tree进行遍历,在遍历的过程中完成节点变异和个体间交叉
- 基于内显子算法生成的外显子需要具备业务可解释性:和正则表达式,数学函数方程式这种纯数学概念的外显子不同,在安全领域,对生成的文本还有一个“业务可解释性”的要求。例如说基于cmd ast tree生成了一段cmdline,虽然可能这段cmdline是符合cmdline语法的,但是本身不具备攻击性,即这个cmdline无法对被执行对象完成特定的攻击目的。也许有读者会说,那很简单,我们只要在内显子变异算法上增加一个约束条件,强制生成的外显子字符串具备攻击性不就好了吗?但是最难的问题就在这里,一个cmdline是否具备攻击性,具备多大的攻击性,是非常难通过数学方式形式化定义的
0x1:基于遗传编程自动挖掘0day样本
基于php token生成一个php ast tree,并且在损失函数中找到一定定量评估方法,判断当前文件的恶意程度。用遗传编程自动生成一些可以绕过当前检测机制的php webshell
- 随机化初始化一棵php token tree
- 基于token tree重构出原始文件
- 个体适应度判断:这一步有两种不同的思路,
- 绕过所有检测引擎发现0day样本的优化目标:通过多个检测引擎对文件进行恶意行为检测,并根据命中情况计算损失函数值,这么做的目的是区分出种群中适应度不同的个体。
- 绕过单个概率检测引擎的优化目标:对于想深度学习sigmoid损失函数来说,倒数最后一层sigmoid函数输出的是一个置信概率,区间是【0,1】,这就给不同的个体赋予了不同的适应度,遗传编程可以通过优化尝试降低这个置信概率,使之逃过模型的判黑阈值,这也是一种攻击深度学习算法模型的方式
- 筛选出本轮中适应度最高(损失值最低)的个体,按照标准遗传编程进行种群繁殖
- 直到找到一个损失值为零(完全绕过现有检测体系的新文件)
笔者提醒:
本质上来说,遗传编程的优化方向是随机的,和梯度驱动的SGD优化算法相比,遗传编程每次的迭代优化并不明确朝某个方向前进,而是被动由环境来进行淘汰和筛选,所以是一种环境选择压驱动的优化算法。
遗传编程的这种的优化特性特别适合像“恶意样本检测”这种“阶跃损失”的分类问题,因为对于恶意样本来说,只有两种状态,“黑或白”,损失函数的值也只有两种,“0或者1”,因此,我们无法用SGD类似的算法来优化这种阶跃损失函数问题,因为阶跃点处的梯度要么不存在(左极限),要么是无穷大的(右极限)。
但是遗传编程依然能在每轮筛选出“优胜者”,并按照一定的策略保留优胜者,进行交叉和变异以进入下一轮,同时也会按照一定的概率挑选部分的“失败者”也进入下一轮进化,这么做的目的是引入多样性。