Flink kuduSink開發
1、繼承RichSinkFunction
(1)首先在構造方式傳入kudu的masterAddress地址、默認表名、TableSerializationSchema、KuduTableRowConverter、Properties配置對象
(2)重寫open方法
初始化KuduClient對象操作kudu,KuduSession對象並傳入一堆配置
(3)重寫invoke方法
核心是如果已傳入TableSerializationSchema對象,則通過其serializeTable方法從輸入的json數據里提取表名,如果未定義則直接取默認表名。拿到表名後就能使用KuduClient對象對其操作了
if (schema != null) { String serializeTableName = schema.serializeTable(row); if (serializeTableName == null) return; table = client.openTable(serializeTableName); } else table = client.openTable(tableName); insert = table.newInsert();
2、定義KuduTableRowConverter介面,將每一條輸入數據轉換成TableRow對象
public interface KuduTableRowConverter<IN> extends Serializable { TableRow convert(IN value); }
定義TableRow類,代表一行數據,key是字串型的鍵名,value是Object型的鍵值
public class TableRow implements Serializable { private static final long serialVersionUID = 1L; private Map<String, Object> pairs = new HashMap<>(); public int size() {return pairs.size();} public Map<String, Object> getPairs() {return pairs;} public Object getElement(String key) {return pairs.get(key);} public void putElement(String key, Object value) {pairs.put(key, value);} }
定義JsonKuduTableRowConverter實現KuduTableRowConverter介面,對於輸入的json數據,通過一系列轉換邏輯轉換成TableRow對象
3、定義TableSerializationSchema介面,從每一條輸入數據里提取表名
public interface TableSerializationSchema<IN> extends Serializable { String serializeTable(IN value); }
定義JsonLogidKeyTableSerializationSchema實現TableSerializationSchema介面,對於輸入的json數據,使用指定key值提取value值,然後再從一個預先獲取的map里找到這個value對應的表名,然後加上必要的前綴與後綴組成impala的表名