Flink非同步之矛-鋒利的Async I/O

  • 2020 年 2 月 10 日
  • 筆記

在Flink 流處理過程中,經常需要和外部系統進行交互,用維度表補全事實表中的欄位。

例如:在電商場景中,需要一個商品的skuid去關聯商品的一些屬性,例如商品所屬行業、商品的生產廠家、生產廠家的一些情況;在物流場景中,知道包裹id,需要去關聯包裹的行業屬性、發貨資訊、收貨資訊等等。

默認情況下,在Flink的MapFunction中,單個並行只能用同步方式去交互: 將請求發送到外部存儲,IO阻塞,等待請求返回,然後繼續發送下一個請求。這種同步交互的方式往往在網路等待上就耗費了大量時間。為了提高處理效率,可以增加MapFunction的並行度,但增加並行度就意味著更多的資源,並不是一種非常好的解決方式。

Async I/O非同步非阻塞請求

Flink 在1.2中引入了Async I/O,在非同步模式下,將IO操作非同步化,單個並行可以連續發送多個請求,哪個請求先返回就先處理,從而在連續的請求間不需要阻塞式等待,大大提高了流處理效率。

Async I/O 是阿里巴巴貢獻給社區的一個呼聲非常高的特性,解決與外部系統交互時網路延遲成為了系統瓶頸的問題。

圖中棕色的長條表示等待時間,可以發現網路等待時間極大地阻礙了吞吐和延遲。為了解決同步訪問的問題,非同步模式可以並發地處理多個請求和回復。也就是說,你可以連續地向資料庫發送用戶a、b、c等的請求,與此同時,哪個請求的回復先返回了就處理哪個回復,從而連續的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現原理。

詳細的原理可以參考文末給出的第一個鏈接,來自阿里巴巴雲邪的分享。

一個簡單的例子如下:

public classAsyncIOFunctionTest{      public static void main(String[] args) throws Exception {            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);          env.setParallelism(1);            Properties p = new Properties();          p.setProperty("bootstrap.servers", "localhost:9092");            DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p));          ds.print();            SingleOutputStreamOperator<Order> order = ds                  .map(new MapFunction<String, Order>() {                      @Override                      public Order map(String value) throws Exception {                          return new Gson().fromJson(value, Order.class);                      }                  })                  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {                      @Override                      public long extractAscendingTimestamp(Order element) {                          try {                              return element.getOrderTime();                          } catch (Exception e) {                              e.printStackTrace();                          }                          return 0;                      }                  })                  .keyBy(new KeySelector<Order, String>() {                      @Override                      public String getKey(Order value) throws Exception {                          return value.getUserId();                      }                  })                  .window(TumblingEventTimeWindows.of(Time.minutes(10)))                  .maxBy("orderTime");            SingleOutputStreamOperator<Tuple7<String, String, Integer, String, String, String, Long>> operator = AsyncDataStream                  .unorderedWait(order, new RichAsyncFunction<Order, Tuple7<String, String, Integer, String, String, String, Long>>() {                        private Connection connection;                        @Override                      public void open(Configuration parameters) throws Exception {                          super.open(parameters);                          Class.forName("com.mysql.jdbc.Driver");                          connection = DriverManager.getConnection("url", "user", "pwd");                          connection.setAutoCommit(false);                      }                        @Override                      public void asyncInvoke(Order input, ResultFuture<Tuple7<String, String, Integer, String, String, String, Long>> resultFuture) throws Exception {                          List<Tuple7<String, String, Integer, String, String, String, Long>> list = new ArrayList<>();                          // 在 asyncInvoke 方法中非同步查詢資料庫                          String userId = input.getUserId();                          Statement statement = connection.createStatement();                          ResultSet resultSet = statement.executeQuery("select name,age,sex from user where userid=" + userId);                          if (resultSet != null && resultSet.next()) {                              String name = resultSet.getString("name");                              int age = resultSet.getInt("age");                              String sex = resultSet.getString("sex");                              Tuple7<String, String, Integer, String, String, String, Long> res = Tuple7.of(userId, name, age, sex, input.getOrderId(), input.getPrice(), input.getOrderTime());                              list.add(res);                          }                            // 將數據搜集                          resultFuture.complete(list);                      }                        @Override                      public void close() throws Exception {                          super.close();                          if (connection != null) {                              connection.close();                          }                      }                  }, 5000, TimeUnit.MILLISECONDS,100);            operator.print();              env.execute("AsyncIOFunctionTest");      }  }

上述程式碼中,原始訂單流來自Kafka,去關聯維度表將訂單的用戶資訊取出來。從上面示例中可看到,我們在open()中創建連接對象,在close()方法中關閉連接,在RichAsyncFunction的asyncInvoke()方法中,直接查詢資料庫操作,並將數據返回出去。這樣一個簡單非同步請求就完成了。

Async I/O的原理和基本用法

簡單的來說,使用 Async I/O 對應到 Flink 的 API 就是 RichAsyncFunction 這個抽象類,繼層這個抽象類實現裡面的open(初始化),asyncInvoke(數據非同步調用),close(停止的一些操作)方法,最主要的是實現asyncInvoke 裡面的方法。

我們先來看一個使用Async I/O的模板方法:

  // This example implements the asynchronous request and callback with Futures that have the  // interface of Java 8's futures (which is the same one followed by Flink's Future)    /**   * An implementation of the 'AsyncFunction' that sends requests and sets the callback.   */  classAsyncDatabaseRequestextendsRichAsyncFunction<String, Tuple2<String, String>> {        /** The database specific client that can issue concurrent requests with callbacks */      private transient DatabaseClient client;        @Override      publicvoidopen(Configuration parameters) throws Exception {          client = new DatabaseClient(host, post, credentials);      }        @Override      publicvoidclose() throws Exception {          client.close();      }        @Override      publicvoidasyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {            // issue the asynchronous request, receive a future for result          final Future<String> result = client.query(key);            // set the callback to be executed once the request by the client is complete          // the callback simply forwards the result to the result future          CompletableFuture.supplyAsync(new Supplier<String>() {                @Override              public String get() {                  try {                      return result.get();                  } catch (InterruptedException | ExecutionException e) {                      // Normally handled explicitly.                      return null;                  }              }          }).thenAccept( (String dbResult) -> {              resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));          });      }  }    // create the original stream  DataStream<String> stream = ...;    // apply the async I/O transformation  DataStream<Tuple2<String, String>> resultStream =      AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);  

假設我們一個場景是需要進行非同步請求其他資料庫,那麼要實現一個通過非同步I/O來操作資料庫還需要三個步驟:   

1、實現用來分發請求的AsyncFunction   

2、獲取操作結果的callback,並將它提交到AsyncCollector中   

3、將非同步I/O操作轉換成DataStream

其中的兩個重要的參數:Timeouttimeout 定義了非同步操作過了多長時間後會被丟棄,這個參數是防止了死的或者失敗的請求 Capacity 這個參數定義了可以同時處理多少個非同步請求。雖然非同步I/O方法會帶來更好的吞吐量,但是運算元仍然會成為流應用的瓶頸。超過限制的並發請求數量會產生背壓。

幾個需要注意的點:

  • 使用Async I/O,需要外部存儲有支援非同步請求的客戶端。
  • 使用Async I/O,繼承RichAsyncFunction(介面AsyncFunction的抽象類),重寫或實現open(建立連接)、close(關閉連接)、asyncInvoke(非同步調用)3個方法即可。
  • 使用Async I/O, 最好結合快取一起使用,可減少請求外部存儲的次數,提高效率。
  • Async I/O 提供了Timeout參數來控制請求最長等待時間。默認,非同步I/O請求超時時,會引發異常並重啟或停止作業。如果要處理超時,可以重寫AsyncFunction#timeout方法。
  • Async I/O 提供了Capacity參數控制請求並發數,一旦Capacity被耗盡,會觸發反壓機制來抑制上游數據的攝入。
  • Async I/O 輸出提供亂序和順序兩種模式。 亂序, 用AsyncDataStream.unorderedWait(...) API,每個並行的輸出順序和輸入順序可能不一致。 順序, 用AsyncDataStream.orderedWait(...) API,每個並行的輸出順序和輸入順序一致。為保證順序,需要在輸出的Buffer中排序,該方式效率會低一些。

由於新合入的 Blink 相關功能,使得 Flink 1.9 實現維表功能很簡單。如果你要使用該功能,那就需要自己引入 Blink 的 Planner。

<dependency>      <groupId>org.apache.flink</groupId>      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>      <version>${flink.version}</version>  </dependency>

然後我們只要自定義實現 LookupableTableSource 介面,同時實現裡面的方法就可以進行,下面來分析一下 LookupableTableSource的程式碼:

public interfaceLookupableTableSource<T> extendsTableSource<T> {       TableFunction<T> getLookupFunction(String[] lookupKeys);       AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);       booleanisAsyncEnabled();  }

這三個方法分別是:

  • isAsyncEnabled 方法主要表示該表是否支援非同步訪問外部數據源獲取數據,當返回 true 時,那麼在註冊到 TableEnvironment 後,使用時會返回非同步函數進行調用,當返回 false 時,則使同步訪問函數。
  • getLookupFunction 方法返回一個同步訪問外部數據系統的函數,什麼意思呢,就是你通過 Key 去查詢外部資料庫,需要等到返回數據後才繼續處理數據,這會對系統處理的吞吐率有影響。
  • getAsyncLookupFunction 方法則是返回一個非同步的函數,非同步訪問外部數據系統,獲取數據,這能極大的提升系統吞吐率。

我們拋開同步訪問函數不管。對於getAsyncLookupFunction會返回非同步訪問外部數據源的函數,如果你想使用非同步函數,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。使用非同步函數訪問外部數據系統,一般是外部系統有非同步訪問客戶端,如果沒有的話,可以自己使用執行緒池非同步訪問外部系統。例如:

public classMyAsyncLookupFunctionextendsAsyncTableFunction<Row> {      private transient RedisAsyncCommands<String, String> async;      @Override      publicvoidopen(FunctionContext context) throws Exception {          RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");          StatefulRedisConnection<String, String> connection = redisClient.connect();          async = connection.async();      }      publicvoideval(CompletableFuture<Collection<Row>> future, Object... params) {          redisFuture.thenAccept(new Consumer<String>() {              @Override              publicvoidaccept(String value) {                  future.complete(Collections.singletonList(Row.of(key, value)));              }          });      }  }

一個完整的例子如下:

Main方法:

import org.apache.flink.api.common.functions.MapFunction;  import org.apache.flink.api.common.serialization.SimpleStringSchema;  import org.apache.flink.api.common.typeinfo.TypeInformation;  import org.apache.flink.api.common.typeinfo.Types;  import org.apache.flink.api.java.typeutils.RowTypeInfo;  import org.apache.flink.api.java.utils.ParameterTool;  import org.apache.flink.streaming.api.datastream.DataStream;  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;  import org.apache.flink.table.api.EnvironmentSettings;  import org.apache.flink.table.api.Table;  import org.apache.flink.table.api.java.StreamTableEnvironment;  import org.apache.flink.types.Row;  import org.junit.Test;    import java.util.Properties;    public classLookUpAsyncTest{        @Test      public void test() throws Exception {          LookUpAsyncTest.main(new String[]{});      }        public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          //env.setParallelism(1);          EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);            final ParameterTool params = ParameterTool.fromArgs(args);          String fileName = params.get("f");          DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8");            TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};          String[] fields = new String[]{"id", "user_click", "time"};          RowTypeInfo typeInformation = new RowTypeInfo(types, fields);            DataStream<Row> stream = source.map(new MapFunction<String, Row>() {              private static final long serialVersionUID = 2349572543469673349L;                @Override              public Row map(String s) {                  String[] split = s.split(",");                  Row row = new Row(split.length);                  for (int i = 0; i < split.length; i++) {                        Object value = split[i];                      if (types[i].equals(Types.STRING)) {                          value = split[i];                      }                      if (types[i].equals(Types.LONG)) {                          value = Long.valueOf(split[i]);                      }                      row.setField(i, value);                  }                  return row;              }          }).returns(typeInformation);            tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");            RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder()                  .withFieldNames(new String[]{"id", "name"})                  .withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING})                  .build();          tableEnv.registerTableSource("info", tableSource);            String sql = "select t1.id,t1.user_click,t2.name" +                  " from user_click_name as t1" +                  " join info FOR SYSTEM_TIME AS OF t1.proctime as t2" +                  " on t1.id = t2.id";            Table table = tableEnv.sqlQuery(sql);            DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);            DataStream<String> printStream = result.map(new MapFunction<Row, String>() {              @Override              public String map(Row value) throws Exception {                  return value.toString();              }          });            Properties properties = new Properties();          properties.setProperty("bootstrap.servers", "127.0.0.1:9094");          FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(                  "user_click_name",                  new SimpleStringSchema(),                  properties);          printStream.addSink(kafkaProducer);            tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName());      }  }

RedisAsyncLookupTableSource方法:

import org.apache.flink.api.common.typeinfo.TypeInformation;  import org.apache.flink.api.java.typeutils.RowTypeInfo;  import org.apache.flink.streaming.api.datastream.DataStream;  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import org.apache.flink.table.api.TableSchema;  import org.apache.flink.table.functions.AsyncTableFunction;  import org.apache.flink.table.functions.TableFunction;  import org.apache.flink.table.sources.LookupableTableSource;  import org.apache.flink.table.sources.StreamTableSource;  import org.apache.flink.table.types.DataType;  import org.apache.flink.table.types.utils.TypeConversions;  import org.apache.flink.types.Row;    public classRedisAsyncLookupTableSourceimplementsStreamTableSource<Row>, LookupableTableSource<Row> {        private final String[] fieldNames;      private final TypeInformation[] fieldTypes;        publicRedisAsyncLookupTableSource(String[] fieldNames, TypeInformation[] fieldTypes) {         this.fieldNames = fieldNames;          this.fieldTypes = fieldTypes;      }        //同步方法      @Override      public TableFunction<Row> getLookupFunction(String[] strings) {          return null;      }        //非同步方法      @Override      public AsyncTableFunction<Row> getAsyncLookupFunction(String[] strings) {          return MyAsyncLookupFunction.Builder.getBuilder()                  .withFieldNames(fieldNames)                  .withFieldTypes(fieldTypes)                  .build();      }        //開啟非同步      @Override      publicbooleanisAsyncEnabled() {          return true;      }        @Override      public DataType getProducedDataType() {          return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames));      }        @Override      public TableSchema getTableSchema() {          return TableSchema.builder()                  .fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))                  .build();      }        @Override      public DataStream<Row> getDataStream(StreamExecutionEnvironment environment) {          throw new UnsupportedOperationException("do not support getDataStream");      }        public static final classBuilder{          private String[] fieldNames;          private TypeInformation[] fieldTypes;            privateBuilder() {          }            publicstatic Builder newBuilder() {              return new Builder();          }            public Builder withFieldNames(String[] fieldNames) {              this.fieldNames = fieldNames;              return this;          }            public Builder withFieldTypes(TypeInformation[] fieldTypes) {              this.fieldTypes = fieldTypes;              return this;          }            public RedisAsyncLookupTableSource build() {              return new RedisAsyncLookupTableSource(fieldNames, fieldTypes);          }      }  }

MyAsyncLookupFunction

import io.lettuce.core.RedisClient;  import io.lettuce.core.RedisFuture;  import io.lettuce.core.api.StatefulRedisConnection;  import io.lettuce.core.api.async.RedisAsyncCommands;  import org.apache.flink.api.common.typeinfo.TypeInformation;  import org.apache.flink.api.java.typeutils.RowTypeInfo;  import org.apache.flink.table.functions.AsyncTableFunction;  import org.apache.flink.table.functions.FunctionContext;  import org.apache.flink.types.Row;    import java.util.Collection;  import java.util.Collections;  import java.util.concurrent.CompletableFuture;  import java.util.function.Consumer;    public classMyAsyncLookupFunctionextendsAsyncTableFunction<Row> {        private final String[] fieldNames;      private final TypeInformation[] fieldTypes;        private transient RedisAsyncCommands<String, String> async;        publicMyAsyncLookupFunction(String[] fieldNames, TypeInformation[] fieldTypes) {          this.fieldNames = fieldNames;          this.fieldTypes = fieldTypes;      }        @Override      publicvoidopen(FunctionContext context) {          //配置redis非同步連接          RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");          StatefulRedisConnection<String, String> connection = redisClient.connect();          async = connection.async();      }        //每一條流數據都會調用此方法進行join      publicvoideval(CompletableFuture<Collection<Row>> future, Object... paramas) {          //表名、主鍵名、主鍵值、列名          String[] info = {"userInfo", "userId", paramas[0].toString(), "userName"};          String key = String.join(":", info);          RedisFuture<String> redisFuture = async.get(key);            redisFuture.thenAccept(new Consumer<String>() {              @Override              publicvoidaccept(String value) {                  future.complete(Collections.singletonList(Row.of(key, value)));              }          });      }        @Override      public TypeInformation<Row> getResultType() {          return new RowTypeInfo(fieldTypes, fieldNames);      }        public static final classBuilder{          private String[] fieldNames;          private TypeInformation[] fieldTypes;            private Builder() {          }            public static Builder getBuilder() {              return new Builder();          }            public Builder withFieldNames(String[] fieldNames) {              this.fieldNames = fieldNames;              return this;          }            public Builder withFieldTypes(TypeInformation[] fieldTypes) {              this.fieldTypes = fieldTypes;              return this;          }            public MyAsyncLookupFunction build() {              return new MyAsyncLookupFunction(fieldNames, fieldTypes);          }      }  }

使用Async十分需要注意的幾個點:

1、 外部數據源必須是非同步客戶端:如果是執行緒安全的,你可以不加 transient 關鍵字,初始化一次。否則,你需要加上 transient,不對其進行初始化,而在 open 方法中,為每個 Task 實例初始化一個。

2、eval 方法中多了一個 CompletableFuture,當非同步訪問完成時,需要調用其方法進行處理。比如上面例子中的:

redisFuture.thenAccept(new Consumer<String>() {              @Override              public void accept(String value) {                  future.complete(Collections.singletonList(Row.of(key, value)));              }          });

3、社區雖然提供非同步關聯維度表的功能,但事實上大數據量下關聯外部系統維表仍然會成為系統的瓶頸,所以一般我們會在同步函數和非同步函數中加入快取。綜合併發、易用、實時更新和多版本等因素考慮,Hbase可能是最理想的外部維表。

參考文章:

http://wuchong.me/blog/2017/05/17/flink-internals-async-io/#

https://www.jianshu.com/p/d8f99d94b761

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

https://www.jianshu.com/p/7ce84f978ae0