Presto Event Listener开发
- 2019 年 10 月 3 日
- 筆記
??
?Hive Hook???Presto????????Event Listener?????Presto??????????????????????????????????????????????????????????????Presto????????Hive Hook????Presto???????????Event Listener???????
Event Listener??Plugin???????
- Query Creation(????????)
- Query completion (success or failure)(????????????????????????????????)
- Split completion (success or failure)(split???????????????????)
??Hook?Listener???????????????????????
- ??Presto Event Listener?EventListenerFactory???
- ????????jar?
- ?????Presto????????????
??
- ??EventListener???????????????????????????
public interface EventListener {
    // Called when a query is created.
    default void queryCreated(QueryCreatedEvent queryCreatedEvent) {
    }

    // Called when a query completes (success or failure).
    default void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
    }

    // Called when a split completes (success or failure).
    default void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
    }
}
- ??EventListenerFactory?????????EventListener
- ??Plugin?????getEventListenerFactories()????????????EventListenerFactory
- ????????etc/event-listener.properties???event-listener.name?????????????plugin???????
??
?????????????????????????????????????????????????????????????????Presto?????????????????????????????Mysql???????
Maven Pom
<!-- presto-spi is supplied by the Presto server at runtime, so the plugin
     compiles against it but must not bundle it: use the "provided" scope. -->
<dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-spi</artifactId>
    <version>0.220</version>
    <scope>provided</scope>
</dependency>
QueryEventListenerFactory
/**
 * Factory that builds the {@link QueryEventListener}.
 *
 * <p>The value returned by {@link #getName()} is the one used for
 * {@code event-listener.name} in the event listener properties file.
 */
public class QueryEventListenerFactory implements EventListenerFactory {

    /** Configuration keys that must be present; validated in this order. */
    private static final String[] REQUIRED_KEYS = {"jdbc.uri", "jdbc.user", "jdbc.pwd"};

    /**
     * @return the name under which this listener is registered
     */
    @Override
    public String getName() {
        return "query-event-listener";
    }

    /**
     * Validates the listener configuration and creates the listener.
     *
     * @param config key/value pairs of the listener configuration
     * @return a new {@link QueryEventListener} built from {@code config}
     * @throws RuntimeException if any of the required JDBC keys is missing
     */
    @Override
    public EventListener create(Map<String, String> config) {
        for (String requiredKey : REQUIRED_KEYS) {
            if (!config.containsKey(requiredKey)) {
                throw new RuntimeException(
                        "/etc/event-listener.properties file missing " + requiredKey);
            }
        }
        return new QueryEventListener(config);
    }
}
QueryEventPlugin
/**
 * Plugin entry point that exposes the {@link QueryEventListenerFactory}.
 */
public class QueryEventPlugin implements Plugin {

    /**
     * @return a one-element list holding a new {@link QueryEventListenerFactory}
     */
    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories() {
        return Arrays.<EventListenerFactory>asList(new QueryEventListenerFactory());
    }
}
QueryEventListener
public class QueryEventListener implements EventListener { private Map<String, String> config; private Connection connection; public QueryEventListener(Map<String, String> config) { this.config = new HashMap<>(); this.config.putAll(config); init(); } private void init() { try { if (connection == null || !connection.isValid(10)) { Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager .getConnection(config.get("jdbc.uri"), config.get("jdbc.user"), config.get("jdbc.pwd")); } } catch (SQLException | ClassNotFoundException e) { e.printStackTrace(); } } @Override public void queryCreated(QueryCreatedEvent queryCreatedEvent) { } @Override public void queryCompleted(QueryCompletedEvent queryCompletedEvent) { String queryId = queryCompletedEvent.getMetadata().getQueryId(); String querySql = queryCompletedEvent.getMetadata().getQuery(); String queryState = queryCompletedEvent.getMetadata().getQueryState(); String queryUser = queryCompletedEvent.getContext().getUser(); long createTime = queryCompletedEvent.getCreateTime().toEpochMilli(); long endTime = queryCompletedEvent.getEndTime().toEpochMilli(); long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli(); //insert into query execution table long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO) .toMillis(); long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis(); long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis(); long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis(); int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits(); double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory(); long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes(); long outputRows = queryCompletedEvent.getStatistics().getOutputRows(); long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes(); long totalRows = 
queryCompletedEvent.getStatistics().getTotalRows(); long writtenBytes = queryCompletedEvent.getStatistics().getWrittenBytes(); long writtenRows = queryCompletedEvent.getStatistics().getWrittenRows(); //insert into query info table queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> { int code = queryFailureInfo.getErrorCode().getCode(); String name = queryFailureInfo.getErrorCode().getName(); String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase(); String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase(); String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase(); String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase(); String failuresJson = queryFailureInfo.getFailuresJson(); // insert into failed query table }); } @Override public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { long createTime = splitCompletedEvent.getCreateTime().toEpochMilli(); long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MIN).toEpochMilli(); String payload = splitCompletedEvent.getPayload(); String queryId = splitCompletedEvent.getQueryId(); String stageId = splitCompletedEvent.getStageId(); long startTime = splitCompletedEvent.getStartTime().orElse(Instant.MIN).toEpochMilli(); String taskId = splitCompletedEvent.getTaskId(); long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes(); long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions(); long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis(); long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis(); long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis(); long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis(); //insert into stage info table } }
??
- Presto??????????SPI????Presto?Presto??SPI???????????????????SPI?????????????????
src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin
在该文件中写入 Plugin 实现类的全限定名（注意是 Plugin 的实现类，而不是 EventListener 的实现类）：com.ji3jin.presto.listener.QueryEventPlugin
- ??
mvn clean install
??
??
- ??????etc/event-listener.properties
event-listener.name=query-event-listener
jdbc.uri=jdbc:mysql://localhost:3306/presto_monitor
jdbc.user=presto
jdbc.pwd=presto123
- ?presto??????
query-event-listener
??????????event listener?name?? - ????jar??mysql connector?jar???????????
- ????Presto????
???????????????????Mysql??????????????????????????????????????????????????
???????????????