Automating Hyperparameter and Network Architecture Settings with DeepHyper
- November 20, 2019
- Notes
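Preface: Two problems cause constant headaches when studying and training deep learning and machine learning algorithms:
- setting the hyperparameters
- designing the neural network architecture
These two problems trouble nearly every learner. To address them, Google has released AutoML (which appears to be a paid service), and there is also Keras (to be covered in a later post). This post introduces an automated machine learning package: DeepHyper.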
DeepHyper
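Scalable asynchronous neural architecture and hyperparameter search for deep neural networks. DeepHyper is an automated machine learning (AutoML) package for deep neural networks. It has two components: (1) neural architecture search, which automatically searches for high-performing deep neural network architectures; (2) hyperparameter search, which automatically searches for high-performing hyperparameters for a given deep neural network.
DeepHyper provides an infrastructure for experimental research on neural architecture and hyperparameter search methods, and on their scalability and portability across HPC (high-performance computing) systems. It offers a common interface for implementing and studying scalable hyperparameter and neural architecture search methods. The package provides the following modules:
- Benchmark: a set of hyperparameter or neural architecture search problems that you can use to compare the different search algorithms, or as examples for building your own problems.
- Evaluator: a set of objects that help run a search on different systems and in different situations, such as quick, lightweight experiments or long, heavy runs.
- Search: a set of algorithms for hyperparameter and neural architecture search. It also offers a modular way to define new search algorithms, with specific submodules for hyperparameter search and for neural architecture search.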
Its structure is as follows: [figure: DeepHyper package structure]

I. Hyperparameter Search (HPS)
(1) Defining the hyperparameter problem
First import the deephyper package, then create the problem and add its dimensions:
    from deephyper.benchmark import HpProblem

    Problem = HpProblem()
    Problem.add_dim('nunits', (10, 20), 10)
    print(Problem)

Output:

    Problem
    {'nunits': (10, 20)}

    Starting Point
    {'nunits': 10}
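To make the printed output easier to read, here is a minimal sketch of what such a problem object might hold internally. `ToyProblem` is a hypothetical stand-in written for this post, not DeepHyper's `HpProblem`; it only assumes that each dimension has a name, a numeric (low, high) range, and a starting value.

```python
class ToyProblem:
    """Hypothetical stand-in for a search-space container (not DeepHyper's class)."""

    def __init__(self):
        self.space = {}           # dimension name -> (low, high) range
        self.starting_point = {}  # dimension name -> initial value

    def add_dim(self, name, value_range, start):
        low, high = value_range
        if not low <= start <= high:
            raise ValueError(f"start {start} lies outside {value_range}")
        self.space[name] = value_range
        self.starting_point[name] = start


problem = ToyProblem()
problem.add_dim("nunits", (10, 20), 10)
print(problem.space)           # {'nunits': (10, 20)}
print(problem.starting_point)  # {'nunits': 10}
```

The two dictionaries mirror the "Problem" and "Starting Point" parts of the output above.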
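The function that runs the model receives a dictionary such as {'nunits': 10}, where the value of each key changes according to the choices made by the search. Below is a simple run function that trains a multilayer perceptron on the MNIST data.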
    '''Trains a simple deep NN on the MNIST dataset.
    Gets to 98.40% test accuracy after 20 epochs
    (there is *a lot* of margin for parameter tuning).
    2 seconds per epoch on a K520 GPU.
    '''

    from __future__ import print_function

    import keras
    from keras.datasets import mnist
    from keras.models import Sequential
    from keras.layers import Dense, Dropout
    from keras.optimizers import RMSprop

    def run(params):
        # number of units in the first hidden layer, chosen by the search
        nunits = params['nunits']

        batch_size = 128
        num_classes = 10
        epochs = 20

        # the data, split between train and test sets
        (x_train, y_train), (x_test, y_test) = mnist.load_data()

        x_train = x_train.reshape(60000, 784)
        x_test = x_test.reshape(10000, 784)
        x_train = x_train.astype('float32')
        x_test = x_test.astype('float32')
        x_train /= 255
        x_test /= 255
        print(x_train.shape[0], 'train samples')
        print(x_test.shape[0], 'test samples')

        # convert class vectors to binary class matrices
        y_train = keras.utils.to_categorical(y_train, num_classes)
        y_test = keras.utils.to_categorical(y_test, num_classes)

        model = Sequential()
        model.add(Dense(nunits, activation='relu', input_shape=(784,)))
        model.add(Dropout(0.2))
        model.add(Dense(512, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(num_classes, activation='softmax'))

        model.summary()

        model.compile(loss='categorical_crossentropy',
                      optimizer=RMSprop(),
                      metrics=['accuracy'])

        history = model.fit(x_train, y_train,
                            batch_size=batch_size,
                            epochs=epochs,
                            verbose=1,
                            validation_data=(x_test, y_test))
        score = model.evaluate(x_test, y_test, verbose=0)
        print('Test loss:', score[0])
        print('Test accuracy:', score[1])

        # negated test accuracy is the value returned to the search
        return -score[1]
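The run function returns the negated test accuracy, which suggests that the search treats lower values as better. Under that assumption, the contract between a search and a run function can be sketched with a plain random search. Everything here is hypothetical and independent of DeepHyper: `toy_run` replaces the Keras training with a cheap formula, and `random_search` is the simplest possible driver.

```python
import random


def toy_run(params):
    """Hypothetical cheap objective: pretend 16 units is the best setting."""
    return abs(params["nunits"] - 16) / 10.0


def random_search(run, space, num_evals, seed=0):
    """Sample each integer range uniformly and keep the lowest objective."""
    rng = random.Random(seed)
    best_params, best_value = None, float("inf")
    for _ in range(num_evals):
        # Build a dictionary shaped like the one the run function expects.
        params = {name: rng.randint(low, high) for name, (low, high) in space.items()}
        value = run(params)
        if value < best_value:
            best_params, best_value = params, value
    return best_params, best_value


best_params, best_value = random_search(toy_run, {"nunits": (10, 20)}, num_evals=30)
print(best_params, best_value)
```

Swapping `toy_run` for the MNIST `run` above would work the same way, only much more slowly, since every call trains a network.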
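Now suppose you want to run a search on the problem and model above. Assume the problem is defined in package_name/problem.py and the model in package_name/mnist_mlp.py. To run a search such as AMBS from the command line:

    python ambs.py --problem package_name.problem.Problem --run package_name.mnist_mlp.run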
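The --problem and --run arguments are dotted paths to Python objects. A minimal sketch of how such a path could be resolved with the standard library is shown here; `load_attr` is a hypothetical helper written for illustration, and DeepHyper's own loader may work differently.

```python
import importlib


def load_attr(dotted_path):
    """Split 'pkg.module.attr' into module and attribute, then import it."""
    module_name, _, attr_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attr', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


sqrt = load_attr("math.sqrt")
print(sqrt(9.0))  # 3.0
```

With the layout assumed above, `load_attr("package_name.mnist_mlp.run")` would return the run function, provided package_name is importable from the current directory.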
(2) Asynchronous Model-Based Search (AMBS)
    class deephyper.search.hps.ambs.AMBS(problem, run, evaluator, **kwargs)
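The general idea behind an asynchronous model-based search can be sketched as follows: keep every worker busy, and each time one evaluation finishes, update the record of results and propose the next point from it. This sketch is not the AMBS algorithm. A crude nearest-neighbour heuristic stands in for the surrogate model, threads stand in for the evaluator, and all names (`toy_run`, `propose`, `async_search`) are hypothetical.

```python
import random
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def toy_run(params):
    """Hypothetical objective to minimize; the optimum is nunits = 16."""
    return (params["nunits"] - 16) ** 2


def propose(history, low, high, rng, pending):
    """Crude surrogate: score each untried value by the objective of its
    nearest evaluated neighbour, minus a bonus for being far from it."""
    tried = {x for x, _ in history} | pending
    candidates = [x for x in range(low, high + 1) if x not in tried]
    if not candidates:
        return None
    if not history:
        return rng.choice(candidates)

    def score(x):
        dist, value = min((abs(x - hx), hv) for hx, hv in history)
        return value - 2.0 * dist  # lower is more promising

    return min(candidates, key=score)


def async_search(run, low, high, num_workers=2, max_evals=8, seed=0):
    rng = random.Random(seed)
    history, running = [], {}  # finished (x, value) pairs; future -> x
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        while len(history) < max_evals:
            # Refill idle workers without waiting for a whole batch.
            while len(running) + len(history) < max_evals and len(running) < num_workers:
                x = propose(history, low, high, rng, set(running.values()))
                if x is None:
                    break
                running[pool.submit(run, {"nunits": x})] = x
            if not running:
                break
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                history.append((running.pop(future), future.result()))
    return min(history, key=lambda item: item[1]), history


best, history = async_search(toy_run, 10, 20)
print("best (nunits, objective):", best)
```

The point of the asynchronous loop is that a slow evaluation never blocks the others: new points are proposed as soon as any single result arrives.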
(3) Genetic Algorithm (GA)
Interface class:

    class deephyper.search.hps.ga.GA(problem, run, evaluator, **kwargs)

Full code:
    import signal
    import random

    from deephyper.search.hps.optimizer import GAOptimizer
    from deephyper.search import Search
    from deephyper.search import util

    logger = util.conf_logger('deephyper.search.hps.ga')

    SERVICE_PERIOD = 2          # Delay (seconds) between main loop iterations
    CHECKPOINT_INTERVAL = 10    # How many jobs to complete between optimizer checkpoints
    EXIT_FLAG = False

    def on_exit(signum, stack):
        global EXIT_FLAG
        EXIT_FLAG = True

    class GA(Search):
        def __init__(self, problem, run, evaluator, **kwargs):
            super().__init__(problem, run, evaluator, **kwargs)
            logger.info("Initializing GA")
            self.optimizer = GAOptimizer(self.problem, self.num_workers, self.args)

        @staticmethod
        def _extend_parser(parser):
            parser.add_argument('--ga_num_gen',
                default=100,
                type=int,
                help='number of generation for genetic algorithm'
            )
            parser.add_argument('--individuals_per_worker',
                default=5,
                type=int,
                help='number of individuals per worker')
            return parser

        def run(self):
            # opt = GAOptimizer(cfg)
            # evaluator = evaluate.create_evaluator(cfg)
            logger.info(f"Starting new run")

            timer = util.DelayTimer(max_minutes=None, period=SERVICE_PERIOD)
            timer = iter(timer)
            elapsed_str = next(timer)

            logger.info("Hyperopt GA driver starting")
            logger.info(f"Elapsed time: {elapsed_str}")

            if self.optimizer.pop is None:
                logger.info("Generating initial population")
                logger.info(f"{self.optimizer.INIT_POP_SIZE} individuals")
                self.optimizer.pop = self.optimizer.toolbox.population(n=self.optimizer.INIT_POP_SIZE)
                individuals = self.optimizer.pop
                self.evaluate_fitnesses(individuals, self.optimizer, self.evaluator, self.args.eval_timeout_minutes)
                self.optimizer.record_generation(num_evals=len(self.optimizer.pop))

                with open('ga_logbook.log', 'w') as fp:
                    fp.write(str(self.optimizer.logbook))
                print("best:", self.optimizer.halloffame[0])

            while self.optimizer.current_gen < self.optimizer.NGEN:
                self.optimizer.current_gen += 1
                logger.info(f"Generation {self.optimizer.current_gen} out of {self.optimizer.NGEN}")
                logger.info(f"Elapsed time: {elapsed_str}")

                # Select the next generation individuals
                offspring = self.optimizer.toolbox.select(self.optimizer.pop, len(self.optimizer.pop))
                # Clone the selected individuals
                offspring = list(map(self.optimizer.toolbox.clone, offspring))

                # Apply crossover and mutation on the offspring
                for child1, child2 in zip(offspring[::2], offspring[1::2]):
                    if random.random() < self.optimizer.CXPB:
                        self.optimizer.toolbox.mate(child1, child2)
                        del child1.fitness.values
                        del child2.fitness.values

                for mutant in offspring:
                    if random.random() < self.optimizer.MUTPB:
                        self.optimizer.toolbox.mutate(mutant)
                        del mutant.fitness.values

                # Evaluate the individuals with an invalid fitness
                invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
                logger.info(f"Evaluating {len(invalid_ind)} invalid individuals")
                self.evaluate_fitnesses(invalid_ind, self.optimizer, self.evaluator,
                        self.args.eval_timeout_minutes)

                # The population is entirely replaced by the offspring
                self.optimizer.pop[:] = offspring

                self.optimizer.record_generation(num_evals=len(invalid_ind))

                with open('ga_logbook.log', 'w') as fp:
                    fp.write(str(self.optimizer.logbook))
                print("best:", self.optimizer.halloffame[0])

        def evaluate_fitnesses(self, individuals, opt, evaluator, timeout_minutes):
            points = map(opt.space_encoder.decode_point, individuals)
            points = [{key:x for key,x in zip(self.problem.space.keys(), point)}
                      for point in points]
            evaluator.add_eval_batch(points)
            logger.info(f"Waiting on {len(points)} individual fitness evaluations")
            results = evaluator.await_evals(points, timeout=timeout_minutes*60)

            for ind, (x,fit) in zip(individuals, results):
                ind.fitness.values = (fit,)


    if __name__ == "__main__":
        args = GA.parse_args()
        search = GA(**vars(args))
        #signal.signal(signal.SIGINT, on_exit)
        #signal.signal(signal.SIGTERM, on_exit)
        search.run()
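The loop in run() follows the usual generational pattern: select, clone, cross over, mutate, re-evaluate, and replace the population. Here is a self-contained sketch of that pattern using only the standard library. The tournament selection, one-point crossover, uniform mutation, and the `fitness` function are all assumptions chosen for illustration; the listing above delegates these steps to its optimizer's toolbox instead.

```python
import random


def fitness(individual):
    """Hypothetical objective to minimize: distance of each gene from 16."""
    return sum(abs(gene - 16) for gene in individual)


def tournament(population, scores, rng, k=3):
    """Return a copy (clone) of the best of k randomly chosen individuals."""
    picks = rng.sample(range(len(population)), k)
    winner = min(picks, key=lambda i: scores[i])
    return list(population[winner])


def evolve(pop_size=10, genes=3, low=10, high=20, ngen=15, cxpb=0.5, mutpb=0.2, seed=0):
    rng = random.Random(seed)
    population = [[rng.randint(low, high) for _ in range(genes)] for _ in range(pop_size)]
    best = min(population, key=fitness)
    for _ in range(ngen):
        scores = [fitness(ind) for ind in population]
        offspring = [tournament(population, scores, rng) for _ in range(pop_size)]
        # One-point crossover on consecutive pairs of offspring.
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if rng.random() < cxpb:
                cut = rng.randint(1, genes - 1)
                child1[cut:], child2[cut:] = child2[cut:], child1[cut:]
        # Mutation: resample one gene uniformly within its range.
        for mutant in offspring:
            if rng.random() < mutpb:
                mutant[rng.randrange(genes)] = rng.randint(low, high)
        population[:] = offspring  # the population is entirely replaced
        best = min([best] + population, key=fitness)  # acts as a hall of fame
    return list(best)


print("best individual:", evolve())
```

Because the population is fully replaced each generation, the best individual can be lost; tracking it separately plays the role of the hall of fame printed in the listing.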
II. Neural Architecture Search
Neural Architecture Search (NAS)
(1) Asynchronous search: NAS A3C (PPO) Asynchronous

Search interface class:

    class deephyper.search.nas.ppo_a3c_async.NasPPOAsyncA3C(problem, run, evaluator, **kwargs)

Command to run the search:

    mpirun -np 2 python ppo_a3c_async.py --problem deephyper.benchmark.nas.mnist1D.problem.Problem --run deephyper.search.nas.model.run.alpha.run --evaluator subprocess

Full code:
    import os
    import json
    from math import ceil, log
    from pprint import pprint, pformat
    from mpi4py import MPI
    import math

    from deephyper.evaluator import Evaluator
    from deephyper.search import util, Search

    from deephyper.search.nas.agent import nas_ppo_async_a3c

    logger = util.conf_logger('deephyper.search.nas.ppo_a3c_async')

    def print_logs(runner):
        logger.debug('num_episodes = {}'.format(runner.global_episode))
        logger.debug(' workers = {}'.format(runner.workers))

    def key(d):
        return json.dumps(dict(arch_seq=d['arch_seq']))

    LAUNCHER_NODES = int(os.environ.get('BALSAM_LAUNCHER_NODES', 1))
    WORKERS_PER_NODE = int(os.environ.get('DEEPHYPER_WORKERS_PER_NODE', 1))

    class NasPPOAsyncA3C(Search):
        """Neural Architecture search using proximal policy gradient with asynchronous optimization.
        """

        def __init__(self, problem, run, evaluator, **kwargs):
            super().__init__(problem, run, evaluator, **kwargs)
            # set in super : self.problem
            # set in super : self.run_func
            # set in super : self.evaluator
            self.evaluator = Evaluator.create(self.run_func,
                                              cache_key=key,
                                              method=evaluator)

            self.num_episodes = kwargs.get('num_episodes')
            if self.num_episodes is None:
                self.num_episodes = math.inf

            self.reward_rule = util.load_attr_from('deephyper.search.nas.agent.utils.'+kwargs['reward_rule'])

            self.space = self.problem.space

            logger.debug(f'evaluator: {type(self.evaluator)}')

            self.num_agents = MPI.COMM_WORLD.Get_size() - 1 # one is the parameter server
            self.rank = MPI.COMM_WORLD.Get_rank()

            logger.debug(f'num_agents: {self.num_agents}')
            logger.debug(f'rank: {self.rank}')

        @staticmethod
        def _extend_parser(parser):
            parser.add_argument('--num-episodes', type=int, default=None,
                                help='maximum number of episodes')
            parser.add_argument('--reward-rule', type=str,
                default='final_reward_for_all_timesteps',
                choices=[
                    'final_reward_for_all_timesteps',
                    'episode_reward_for_final_timestep'
                ],
                help='A function which describe how to spread the episodic reward on all timesteps of the corresponding episode.')
            return parser

        def main(self):
            # Settings
            #num_parallel = self.evaluator.num_workers - 4 #balsam launcher & controller of search for cooley
            # num_nodes = self.evaluator.num_workers - 1 #balsam launcher & controller of search for theta
            num_nodes = LAUNCHER_NODES * WORKERS_PER_NODE - 1 # balsam launcher
            num_nodes -= 1 # parameter server is neither an agent nor a worker
            if num_nodes > self.num_agents:
                num_episodes_per_batch = (num_nodes-self.num_agents)//self.num_agents
            else:
                num_episodes_per_batch = 1

            if self.rank == 0:
                logger.debug(f'<Rank={self.rank}> num_nodes: {num_nodes}')
                logger.debug(f'<Rank={self.rank}> num_episodes_per_batch: {num_episodes_per_batch}')

            logger.debug(f'<Rank={self.rank}> starting training...')
            nas_ppo_async_a3c.train(
                num_episodes=self.num_episodes,
                seed=2018,
                space=self.problem.space,
                evaluator=self.evaluator,
                num_episodes_per_batch=num_episodes_per_batch,
                reward_rule=self.reward_rule
            )


    if __name__ == "__main__":
        args = NasPPOAsyncA3C.parse_args()
        search = NasPPOAsyncA3C(**vars(args))
        search.main()
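The key() helper in the listing serialises only the arch_seq entry and is passed to the evaluator as cache_key. One plausible reading is that identical architectures are evaluated only once. The sketch below illustrates that idea; `CachingEvaluator` is a hypothetical class written for this post and is not DeepHyper's Evaluator.

```python
import json


def key(d):
    """Same idea as the key() helper above: serialise only the architecture."""
    return json.dumps(dict(arch_seq=d["arch_seq"]))


class CachingEvaluator:
    """Hypothetical evaluator that runs each distinct arch_seq only once."""

    def __init__(self, run, cache_key):
        self.run = run
        self.cache_key = cache_key
        self.cache = {}    # cache key -> stored result
        self.num_runs = 0  # how many times run() was really called

    def evaluate(self, config):
        k = self.cache_key(config)
        if k not in self.cache:
            self.cache[k] = self.run(config)
            self.num_runs += 1
        return self.cache[k]


evaluator = CachingEvaluator(run=lambda cfg: sum(cfg["arch_seq"]), cache_key=key)
evaluator.evaluate({"arch_seq": [0, 2, 1], "epochs": 5})
evaluator.evaluate({"arch_seq": [0, 2, 1], "epochs": 9})  # same key, served from cache
print(evaluator.num_runs)  # 1
```

Since the key ignores every field except arch_seq, two configurations that differ only in other settings share one cached result, which is worth keeping in mind when those settings affect the outcome.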
(2) Synchronous updates: NAS A3C (PPO) Synchronous

    class deephyper.search.nas.ppo_a3c_sync.NasPPOSyncA3C(problem, run, evaluator, **kwargs)

Neural architecture search using proximal policy gradient with synchronous optimization:

    python -m deephyper.search.nas.ppo_a3c_sync --evaluator subprocess --problem 'deephyper.benchmark.nas.linearReg.problem.Problem' --run 'deephyper.search.nas.model.run.alpha.run'

Alternatively, use MPI to launch n agents, where n = np because all agents are synchronized with the first agent:
    mpirun -np 2 python ppo_a3c_sync.py --problem deephyper.benchmark.nas.mnist1D.problem.Problem --run deephyper.search.nas.model.run.alpha.run --evaluator subprocess
Full code:
    import os
    import json
    from pprint import pprint, pformat
    from mpi4py import MPI
    import math

    from deephyper.evaluator import Evaluator
    from deephyper.search import util, Search

    from deephyper.search.nas.agent import nas_ppo_sync_a3c

    logger = util.conf_logger('deephyper.search.nas.ppo_a3c_sync')

    def print_logs(runner):
        logger.debug('num_episodes = {}'.format(runner.global_episode))
        logger.debug(' workers = {}'.format(runner.workers))

    def key(d):
        return json.dumps(dict(arch_seq=d['arch_seq']))

    LAUNCHER_NODES = int(os.environ.get('BALSAM_LAUNCHER_NODES', 1))
    WORKERS_PER_NODE = int(os.environ.get('DEEPHYPER_WORKERS_PER_NODE', 1))

    class NasPPOSyncA3C(Search):
        """Neural Architecture search using proximal policy gradient with synchronous optimization.
        """

        def __init__(self, problem, run, evaluator, **kwargs):
            super().__init__(problem, run, evaluator, **kwargs)
            # set in super : self.problem
            # set in super : self.run_func
            # set in super : self.evaluator
            self.evaluator = Evaluator.create(self.run_func,
                                              cache_key=key,
                                              method=evaluator)

            self.num_episodes = kwargs.get('num_episodes')
            if self.num_episodes is None:
                self.num_episodes = math.inf

            self.reward_rule = util.load_attr_from('deephyper.search.nas.agent.utils.'+kwargs['reward_rule'])

            self.space = self.problem.space

            logger.debug(f'evaluator: {type(self.evaluator)}')

            self.num_agents = MPI.COMM_WORLD.Get_size() - 1 # one is the parameter server
            self.rank = MPI.COMM_WORLD.Get_rank()

            logger.debug(f'num_agents: {self.num_agents}')
            logger.debug(f'rank: {self.rank}')

        @staticmethod
        def _extend_parser(parser):
            parser.add_argument('--num-episodes', type=int, default=None,
                                help='maximum number of episodes')
            parser.add_argument('--reward-rule', type=str,
                default='reward_for_final_timestep',
                choices=[
                    'reward_for_all_timesteps',
                    'reward_for_final_timestep'
                ],
                help='A function which describe how to spread the episodic reward on all timesteps of the corresponding episode.')
            return parser

        def main(self):
            # Settings
            #num_parallel = self.evaluator.num_workers - 4 #balsam launcher & controller of search for cooley
            # num_nodes = self.evaluator.num_workers - 1 #balsam launcher & controller of search for theta
            num_nodes = LAUNCHER_NODES * WORKERS_PER_NODE - 1 # balsam launcher
            if num_nodes > self.num_agents:
                num_episodes_per_batch = (num_nodes-self.num_agents)//self.num_agents
            else:
                num_episodes_per_batch = 1

            if self.rank == 0:
                logger.debug(f'<Rank={self.rank}> num_nodes: {num_nodes}')
                logger.debug(f'<Rank={self.rank}> num_episodes_per_batch: {num_episodes_per_batch}')

            logger.debug(f'<Rank={self.rank}> starting training...')
            nas_ppo_sync_a3c.train(
                num_episodes=self.num_episodes,
                seed=2018,
                space=self.problem.space,
                evaluator=self.evaluator,
                num_episodes_per_batch=num_episodes_per_batch,
                reward_rule=self.reward_rule
            )


    if __name__ == "__main__":
        args = NasPPOSyncA3C.parse_args()
        search = NasPPOSyncA3C(**vars(args))
        search.main()
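The two main() methods compute the number of episodes per batch in nearly the same way; the asynchronous version additionally sets one node aside for the parameter server. The arithmetic is extracted into a standalone function here so it can be tried with concrete numbers. The function name and the `has_parameter_server` flag are hypothetical; the formula itself is copied from the listings.

```python
def episodes_per_batch(launcher_nodes, workers_per_node, num_agents, has_parameter_server):
    """Reproduce the num_episodes_per_batch arithmetic from main()."""
    num_nodes = launcher_nodes * workers_per_node - 1  # one node for the launcher
    if has_parameter_server:
        num_nodes -= 1  # asynchronous variant only
    if num_nodes > num_agents:
        # Share the nodes left over after the agents evenly among the agents.
        return (num_nodes - num_agents) // num_agents
    return 1


# 8 launcher nodes x 2 workers per node, 3 agents
print(episodes_per_batch(8, 2, 3, has_parameter_server=False))  # (15 - 3) // 3 = 4
print(episodes_per_batch(8, 2, 3, has_parameter_server=True))   # (14 - 3) // 3 = 3
# Default environment (1 node, 1 worker): falls back to 1
print(episodes_per_batch(1, 1, 1, has_parameter_server=False))  # 1
```

Note that num_agents is the MPI world size minus one, so a run with a single MPI process has zero agents; with the default environment values the fallback branch is taken and no division happens.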
(3) Random search: NAS Random

    class deephyper.search.nas.random.NasRandom(problem, run, evaluator, **kwargs)
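A random architecture search can be sketched without any library support. The sketch assumes that an architecture is encoded as arch_seq, a list with one option index per design decision, and that the run function returns a reward to maximize. Both the encoding details and the `toy_reward` function are assumptions made for illustration, not NasRandom's actual behaviour.

```python
import random


def sample_arch_seq(num_choices_per_decision, rng):
    """Pick one option index per decision, uniformly at random."""
    return [rng.randrange(n) for n in num_choices_per_decision]


def random_nas(run, num_choices_per_decision, num_evals, seed=0):
    """Evaluate random architectures and keep the one with the highest reward."""
    rng = random.Random(seed)
    best_seq, best_reward = None, float("-inf")
    for _ in range(num_evals):
        arch_seq = sample_arch_seq(num_choices_per_decision, rng)
        reward = run({"arch_seq": arch_seq})
        if reward > best_reward:
            best_seq, best_reward = arch_seq, reward
    return best_seq, best_reward


def toy_reward(config):
    """Hypothetical reward: prefer larger option indices."""
    return sum(config["arch_seq"])


# Three decisions with 3, 4 and 2 options respectively.
best_seq, best_reward = random_nas(toy_reward, [3, 4, 2], num_evals=20)
print(best_seq, best_reward)
```

Random search makes a useful baseline: a learned search strategy such as the PPO agents above is only worth its cost if it beats this on the same evaluation budget.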
Now for the installation.

From pip:

    pip install deephyper

From GitHub:

    git clone https://github.com/deephyper/deephyper.git
    cd deephyper/
    pip install -e .

To install deephyper with the test and documentation packages:

    # From PyPI
    pip install 'deephyper[tests,docs]'

    # From GitHub
    git clone https://github.com/deephyper/deephyper.git
    cd deephyper/
    pip install -e '.[tests,docs]'
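After installing, a quick way to confirm that the package is visible to the current Python interpreter is to look it up on the import path. This is a generic standard-library check, not something specific to DeepHyper; `is_installed` is a helper name made up for this post.

```python
import importlib.util


def is_installed(package_name):
    """Return True if the top-level package can be found on the import path."""
    return importlib.util.find_spec(package_name) is not None


print("deephyper installed:", is_installed("deephyper"))
```

If this prints False right after a successful pip install, the most likely cause is that pip and python belong to different environments.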
That is all for this post; for more detail, please study the documentation.
Reference: https://deephyper.readthedocs.io/en/latest/?badge=latest