Presto Event Listener开发

  • 2019 年 10 月 3 日
  • 筆記

??

?Hive Hook???Presto????????Event Listener?????Presto??????????????????????????????????????????????????????????????Presto????????Hive Hook????Presto???????????Event Listener???????

Event Listener??Plugin???????

  • Query Creation(????????)
  • Query completion (success or failure)(????????????????????????????????)
  • Split completion (success or failure)(split???????????????????)

??Hook?Listener???????????????????????

  1. ??Presto Event Listener?EventListenerFactory???
  2. ????????jar?
  3. ?????Presto????????????

??

  1. ??EventListener???????????????????????????
public interface EventListener  {      //query???????      default void queryCreated(QueryCreatedEvent queryCreatedEvent)      {      }      //query???????      default void queryCompleted(QueryCompletedEvent queryCompletedEvent)      {      }      //split???????      default void splitCompleted(SplitCompletedEvent splitCompletedEvent)      {      }  }  
  1. ??EventListenerFactory?????????EventListener
  2. ??Plugin?????getEventListenerFactories()????????????EventListenerFactory
  3. ????????etc/event-listener.properties???event-listener.name?????????????plugin???????

??

?????????????????????????????????????????????????????????????????Presto?????????????????????????????Mysql???????

Maven Pom

<dependency>        <groupId>com.facebook.presto</groupId>        <artifactId>presto-spi</artifactId>        <version>0.220</version>        <scope>compile</scope>      </dependency>  

QueryEventListenerFactory

public class QueryEventListenerFactory implements EventListenerFactory {      @Override    public String getName() {      return "query-event-listener";    }      @Override    public EventListener create(Map<String, String> config) {      if (!config.containsKey("jdbc.uri")) {        throw new RuntimeException("/etc/event-listener.properties file missing jdbc.uri");      }      if (!config.containsKey("jdbc.user")) {        throw new RuntimeException("/etc/event-listener.properties file missing jdbc.user");      }      if (!config.containsKey("jdbc.pwd")) {        throw new RuntimeException("/etc/event-listener.properties file missing jdbc.pwd");      }        return new QueryEventListener(config);    }  }  

QueryEventPlugin

public class QueryEventPlugin implements Plugin {      @Override    public Iterable<EventListenerFactory> getEventListenerFactories() {      EventListenerFactory listenerFactory = new QueryEventListenerFactory();      return Arrays.asList(listenerFactory);    }  }  

QueryEventListener

public class QueryEventListener implements EventListener {      private Map<String, String> config;    private Connection connection;      public QueryEventListener(Map<String, String> config) {      this.config = new HashMap<>();      this.config.putAll(config);      init();    }      private void init() {      try {        if (connection == null || !connection.isValid(10)) {          Class.forName("com.mysql.jdbc.Driver");          connection = DriverManager              .getConnection(config.get("jdbc.uri"), config.get("jdbc.user"), config.get("jdbc.pwd"));        }      } catch (SQLException | ClassNotFoundException e) {        e.printStackTrace();      }    }      @Override    public void queryCreated(QueryCreatedEvent queryCreatedEvent) {    }      @Override    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {      String queryId = queryCompletedEvent.getMetadata().getQueryId();      String querySql = queryCompletedEvent.getMetadata().getQuery();      String queryState = queryCompletedEvent.getMetadata().getQueryState();      String queryUser = queryCompletedEvent.getContext().getUser();      long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();      long endTime = queryCompletedEvent.getEndTime().toEpochMilli();      long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli();      //insert into query execution table        long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO)          .toMillis();      long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis();      long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis();      long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis();      int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits();      double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory();      long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes();      long outputRows = queryCompletedEvent.getStatistics().getOutputRows();      long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();      long totalRows = queryCompletedEvent.getStatistics().getTotalRows();      long writtenBytes = queryCompletedEvent.getStatistics().getWrittenBytes();      long writtenRows = queryCompletedEvent.getStatistics().getWrittenRows();      //insert into query info table        queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {        int code = queryFailureInfo.getErrorCode().getCode();        String name = queryFailureInfo.getErrorCode().getName();        String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();        String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase();        String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase();        String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase();        String failuresJson = queryFailureInfo.getFailuresJson();        // insert into failed query table      });    }        @Override    public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {      long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();      long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MIN).toEpochMilli();      String payload = splitCompletedEvent.getPayload();      String queryId = splitCompletedEvent.getQueryId();      String stageId = splitCompletedEvent.getStageId();      long startTime = splitCompletedEvent.getStartTime().orElse(Instant.MIN).toEpochMilli();      String taskId = splitCompletedEvent.getTaskId();      long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes();      long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions();      long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis();      long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis();      long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis();      long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis();      //insert into stage info table    }    }  

??

  1. Presto??????????SPI????Presto?Presto??SPI???????????????????SPI?????????????????src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin????????????????????? com.ji3jin.presto.listener.QueryEventListener
  2. ??mvn clean install??

??

  1. ??????etc/event-listener.properties
event-listener.name=query-event-listener    jdbc.uri=jdbc:mysql://localhost:3306/presto_monitor  jdbc.user=presto  jdbc.pwd=presto123  
  1. ?presto??????query-event-listener??????????event listener?name??
  2. ????jar??mysql connector?jar???????????
  3. ????Presto????

???????????????????Mysql??????????????????????????????????????????????????

???????????????