livy提交spark應用

 

  

透過 shell 執行 spark-submit 時靈活性較低;Livy 作為提交 Spark 應用的一種工具,可以透過 REST 介面或 Java 客戶端提交任務,因此能夠整合到 web 應用中。

1.客戶端提交的方式

https://livy.incubator.apache.org/docs/latest/programmatic-api.html

核心程式碼

// Build a client bound to the Livy server at livyUrl.
LivyClient livy = new LivyClientBuilder().setURI(new URI(livyUrl)).build();

try {
  // Upload the jar that contains the job classes and wait for the upload to finish.
  System.err.printf("Uploading %s to the Spark context...\n", piJar);
  File jarFile = new File(piJar);
  livy.uploadJar(jarFile).get();

  // Submit the job and block until its result is available.
  System.err.printf("Running PiJob with %d samples...\n", samples);
  PiJob job = new PiJob(samples);
  double pi = livy.submit(job).get();

  System.out.println("Pi is roughly: " + pi);
} finally {
  // Always stop the client, even when the upload or the job fails.
  livy.stop(true);
}

2.REST API

https://livy.incubator.apache.org/docs/latest/rest-api.html

1.以最常使用的batches介面作為例子,請求參數

 

 呼叫 REST 介面所使用的 HTTP 工具類

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Small JSON-over-HTTP helper built on Apache HttpClient.
 * <p>
 * Each instance owns one {@link CloseableHttpClient}; call {@link #close()} when the
 * instance is no longer needed. Request methods never throw on transport errors:
 * they log the failure and return {@code null}, which callers use as the
 * "request failed" signal.
 */
public class HttpUtils {
    private static final Logger logger = LoggerFactory.getLogger(HttpUtils.class);

    /** Underlying client, created once per instance and released by {@link #close()}. */
    private final CloseableHttpClient httpClient = HttpClients.createDefault();

    /**
     * Sends {@code data} as a UTF-8 JSON body in a POST request.
     *
     * @param url     target URL
     * @param headers extra request headers; may be {@code null} or empty
     * @param data    JSON request body
     * @return the response body as text, or {@code null} when the request failed
     *         or the response carried no body
     */
    public String postAccess(String url, Map<String, String> headers, String data) {
        HttpPost post = new HttpPost(url);
        applyHeaders(post, headers);
        try {
            // ContentType.APPLICATION_JSON carries charset=UTF-8. The single-argument
            // StringEntity constructor would encode the body as ISO-8859-1 and
            // corrupt any non-Latin text in the payload.
            post.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON));
            return execute(post);
        } catch (IOException | RuntimeException e) {
            logger.error("postAccess執行有誤{}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Sends a GET request.
     *
     * @param url     target URL
     * @param headers extra request headers; may be {@code null} or empty
     * @return the response body as text, or {@code null} when the request failed
     *         or the response carried no body
     */
    public String getAccess(String url, Map<String, String> headers) {
        HttpGet get = new HttpGet(url);
        applyHeaders(get, headers);
        try {
            return execute(get);
        } catch (IOException | RuntimeException e) {
            logger.error("getAccess執行有誤{}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Releases the underlying HTTP client and its connections.
     *
     * @throws IOException if closing the client fails
     */
    public void close() throws IOException {
        httpClient.close();
    }

    /** Copies the caller-supplied headers, if any, onto the request. */
    private static void applyHeaders(HttpUriRequest request, Map<String, String> headers) {
        if (headers != null && !headers.isEmpty()) {
            headers.forEach(request::addHeader);
        }
    }

    /** Executes the request and returns the body as text, always closing the response. */
    private String execute(HttpUriRequest request) throws IOException {
        try (CloseableHttpResponse response = httpClient.execute(request)) {
            HttpEntity body = response.getEntity();
            // UTF-8 is only the fallback for responses that do not declare a charset.
            return body == null ? null : EntityUtils.toString(body, StandardCharsets.UTF_8);
        }
    }
}

透過 Livy 提交 Spark 應用的類別:提交後由非同步執行緒輪詢並列印任務狀態,也可以將監控到的狀態返回給 web 端

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wanmi.sbc.dw.utils.GsonUtil;
import com.wanmi.sbc.dw.utils.HttpUtils;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**
 * Submits Spark batch jobs through the Livy REST API and follows their state.
 * <p>
 * This class keeps no per-request state: every submit and every polling run uses
 * its own {@link HttpUtils}, so concurrent submits on the same bean do not
 * interfere with each other.
 *
 * @author 小何
 * @version 1.0 (2020/12/15)
 */
@Component
public class LivyServer {
    private static final Logger logger = LoggerFactory.getLogger(LivyServer.class);

    /** States after which the batch is treated as failed and polling stops. */
    private static final List<String> FAIL_STATUS_LIST = Arrays.asList("shutting_down", "error", "dead", "killed");
    /** States in which the batch is still in progress and polling continues. */
    private static final List<String> ACTIVE_STATUS_LIST = Arrays.asList("running", "idle", "starting");
    /** Pause between two state queries, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 5000L;

    /** Headers sent with every Livy request; never modified after construction. */
    private final HashMap<String, String> headers;

    public LivyServer() {
        headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("X-Requested-By", "admin");
    }


    /**
     * Posts the batch described by {@code livyParam} to {@code <livyUri>/batches} and,
     * when Livy answers with JSON, starts a background thread that logs the batch
     * state until it reaches a final state.
     *
     * @param livyParam submit parameters; {@code livyUri} is used only as the target
     *                  address and is stripped from the JSON body
     * @return the raw Livy response, or a string starting with {@code "error:"} when
     *         no response was received or the response is not JSON
     */
    @SneakyThrows
    public String batchSubmit(LivyParam livyParam) {
        String livyUri = livyParam.getLivyUri();
        // Serialize a copy without livyUri so the caller's object is left untouched.
        LivyParam payload = new LivyParam();
        BeanUtils.copyProperties(livyParam, payload);
        payload.setLivyUri(null);
        String request = GsonUtil.toJsonString(payload);
        logger.info("任務提交資訊{}", request);

        HttpUtils client = new HttpUtils();
        String result;
        try {
            result = client.postAccess(livyUri + "/batches", headers, request);
        } finally {
            closeQuietly(client);
        }

        // The null check must come before the JSON check: a null response means the
        // request itself failed, which is reported as an address problem.
        if (result == null) {
            return "error:" + "livy地址:" + livyUri + "錯誤,請檢查";
        }
        if (!GsonUtil.isJson(result)) {
            logger.info("任務提交錯誤:{}", result);
            return "error:" + result;
        }
        logger.info("提交返回任務返回資訊:{}", result);
        JSONObject jsonObject = JSONObject.parseObject(result);
        String state = jsonObject.getString("state");
        String id = jsonObject.getString("id");

        Thread watcher = new Thread(() -> {
            try {
                queryState(livyUri, id, state);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the interruption.
                Thread.currentThread().interrupt();
                logger.error("執行緒運行出錯", e);
            } catch (IOException | RuntimeException e) {
                logger.error("執行緒運行出錯", e);
            }
        }, livyParam.getName() + System.currentTimeMillis());
        watcher.start();
        return result;
    }


    /**
     * Polls {@code <livyUrl>/batches/<batchId>} every {@value #POLL_INTERVAL_MS} ms and
     * logs the state until the batch succeeds, fails, or reports a state this class
     * does not know. Returns immediately when {@code responseState} is {@code null}
     * or already a failure state.
     *
     * @param livyUrl       base address of the Livy server
     * @param batchId       id of the batch returned by the submit call
     * @param responseState state reported by the submit call
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IOException          declared for callers; propagated from the HTTP helper
     */
    public void queryState(String livyUrl, String batchId, String responseState) throws InterruptedException, IOException {
        if (responseState == null || FAIL_STATUS_LIST.contains(responseState)) {
            return;
        }
        String url = livyUrl + "/batches/" + batchId;
        HttpUtils client = new HttpUtils();
        try {
            while (true) {
                String batchesInfo = client.getAccess(url, headers);
                JSONObject info = JSON.parseObject(batchesInfo);
                if (info == null) {
                    // No usable answer from Livy; stop instead of failing with an NPE.
                    logger.error("無法取得批次{}的狀態,回應內容:{}", batchId, batchesInfo);
                    return;
                }
                String id = info.getString("id");
                String sta = info.getString("state");
                String appId = info.getString("appId");
                String appInfo = info.getString("appInfo");
                logger.info("livy:sessionId:{},state:{}", id, sta);

                if ("success".equals(sta)) {
                    logger.info("任務{}:執行完成,appInfo:{}", appId, appInfo);
                    return;
                }
                if (sta == null || FAIL_STATUS_LIST.contains(sta)) {
                    logger.error("任務{}執行有誤,請檢查後重新提交:\n{}", appId, batchesInfo);
                    return;
                }
                if (!ACTIVE_STATUS_LIST.contains(sta)) {
                    logger.info("任務{}狀態:{},未知,退出任務查看", id, sta);
                    return;
                }
                logger.info("查看任務{},運行狀態:\n{}", appId, batchesInfo);
                // Sleep only while the batch is still in progress.
                Thread.sleep(POLL_INTERVAL_MS);
            }
        } finally {
            closeQuietly(client);
        }
    }

    /** Closes the HTTP helper; a failure to close is logged and never hides the real outcome. */
    private static void closeQuietly(HttpUtils client) {
        try {
            client.close();
        } catch (Exception e) {
            logger.warn("關閉HTTP客戶端失敗", e);
        }
    }
}

livy請求參數

/**
 * Parameters for one Livy batch submission.
 * <p>
 * NOTE(review): apart from {@code livyUri} and {@code thirdJarPath}, the field names
 * match the request body of Livy's {@code POST /batches} — confirm against the Livy
 * REST documentation for the deployed version.
 */
@Data
public class LivyParam {
    /**
     * Address of the Livy server.
     */
    private String livyUri;

    /**
     * Path of the jar to run.
     */
    private String file;
    /**
     * Name of the proxy user the job runs as.
     */
    private String proxyUser;
    /**
     * Main class to run.
     */
    private String className;
    /**
     * Arguments passed to the main class.
     */
        private List<String> args;
    /**
     * Jars required at run time.
     */
    private String thirdJarPath;
    // Presumably additional jars for the job — TODO confirm.
    private List<String> jars;
    // Presumably Python files for the job — TODO confirm.
    private List<String> pyFiles;
    // Presumably additional files for the job — TODO confirm.
    private List<String> files;
    // Presumably driver memory as a size string — TODO confirm expected format.
    private String driverMemory;
    // Presumably number of driver cores — TODO confirm.
    private Integer driverCores;
    // Presumably per-executor memory as a size string — TODO confirm expected format.
    private String executorMemory;
    // Presumably cores per executor — TODO confirm.
    private Integer executorCores;
    // Presumably number of executors — TODO confirm.
    private Integer numExecutors;
    // Presumably archives for the job — TODO confirm.
    private List<String> archives;
    /**
     * Queue to submit to.
     */
    private String queue;
    /**
     * Application name.
     */
    private String name;
    /**
     * Other configuration entries.
     */
    private Map<String, String> conf;

}

測試

      構建參數
        // Build the Livy request from the incoming submit parameters.
        // The original declaration omitted the variable type and did not compile.
        LivyParam livyParam = new LivyParam();
        livyParam.setLivyUri(sparkSubmitParam.getLivyUri());
        livyParam.setClassName(sparkSubmitParam.getClassName());
        livyParam.setArgs(sparkSubmitParam.getArgs());
        livyParam.setConf(sparkSubmitParam.getConf());
        livyParam.setDriverCores(sparkSubmitParam.getDriverCores());
        livyParam.setDriverMemory(sparkSubmitParam.getDriverMemory());
        livyParam.setArchives(sparkSubmitParam.getArchives());
        livyParam.setExecutorCores(sparkSubmitParam.getExecutorCores());
        livyParam.setExecutorMemory(sparkSubmitParam.getExecutorMemory());
        livyParam.setJars(sparkSubmitParam.getJars());
        livyParam.setFile(sparkSubmitParam.getFile());
        livyParam.setName(sparkSubmitParam.getName());
        livyParam.setQueue(sparkSubmitParam.getQueue());
        livyParam.setProxyUser(sparkSubmitParam.getProxyUser());

        // Send the request.
        String result = liveServer.batchSubmit(livyParam);