基於Kafka+Flink+Redis的電商大屏實時計算案例

  • 2019 年 12 月 23 日
  • 筆記

前言

阿里的雙11銷量大屏可以說是一道特殊的風景線。實時大屏(real-time dashboard)正在被越來越多的企業採用,用來及時呈現關鍵的數據指標。並且在實際操作中,肯定也不會僅僅計算一兩個維度。由於Flink的「真·流式計算」這一特點,它比Spark Streaming要更適合大屏應用。本文從筆者的實際工作經驗抽象出簡單的模型,並簡要敘述計算流程(當然大部分都是源碼)。

數據格式與接入

簡化的子訂單消息體如下。

{      "userId": 234567,      "orderId": 2902306918400,      "subOrderId": 2902306918401,      "siteId": 10219,      "siteName": "site_blabla",      "cityId": 101,      "cityName": "北京市",      "warehouseId": 636,      "merchandiseId": 187699,      "price": 299,      "quantity": 2,      "orderStatus": 1,      "isNewOrder": 0,      "timestamp": 1572963672217  }

由於訂單可能會包含多種商品,故會被拆分成子訂單來表示,每條JSON消息表示一個子訂單。現在要按照自然日來統計以下指標,並以1秒的刷新頻率呈現在大屏上:

  • 每個站點(站點ID即siteId)的總訂單數、子訂單數、銷量與GMV;
  • 當前銷量排名前N的商品(商品ID即merchandiseId)與它們的銷量。

由於大屏的最大訴求是實時性,等待遲到數據顯然不太現實,因此我們採用處理時間作為時間特徵,並以1分鐘的頻率做checkpointing。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);  env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);  env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);

然後訂閱Kafka的訂單消息作為數據源。

    Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");      DataStream<String> sourceStream = env        .addSource(new FlinkKafkaConsumer011<>(          ORDER_EXT_TOPIC_NAME,                        // topic          new SimpleStringSchema(),                    // deserializer          consumerProps                                // consumer properties        ))        .setParallelism(PARTITION_COUNT)        .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)        .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);

給帶狀態的運算元設定運算元ID(通過調用uid()方法)是個好習慣,能夠保證Flink應用從保存點重啟時能夠正確恢復狀態現場。為了盡量穩妥,Flink官方也建議為每個運算元都顯式地設定ID,參考:https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job

接下來將JSON數據轉化為POJO,JSON框架採用FastJSON。

    DataStream<SubOrderDetail> orderStream = sourceStream        .map(message -> JSON.parseObject(message, SubOrderDetail.class))        .name("map_sub_order_detail").uid("map_sub_order_detail");

JSON已經是預先處理好的標準化格式,所以POJO類SubOrderDetail的寫法可以通過Lombok極大地簡化。如果JSON的欄位有不規範的,那麼就需要手寫Getter和Setter,並用@JSONField註解來指明。

@Getter  @Setter  @NoArgsConstructor  @AllArgsConstructor  @ToString  public class SubOrderDetail implements Serializable {    private static final long serialVersionUID = 1L;      private long userId;    private long orderId;    private long subOrderId;    private long siteId;    private String siteName;    private long cityId;    private String cityName;    private long warehouseId;    private long merchandiseId;    private long price;    private long quantity;    private int orderStatus;    private int isNewOrder;    private long timestamp;  }

統計站點指標

將子訂單流按站點ID分組,開1天的滾動窗口,並同時設定ContinuousProcessingTimeTrigger觸發器,以1秒周期觸發計算。注意處理時間的時區問題,這是老生常談了。

    WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream        .keyBy("siteId")        .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));

接下來寫個聚合函數。

    DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream        .aggregate(new OrderAndGmvAggregateFunc())        .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
  public static final class OrderAndGmvAggregateFunc      implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {      private static final long serialVersionUID = 1L;        @Override      public OrderAccumulator createAccumulator() {        return new OrderAccumulator();      }        @Override      public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {        if (acc.getSiteId() == 0) {          acc.setSiteId(record.getSiteId());          acc.setSiteName(record.getSiteName());        }        acc.addOrderId(record.getOrderId());        acc.addSubOrderSum(1);        acc.addQuantitySum(record.getQuantity());        acc.addGmv(record.getPrice() * record.getQuantity());        return acc;      }        @Override      public OrderAccumulator getResult(OrderAccumulator acc) {        return acc;      }        @Override      public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {        if (acc1.getSiteId() == 0) {          acc1.setSiteId(acc2.getSiteId());          acc1.setSiteName(acc2.getSiteName());        }        acc1.addOrderIds(acc2.getOrderIds());        acc1.addSubOrderSum(acc2.getSubOrderSum());        acc1.addQuantitySum(acc2.getQuantitySum());        acc1.addGmv(acc2.getGmv());        return acc1;      }    }

累加器類OrderAccumulator的實現很簡單,看源碼就大概知道它的結構了,因此不再多廢話。唯一需要注意的是訂單ID可能重複,所以需要用名為orderIds的HashSet來保存它。HashSet應付我們目前的數據規模還是沒太大問題的,如果是海量數據,就考慮換用HyperLogLog吧。

接下來就該輸出到Redis供呈現端查詢了。這裡有個問題:一秒內有數據變化的站點並不多,而ContinuousProcessingTimeTrigger每次觸發都會輸出窗口裡全部的聚合數據,這樣做了很多無用功,並且還會增大Redis的壓力。所以,我們在聚合結果後再接一個ProcessFunction,程式碼如下。

    DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream        .keyBy(0)        .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))        .name("process_site_gmv_changed").uid("process_site_gmv_changed");
  public static final class OutputOrderGmvProcessFunc      extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {      private static final long serialVersionUID = 1L;        private MapState<Long, OrderAccumulator> state;        @Override      public void open(Configuration parameters) throws Exception {        super.open(parameters);        state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(          "state_site_order_gmv",          Long.class,          OrderAccumulator.class)        );      }        @Override      public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {        long key = value.getSiteId();        OrderAccumulator cachedValue = state.get(key);          if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {          JSONObject result = new JSONObject();          result.put("site_id", value.getSiteId());          result.put("site_name", value.getSiteName());          result.put("quantity", value.getQuantitySum());          result.put("orderCount", value.getOrderIds().size());          result.put("subOrderCount", value.getSubOrderSum());          result.put("gmv", value.getGmv());          out.collect(new Tuple2<>(key, result.toJSONString());          state.put(key, value);        }      }        @Override      public void close() throws Exception {        state.clear();        super.close();      }    }

說來也簡單,就是用一個MapState狀態快取當前所有站點的聚合數據。由於數據源是以子訂單為單位的,因此如果站點ID在MapState中沒有快取,或者快取的子訂單數與當前子訂單數不一致,表示結果有更新,這樣的數據才允許輸出。

最後就可以安心地接上Redis Sink了,結果會被存進一個Hash結構里。

    // 看官請自己構造合適的FlinkJedisPoolConfig      FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);      siteResultStream        .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))        .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")        .setParallelism(1);
  public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {      private static final long serialVersionUID = 1L;      private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";        @Override      public RedisCommandDescription getCommandDescription() {        return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);      }        @Override      public String getKeyFromData(Tuple2<Long, String> data) {        return String.valueOf(data.f0);      }        @Override      public String getValueFromData(Tuple2<Long, String> data) {        return data.f1;      }        @Override      public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {        return Optional.of(          HASH_NAME_PREFIX +          new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +          "SITES"        );      }    }

商品Top N

我們可以直接復用前面產生的orderStream,玩法與上面的GMV統計大同小異。這裡用1秒滾動窗口就可以了。

    WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream        .keyBy("merchandiseId")        .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));        DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream        .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())        .name("aggregate_merch_sales").uid("aggregate_merch_sales")        .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));

聚合函數與窗口函數的實現更加簡單了,最終返回的是商品ID與商品銷量的二元組。

  public static final class MerchandiseSalesAggregateFunc      implements AggregateFunction<SubOrderDetail, Long, Long> {      private static final long serialVersionUID = 1L;        @Override      public Long createAccumulator() {        return 0L;      }        @Override      public Long add(SubOrderDetail value, Long acc) {        return acc + value.getQuantity();      }        @Override      public Long getResult(Long acc) {        return acc;      }        @Override      public Long merge(Long acc1, Long acc2) {        return acc1 + acc2;      }    }        public static final class MerchandiseSalesWindowFunc      implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {      private static final long serialVersionUID = 1L;        @Override      public void apply(        Tuple key,        TimeWindow window,        Iterable<Long> accs,        Collector<Tuple2<Long, Long>> out) throws Exception {        long merchId = ((Tuple1<Long>) key).f0;        long acc = accs.iterator().next();        out.collect(new Tuple2<>(merchId, acc));      }    }

既然數據最終都要落到Redis,那麼我們完全沒必要在Flink端做Top N的統計,直接利用Redis的有序集合(zset)就行了,商品ID作為field,銷量作為分數值,簡單方便。不過flink-redis-connector項目中默認沒有提供ZINCRBY命令的實現(必須再吐槽一次),我們可以自己加,步驟參照之前寫過的那篇加SETEX的命令的文章,不再贅述。RedisMapper的寫法如下。

  public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {      private static final long serialVersionUID = 1L;      private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";        @Override      public RedisCommandDescription getCommandDescription() {        return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);      }        @Override      public String getKeyFromData(Tuple2<Long, Long> data) {        return String.valueOf(data.f0);      }        @Override      public String getValueFromData(Tuple2<Long, Long> data) {        return String.valueOf(data.f1);      }        @Override      public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {        return Optional.of(          ZSET_NAME_PREFIX +          new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +          "MERCHANDISE"        );      }    }

後端取數時,用ZREVRANGE命令即可取出指定排名的數據了。只要數據規模不是大到難以接受,並且有現成的Redis,這個方案完全可以作為各類Top N需求的通用實現。

The End

大屏的實際呈現需要保密,截圖自然是沒有的。以下是提交執行時Flink Web UI給出的執行計劃(實際有更多的統計任務,不止3個Sink)。通過復用源數據,可以在同一個Flink job內實現更多統計需求。

1、《從0到1學習Flink》—— Apache Flink 介紹 2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程式入門 3、《從0到1學習Flink》—— Flink 配置文件詳解 4、《從0到1學習Flink》—— Data Source 介紹 5、《從0到1學習Flink》—— 如何自定義 Data Source ? 6、《從0到1學習Flink》—— Data Sink 介紹 7、《從0到1學習Flink》—— 如何自定義 Data Sink ? 8、《從0到1學習Flink》—— Flink Data transformation(轉換) 9、《從0到1學習Flink》—— 介紹 Flink 中的 Stream Windows 10、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解 11、《從0到1學習Flink》—— Flink 讀取 Kafka 數據寫入到 ElasticSearch 12、《從0到1學習Flink》—— Flink 項目如何運行? 13、《從0到1學習Flink》—— Flink 讀取 Kafka 數據寫入到 Kafka 14、《從0到1學習Flink》—— Flink JobManager 高可用性配置 15、《從0到1學習Flink》—— Flink parallelism 和 Slot 介紹 16、《從0到1學習Flink》—— Flink 讀取 Kafka 數據批量寫入到 MySQL 17、《從0到1學習Flink》—— Flink 讀取 Kafka 數據寫入到 RabbitMQ 18、《從0到1學習Flink》—— 你上傳的 jar 包藏到哪裡去了 19、大數據「重磅炸彈」——實時計算框架 Flink 20、《Flink 源碼解析》—— 源碼編譯運行 21、為什麼說流處理即未來? 22、OPPO數據中台之基石:基於Flink SQL構建實時數據倉庫 23、流計算框架 Flink 與 Storm 的性能對比 24、Flink狀態管理和容錯機制介紹 25、原理解析 | Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理 26、Apache Flink 是如何管理好記憶體的? 27、從0到1學習Flink》——Flink 中這樣管理配置,你知道? 28、從0到1學習Flink》——Flink 不可以連續 Split(分流)? 29、Flink 從0到1學習—— 分享四本 Flink 的書和二十多篇 Paper 論文 30、360深度實踐:Flink與Storm協議級對比 31、Apache Flink 1.9 重大特性提前解讀 32、如何基於Flink+TensorFlow打造實時智慧異常檢測平台?只看這一篇就夠了 33、美團點評基於 Flink 的實時數倉建設實踐 34、Flink 靈魂兩百問,這誰頂得住? 35、一文搞懂 Flink 的 Exactly Once 和 At Least Once 36、你公司到底需不需要引入實時計算引擎? 37、Flink 從0到1學習 —— 如何使用 Side Output 來分流? 38、一文讓你徹底了解大數據實時計算引擎 Flink 39、基於 Flink 實現的商品實時推薦系統(附源碼) 40、如何使用 Flink 每天實時處理百億條日誌? 41、Flink 在趣頭條的應用與實踐 42、Flink Connector 深度解析 43、滴滴實時計算髮展之路及平台架構實踐 44、Flink Back Pressure(背壓)是怎麼實現的?有什麼絕妙之處? 45、Flink 實戰 | 貝殼找房基於Flink的實時平台建設 46、如何使用 Kubernetes 部署 Flink 應用 47、一文徹底搞懂 Flink 網路流控與反壓機制 48、Flink中資源管理機制解讀與展望 49、Flink 實時寫入數據到 ElasticSearch 性能調優 50深入理解 Flink 容錯機制 51吐血之作 | 流系統Spark/Flink/Kafka/DataFlow端到端一致性實現對比