storm(3)-任务提交

  • 2019 年 10 月 4 日
  • 笔记

storm job的提交分为本地模式和远程模式

下面我们先从代码入手,分析一下两者的提交

1.本地模式

2.远程模式提交

通过上面两种代码的分析发现本地模式和远程模式还是有着很大的区别

但是如果我们刨根问底会发现其实最终都是一致的

本地模式其实其实使用的是127.0.0.1,如果在storm集群上,借助storm jar则使用的是storm.yaml中的配置

下面我们讲一下通过java的Rumtime exec的方式进行storm jar的提交

public void submitTopologyToMachine(String nimbusAddress, String fileName, List<String> mainArgs, String mainClass, String jarFile){          StringBuffer args = new StringBuffer();          args.append(jarFile).append(" ");          for (String arg: mainArgs){              args.append(arg).append(" ");          }          args.append("-Dstorm.options=nimbus.host=").append(nimbusAddress).append(" ");          args.append(mainClass);          if (fileName != null){              args.append(" ").append(fileName);          }          InputStream is = null;          try {              Runtime rt = Runtime.getRuntime();              String command = "/home/apps/platform/storm/bin/storm/bin/storm jar "+args.toString();              LOG.info("submit topology command is {}", command);              String[] commandStr = {"/bin/sh","-c",command};              Process proc = rt.exec(commandStr);              if (LOG.isDebugEnabled()) {                  is = proc.getErrorStream();                  BufferedReader reader = new BufferedReader(new InputStreamReader(is));                  String line;                  while ((line = reader.readLine()) != null) {                      LOG.debug("submit topology result line {}", line);                  }              }              int exitVal = proc.waitFor();              LOG.info("submit topology result is {}", exitVal==0? "success" : "failure");            } catch (IOException e) {              LOG.error("submit job is exception {}", e);          } catch (InterruptedException e) {              LOG.error("submit job is exception {}", e);          }finally {              try {                  if (is != null){                      is.close();                  }              } catch (IOException e) {                }          }      }

这是利用Runtime.getRuntime().exec()来执行,但是只能在linux上执行,而且还需要提前放置一份storm的安装包