Flink kuduSink開發

1、繼承RichSinkFunction

(1)首先在構造方式傳入kudu的masterAddress地址、默認表名、TableSerializationSchema、KuduTableRowConverter、Properties配置對象

(2)重寫open方法

初始化KuduClient對象操作kudu,KuduSession對象並傳入一堆配置

(3)重寫invoke方法

核心是如果已傳入TableSerializationSchema對象,則通過其serializeTable方法從輸入的json數據里提取表名,如果未定義則直接取默認表名。拿到表名後就能使用KuduClient對象對其操作了

if (schema != null) {
String serializeTableName = schema.serializeTable(row);
if (serializeTableName == null) return;
table = client.openTable(serializeTableName);
}
else
table = client.openTable(tableName);
insert = table.newInsert();

2、定義KuduTableRowConverter介面,將每一條輸入數據轉換成TableRow對象

public interface KuduTableRowConverter<IN> extends Serializable {
TableRow convert(IN value);
}

定義TableRow類,代表一行數據,key是字串型的鍵名,value是Object型的鍵值

public class TableRow implements Serializable {
private static final long serialVersionUID = 1L;
private Map<String, Object> pairs = new HashMap<>();
public int size() {return pairs.size();}
public Map<String, Object> getPairs() {return pairs;}
public Object getElement(String key) {return pairs.get(key);}
public void putElement(String key, Object value) {pairs.put(key, value);}
}

定義JsonKuduTableRowConverter實現KuduTableRowConverter介面,對於輸入的json數據,通過一系列轉換邏輯轉換成TableRow對象

3、定義TableSerializationSchema介面,從每一條輸入數據里提取表名

public interface TableSerializationSchema<IN> extends Serializable {
String serializeTable(IN value);
}

定義JsonLogidKeyTableSerializationSchema實現TableSerializationSchema介面,對於輸入的json數據,使用指定key值提取value值,然後再從一個預先獲取的map里找到這個value對應的表名,然後加上必要的前綴與後綴組成impala的表名