Flink Series (5): Kafka Source Code Analysis

  • October 4, 2019
  • Notes

I have been working on Flink SQL recently, and the goal of the first phase is to solve the problem of consuming from and writing to Kafka. Since some readers may not be familiar with this part, today we take a detailed look at the inheritance hierarchy of the classes involved.

First, the table source factory and the table source built on Flink's APIs:

public class KafkaTableSourceFactory implements StreamTableSourceFactory<Row> {

    private ConcurrentHashMap<String, KafkaTableSource> kafkaTableSources = new ConcurrentHashMap<>();

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE);
        context.put(CONNECTOR_PROPERTY_VERSION(), String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add(KafkaConnectorDescriptor.DATABASE_KEY);
        properties.add(KafkaConnectorDescriptor.TABLE_KEY);
        return properties;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        // Cache the sources per database + table so repeated factory calls do not rebuild them.
        KafkaTableSource kafkaTableSource;
        String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY);
        String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY);
        if (!kafkaTableSources.containsKey(dataBase + table)) {
            Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder();
            kafkaTableSource = builder
                    .cluster(dataBase)
                    .subject(table)
                    .build();
            kafkaTableSources.put(dataBase + table, kafkaTableSource);
        } else {
            kafkaTableSource = kafkaTableSources.get(dataBase + table);
        }
        return kafkaTableSource;
    }
}
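Some context on how this factory gets picked up: Flink discovers TableFactory implementations through Java SPI, so the fully qualified class name must be listed in a META-INF/services/org.apache.flink.table.factories.TableFactory file on the classpath. The planner then matches a property map against requiredContext() and supportedProperties(). Below is a minimal lookup sketch; the concrete property values are assumptions, since KafkaConnectorDescriptor is the author's own class, not part of Flink:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class FactoryLookupSketch {

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // These two entries must satisfy the factory's requiredContext();
        // the values "kafka" and "1" are assumed stand-ins for the
        // KafkaConnectorDescriptor constants above.
        properties.put("connector.type", "kafka");
        properties.put("connector.property-version", "1");
        // Keys from supportedProperties(); the exact spellings of
        // DATABASE_KEY and TABLE_KEY are likewise assumptions.
        properties.put("database", "my_cluster");
        properties.put("table", "my_subject");

        @SuppressWarnings("unchecked")
        StreamTableSourceFactory<Row> factory =
                TableFactoryService.find(StreamTableSourceFactory.class, properties);
        StreamTableSource<Row> source = factory.createStreamTableSource(properties);
        System.out.println(source);
    }
}

The factory builds the table source, which extends KafkaTableSource and plugs in a custom PBRowDeserializationSchema: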
class Kafka08PBTableSource protected(topic: String,
                                     properties: Properties,
                                     schema: TableSchema,
                                     typeInformation: TypeInformation[Row],
                                     paramMap: util.LinkedHashMap[String, AnyRef],
                                     entryClass: String)
  extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap, entryClass)) {

  override def createKafkaConsumer(topic: String, properties: Properties, deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    this.setStartupMode(StartupMode.EARLIEST)
    new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest()
  }
}
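Note that the override forces reading from the earliest offset in two ways: via setStartupMode(StartupMode.EARLIEST) on the table source and via setStartFromEarliest() on the consumer it creates. For reference, here is a minimal standalone sketch of the same consumer in a plain DataStream job (Kafka 0.8 connector; the addresses, topic, and group id are illustrative):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

public class ConsumerStartPositionSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.setProperty("zookeeper.connect", "localhost:2181"); // the 0.8 consumer still needs ZooKeeper
        props.setProperty("group.id", "demo-group");              // illustrative group id

        FlinkKafkaConsumer08<String> consumer =
                new FlinkKafkaConsumer08<>("my_topic", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest(); // same effect as the override above

        env.addSource(consumer).print();
        env.execute("consumer start position sketch");
    }
}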

Below is the user-defined Kafka sink class:

class Kafka08UDMPBTableSink(topic: String,
                            properties: Properties,
                            partitioner: Optional[FlinkKafkaPartitioner[Row]],
                            paramMap: util.LinkedHashMap[String, AnyRef],
                            serializationSchema: SerializationSchema[Row],
                            fieldNames: Array[String],
                            fieldTypes: Array[TypeInformation[_]])
  extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  override def createKafkaProducer(topic: String, properties: Properties, serializationSchema: SerializationSchema[Row], partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row] = {
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row]))
  }

  override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema

  override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes)

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  override def getFieldNames: Array[String] = this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]] = this.fieldTypes

  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
    dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }
}
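The emitDataStream override shows the whole write path: build the producer, then attach it with addSink. A stripped-down sketch of the same pattern using the raw 0.8 producer (topic and broker address are illustrative):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

public class ProducerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative broker address

        // Same pattern as emitDataStream above: create the producer, then addSink.
        stream.addSink(new FlinkKafkaProducer08<>("my_topic", new SimpleStringSchema(), props));
        env.execute("producer sketch");
    }
}

The actual row (de)serialization is delegated to a custom track-format schema; the listing below shows only its fields: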
public class TrackRowDeserializationSchema implements SerializationSchema<Row>, DeserializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation<Row> typeInfo = null;

    private LinkedHashMap paraMap;

    private String inSchema;
    private String outSchema;
    private String inClass;
    private String outClass;
}
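The method bodies are not shown above. For orientation, a hypothetical skeleton of what the two implemented interfaces oblige; the real track/PB encoding logic driven by inSchema, outSchema, inClass, and outClass is not reproduced here:

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

// Hypothetical skeleton mirroring TrackRowDeserializationSchema's shape.
public class RowSchemaSkeleton implements SerializationSchema<Row>, DeserializationSchema<Row> {

    private final TypeInformation<Row> typeInfo;

    public RowSchemaSkeleton(TypeInformation<Row> typeInfo) {
        this.typeInfo = typeInfo;
    }

    @Override
    public byte[] serialize(Row element) {
        // The real class would encode the Row into the track wire format here.
        return element.toString().getBytes();
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        // The real class would parse the bytes according to inSchema/inClass.
        Row row = new Row(1);
        row.setField(0, new String(message));
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false; // Kafka streams are unbounded.
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return typeInfo;
    }
}

The matching format factory then tells the planner which format properties it understands: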
public class TrackRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {

    public TrackRowFormatFactory() {
        super(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
    }

    public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) {
        super(type, version, supportsSchemaDerivation);
    }

    @Override
    protected List<String> supportedFormatProperties() {
        final List<String> properties = new ArrayList<>();
        properties.add(TrackValidator.FORMAT_IN_SCHEMA);
        properties.add(TrackValidator.FORMAT_IN_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_SCHEMA);
        properties.add(TrackValidator.FORMAT_TYPE_INFORMATION);
        properties.add(TrackValidator.FORMAT_TYPE_VALUE);
        return properties;
    }
}
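Finally, how such a format gets selected: TableFormatFactoryBase matches on the standard format.type and format.property-version keys, while the factory's own keys come from supportedFormatProperties(). A lookup sketch, where every concrete key and value string is an assumption based on the TrackValidator constants above:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.types.Row;

public class FormatLookupSketch {

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("format.type", "track");          // assumed value of TrackValidator.FORMAT_TYPE_VALUE
        properties.put("format.property-version", "1");  // matches the version passed to super(...)
        properties.put("format.in-class", "com.example.TrackIn");   // hypothetical key and value
        properties.put("format.out-class", "com.example.TrackOut"); // hypothetical key and value

        @SuppressWarnings("unchecked")
        DeserializationSchemaFactory<Row> factory =
                TableFactoryService.find(DeserializationSchemaFactory.class, properties);
        DeserializationSchema<Row> schema = factory.createDeserializationSchema(properties);
        System.out.println(schema);
    }
}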