寫給大忙人的Flink的Data Types

  • 2020 年 3 月 18 日
  • 筆記

一.Flink 中 Data Type 組成

  • 基本數據類型:java 的 8 中基本數據類型加上它們各自的包裝類型,在加上 void , String, Date,BigDecimal, BigInteger.
  • 基本數據類型的數據和 Object 類型的數組
  • 複合類型 1.Flink Java Tuples 2. scala case classes 3. Row 4. POJOs: 如果要被 Flink 識別的也允許按 name 引用的話,需要復符合一定的規則(否則的話,會被當做泛型處理) 1). 這個類是 pulic 的並且沒有非靜態的內部類。 2). 得有一個沒有參數的 pulic 構造器 3).所有非靜態的非 transient 的屬性(包括所有的父類)都必須是 pulic 或者符合 java beans 命名規範的 getter setter 方法。
  • 輔助類型 (集合類、Option、Either 等)
  • 泛型:不會被 Flink 自帶的序列化器序列化,而被是 Kryo

二、Flink 是如何處理 Data Type 的 首先Flink會根據自身的序列化器進行序列化,如果不行,則默認回退到 Kryo 序列化器進行序列化。

可能碰到的問題,如下:

  • Registering subtypes 如果方法簽名是父類,而返回或者使用的是子類,也就是所謂的協變返回類型關於協變返回類型。讓 Flink 知道所有的子類可以在一定的程度上提高性能。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  		env.registerType(KuduTableDesc.class);
  • Registering custom serializers 雖然 Flink 自己序列化不了的會給 Kryo,但是 Kryo 也不能很好的處理掉所有的類型,這個時候就要自定義序列化器了。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();    env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
  • Adding Type Hints Flink 可能無法推斷出泛型的類型,僅僅在 Java Api 中時必要的。
DataSet<SomeType> result = dataSet      .map(new MyGenericNonInferrableFunction<Long, SomeType>())          .returns(SomeType.class);    DataSet<SomeType> result = dataSet      .map(new MyGenericNonInferrableFunction<Long, SomeType>())          .returns(new TypeHint<SomeType.class});
  • Manually creating a TypeInformation Flink 可能無法推斷出泛型的類型時
TypeInformation<String> info = TypeInformation.of(String.class);    TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

三、常見的 returns 的使用

.returns(Types.TUPLE(Types.INT,Types.INT))  .returns(Types.STRING)  .returns(TypeInformation.of(String.class))  .returns(new TypeHint<Tuple2<String, String>>(){})  .returns(TypeInformation.of(new TypeHint<Tuple2<ConsumerRecord, String>>() {}))  .returns(SomeType.class)