[Building an Ad System Step by Step with Spring Cloud] 16. Implementing the Incremental Index and Delivering Data to MQ (Kafka)

  • October 3, 2019
  • Notes

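Implementing the incremental data index

In the previous section we prepared thoroughly for loading the incremental index, using the open-source component mysql-binlog-connector-java to listen to the MySQL binlog. For background on the binlog itself, you can look it up online or email the author.

In this section we process incremental data based on the binlog data objects. We build the incremental ad data so that it can later be delivered to the index service, turning incremental data into an incremental index. Let's code.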

  • Define an interface for delivering incremental data (its parameter is the binlog conversion object we defined in the previous section)
/**
 * ISender: interface that defines the method for delivering incremental data
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
public interface ISender {

    void sender(MysqlRowData rowData);
}
  • Create the incremental index listener
/**
 * IncrementListener: listener for incremental data
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 * @since 2019/6/27
 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    private final AggregationListener aggregationListener;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    // Choose which delivery implementation to inject, by bean name
    @Resource(name = "indexSender")
    private ISender sender;

    /**
     * Annotated with {@link PostConstruct}, so it runs right after
     * the service has started and the bean has been initialized
     */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info.");
        Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
    }

    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTableTemplate();
        EventType eventType = eventData.getEventType();

        // Wrap the event into the object that will finally be delivered
        MysqlRowData rowData = new MysqlRowData();
        rowData.setTableName(table.getTableName());
        rowData.setLevel(eventData.getTableTemplate().getLevel());
        // Convert the EventType into an OperationTypeEnum
        OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
        rowData.setOperationTypeEnum(operationType);

        // Get the field list that the template defines for this operation
        List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
        if (null == fieldList) {
            log.warn("{} not support for {}.", operationType, table.getTableName());
            return;
        }

        for (Map<String, String> afterMap : eventData.getAfter()) {
            Map<String, String> _afterMap = new HashMap<>();
            for (Map.Entry<String, String> entry : afterMap.entrySet()) {
                String colName = entry.getKey();
                String colValue = entry.getValue();

                _afterMap.put(colName, colValue);
            }

            rowData.getFieldValueMap().add(_afterMap);
        }
        sender.sender(rowData);
    }
}
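The `@Resource(name = "indexSender")` injection in the listener picks one `ISender` implementation by bean name. Outside Spring, the same idea can be sketched with a plain map from name to implementation. This is only an illustration: `RowSender`, `SenderRegistry`, `SenderRegistryDemo`, and the `String` payload are hypothetical stand-ins, not classes from this project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ISender; a String replaces MysqlRowData
// so that the sketch stays self-contained.
interface RowSender {
    void send(String rowData);
}

// Minimal name -> sender registry, mimicking what name-based injection resolves.
class SenderRegistry {
    private final Map<String, RowSender> senders = new HashMap<>();

    void register(String name, RowSender sender) {
        senders.put(name, sender);
    }

    RowSender byName(String name) {
        RowSender sender = senders.get(name);
        if (sender == null) {
            throw new IllegalArgumentException("No sender registered as: " + name);
        }
        return sender;
    }
}

class SenderRegistryDemo {
    // Registers two senders and delivers one row through the one chosen by name.
    static List<String> run(String chosenName) {
        List<String> delivered = new ArrayList<>();
        SenderRegistry registry = new SenderRegistry();
        registry.register("indexSender", row -> delivered.add("index:" + row));
        registry.register("kafkaSender", row -> delivered.add("kafka:" + row));
        registry.byName(chosenName).send("ad_plan#10");
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run("indexSender"));
    }
}
```

Switching the delivery target then only means changing the name that is looked up, which is the effect of changing the `name` attribute on `@Resource`.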
Starting the binlog listener
  • First, configure the database connection information used to listen to the binlog
adconf:
  mysql:
    host: 127.0.0.1
    port: 3306
    username: root
    password: 12345678
    binlogName: ""
    position: -1 # start listening from the current position

Write the configuration class:

/**
 * BinlogConfig: configuration for listening to the binlog
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String binlogName;
    private Long position;
}

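In the section on listening to the binlog, we implemented a custom client, CustomBinlogClient. To listen to the binlog, this client has to run as an independent thread, and it has to start listening when the application starts. Let's implement the way this client is run; here we use a new runner, org.springframework.boot.CommandLineRunner. Let's code.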

@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {

    @Autowired
    private CustomBinlogClient binlogClient;

    @Override
    public void run(String... args) throws Exception {
        log.info("BinlogRunner is running...");
        binlogClient.connect();
    }
}
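The requirement that the listening client runs as an independent thread can be sketched in plain Java as follows. This is a minimal sketch under assumptions: `BlockingClient` and `BackgroundClientStarter` are hypothetical names, and the sketch assumes a `connect()` call that blocks the calling thread while it listens. How `CustomBinlogClient.connect()` handles threading internally is defined in the earlier section and is not reproduced here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a client whose connect() blocks while it listens.
interface BlockingClient {
    void connect() throws Exception;
}

class BackgroundClientStarter {

    // Starts the client on its own daemon thread so the caller returns immediately.
    static Thread start(BlockingClient client, String threadName) {
        Thread worker = new Thread(() -> {
            try {
                client.connect();
            } catch (Exception ex) {
                System.err.println("client stopped: " + ex.getMessage());
            }
        }, threadName);
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Demo: returns true if the client started, and did so on a thread other than the caller's.
    static boolean demo() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        boolean[] ranOnOtherThread = new boolean[1];

        Thread worker = start(() -> {
            ranOnOtherThread[0] = Thread.currentThread() != caller;
            started.countDown();
            release.await(); // simulate a connect() that keeps blocking
        }, "binlog-client");

        try {
            boolean startedInTime = started.await(2, TimeUnit.SECONDS);
            release.countDown(); // let the simulated client finish
            worker.join(2000);
            return startedInTime && ranOnOtherThread[0];
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("started in background: " + demo());
    }
}
```

With this shape, a startup hook such as `run(String... args)` returns right away and the rest of the application startup is not held up by the listener.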
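Delivering incremental data

While listening to the binlog, we saw that columns such as int and String are recorded by MySQL without problems, but time-typed columns are formatted as strings like this: Fri Jun 21 15:07:53 CST 2019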

--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
    {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}

For this time format, we need to pay attention to two points:

  • CST: the time shown in this format is 8 hours ahead of ours (China Standard Time, UT+8:00)
  • The date string has to be parsed

Of course, we could also change this behavior by setting MySQL's date format. Here, we parse the time format in code:

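/**
 * Parses a binlog date string such as: Thu Jun 27 08:00:00 CST 2019
 *
 * Note: the abbreviation CST is ambiguous (it also denotes US Central Standard Time),
 * so how "zzz" resolves it can depend on the JVM's default time zone.
 */
public static Date parseBinlogString2Date(String dateString) {
    try {
        DateFormat dateFormat = new SimpleDateFormat(
                "EEE MMM dd HH:mm:ss zzz yyyy",
                Locale.US
        );
        // Shift back by the 8 hours described above
        return DateUtils.addHours(dateFormat.parse(dateString), -8);
    } catch (ParseException ex) {
        log.error("parseString2Date error:{}", dateString);
        return null;
    }
}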
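As an alternative illustration of the same two steps (parse the string, then shift by 8 hours), here is a minimal sketch using `java.time` instead of `SimpleDateFormat`. It is a swapped-in technique, not the article's method: it drops the zone token before parsing so that the result does not depend on how the abbreviation CST is resolved. The class name `BinlogDateParser` is hypothetical, and the sketch assumes the input always has the six space-separated parts shown in the log output.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

class BinlogDateParser {

    // Matches the string once the zone token is removed, e.g. "Thu Jun 27 08:00:00 2019".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss uuuu", Locale.US);

    // Parses "Thu Jun 27 08:00:00 CST 2019" and shifts the result back by 8 hours.
    static LocalDateTime parse(String binlogDate) {
        String[] parts = binlogDate.trim().split("\\s+");
        if (parts.length != 6) {
            throw new IllegalArgumentException("Unexpected binlog date: " + binlogDate);
        }
        // parts[4] is the zone abbreviation (CST); it is skipped on purpose.
        String withoutZone = String.join(" ", parts[0], parts[1], parts[2], parts[3], parts[5]);
        return LocalDateTime.parse(withoutZone, FORMAT).minusHours(8);
    }

    public static void main(String[] args) {
        System.out.println(parse("Thu Jun 27 08:00:00 CST 2019"));
    }
}
```

Unlike the `Date`-based method, this variant throws on malformed input instead of returning `null`, so callers decide how to handle bad rows.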

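Because our indexes are defined according to the hierarchy (Level) between tables, and our coding standard does not allow magic numbers, we define a data-level enum to express the data level.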

/**
 * AdDataLevel: ad data level
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
@Getter
public enum AdDataLevel {

    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private String level;
    private String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }
}
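One way to use such an enum without comparing level strings by hand is a small lookup helper. The following is a hypothetical variant for illustration only: `DataLevel` and `fromLevel` are not part of the project, and the sketch omits Lombok and the description field to stay self-contained.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical variant of the level enum with a lookup helper.
enum DataLevel {
    LEVEL2("2"),
    LEVEL3("3"),
    LEVEL4("4");

    private final String level;

    DataLevel(String level) {
        this.level = level;
    }

    String getLevel() {
        return level;
    }

    // Returns the constant whose level string matches, or empty if none does.
    static Optional<DataLevel> fromLevel(String level) {
        return Arrays.stream(values())
                .filter(candidate -> candidate.level.equals(level))
                .findFirst();
    }
}

class DataLevelDemo {
    public static void main(String[] args) {
        System.out.println(DataLevel.fromLevel("3"));
        System.out.println(DataLevel.fromLevel("9"));
    }
}
```

A caller can then `switch` on the returned constant, and an unknown level shows up as an empty `Optional` rather than falling through a chain of comparisons.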
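Implementing data delivery

Because incremental data can be delivered to different destinations for different purposes, we previously defined a delivery interface, com.sxzhongf.ad.sender.ISender. Next, we implement a delivery class: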

@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {

    /**
     * Deliver binlog data according to the ad level
     */
    @Override
    public void sender(MysqlRowData rowData) {
        if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
            Level2RowData(rowData);
        } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
            Level3RowData(rowData);
        } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
            Level4RowData(rowData);
        } else {
            log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
        }
    }

    private void Level2RowData(MysqlRowData rowData) {

        if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
            List<AdPlanTable> planTables = new ArrayList<>();

            for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
                AdPlanTable planTable = new AdPlanTable();
                // A second way to iterate over a Map
                fieldValueMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                            planTable.setPlanId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                            planTable.setUserId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                            planTable.setPlanStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                            planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                            planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                    }
                });
                planTables.add(planTable);
            }

            // Deliver the ad plans
            planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
        } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
            List<AdCreativeTable> creativeTables = new LinkedList<>();

            rowData.getFieldValueMap().forEach(afterMap -> {
                AdCreativeTable creativeTable = new AdCreativeTable();
                afterMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                            creativeTable.setAdId(Long.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                            creativeTable.setType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                            creativeTable.setMaterialType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                            creativeTable.setHeight(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                            creativeTable.setWidth(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                            creativeTable.setAuditStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                            creativeTable.setAdUrl(v);
                            break;
                    }
                });
                creativeTables.add(creativeTable);
            });

            // Deliver the ad creatives
            creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
        }
    }

    private void Level3RowData(MysqlRowData rowData) {
        ...
    }

    /**
     * Handle level-4 ads
     */
    private void Level4RowData(MysqlRowData rowData) {
        ...
    }
}
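The core step in the sender, turning one column-name-to-value map into a typed object with a `switch` on the column name, can be tried in isolation with the sketch below. Everything in it is an assumption made for illustration: `PlanRow` is a trimmed-down stand-in for `AdPlanTable`, and the column names `plan_id`, `user_id`, and `plan_status` are hypothetical, since the real ones come from `Constant.AD_PLAN_TABLE_INFO`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for AdPlanTable.
class PlanRow {
    Long planId;
    Long userId;
    Integer planStatus;
}

class PlanRowMapper {
    // Assumed column names, for illustration only.
    static final String COLUMN_PLAN_ID = "plan_id";
    static final String COLUMN_USER_ID = "user_id";
    static final String COLUMN_PLAN_STATUS = "plan_status";

    // Copies the known columns of one binlog row into a typed object.
    static PlanRow fromFieldMap(Map<String, String> fieldValueMap) {
        PlanRow row = new PlanRow();
        fieldValueMap.forEach((column, value) -> {
            switch (column) {
                case COLUMN_PLAN_ID:
                    row.planId = Long.valueOf(value);
                    break;
                case COLUMN_USER_ID:
                    row.userId = Long.valueOf(value);
                    break;
                case COLUMN_PLAN_STATUS:
                    row.planStatus = Integer.valueOf(value);
                    break;
                default:
                    // Columns the index does not need are ignored.
                    break;
            }
        });
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<>();
        fields.put(COLUMN_PLAN_ID, "10");
        fields.put(COLUMN_USER_ID, "11");
        fields.put(COLUMN_PLAN_STATUS, "1");
        PlanRow row = fromFieldMap(fields);
        System.out.println(row.planId + " " + row.userId + " " + row.planStatus);
    }
}
```

Because all values arrive as strings, a malformed number raises `NumberFormatException` here; whether to skip or reject such a row is a design choice left to the caller.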
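Delivering incremental data to MQ (Kafka)

To make data delivery more flexible and to serve the needs of systems such as statistics and analytics, we implement a sender that delivers to a message queue. Other services can subscribe to the MQ topic to receive the data.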

Configure the topic in the configuration file:
adconf:
  kafka:
    topic: ad-search-mysql-data
--------------------------------------
/**
 * KafkaSender: delivers binlog incremental data to the Kafka message queue
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 * @since 2019/7/1
 */
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {

    @Value("${adconf.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * Send data to the Kafka queue
     */
    @Override
    public void sender(MysqlRowData rowData) {
        kafkaTemplate.send(
                topic, JSON.toJSONString(rowData)
        );
    }

    /**
     * Test consuming Kafka messages
     */
    @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMsg = Optional.ofNullable(record.value());
        if (kafkaMsg.isPresent()) {
            Object message = kafkaMsg.get();
            MysqlRowData rowData = JSON.parseObject(
                    message.toString(),
                    MysqlRowData.class
            );
            System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
            //sender.sender();
        }

    }
}
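The benefit of publishing to a topic, namely that any number of services can receive the same row without the sender knowing about them, can be illustrated without a broker. The sketch below is an in-memory stand-in, not Kafka and not the Spring Kafka API: `InMemoryTopics` and `TopicDemo` are hypothetical names, the JSON payload is a made-up example, and delivery is synchronous and has no persistence or consumer groups.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// In-memory stand-in for a message broker; it only illustrates topic-based fan-out.
class InMemoryTopics {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, key -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to every handler subscribed to the topic, in subscription order.
    void publish(String topic, String message) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
    }
}

class TopicDemo {
    // Two services subscribe to the ad topic, a third one to an unrelated topic.
    static List<String> run() {
        InMemoryTopics broker = new InMemoryTopics();
        List<String> received = new ArrayList<>();
        broker.subscribe("ad-search-mysql-data", msg -> received.add("statistics:" + msg));
        broker.subscribe("ad-search-mysql-data", msg -> received.add("analysis:" + msg));
        broker.subscribe("other-topic", msg -> received.add("other:" + msg));

        // Made-up payload standing in for the serialized row data.
        broker.publish("ad-search-mysql-data", "{\"tableName\":\"ad_plan\"}");
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Only the two subscribers of `ad-search-mysql-data` receive the message, and adding another consumer requires no change on the publishing side.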