Kafka與Spark案例實踐

1.概述

Kafka系統的靈活多變,讓它擁有豐富的拓展性,可以與第三方套件很方便的對接。例如,實時計算引擎Spark。接下來通過一個完整案例,運用Kafka和Spark來合理完成。

2.內容

2.1 初始Spark

在大數據應用場景中,面對實時計算、處理流數據、降低計算耗時等問題時,Apache Spark提供的計算引擎能很好的滿足這些需求。
Spark是一種基於內存的分佈式計算引擎,其核心為彈性分佈式數據集(Resilient Distributed Datasets簡稱,RDD),它支持多種數據來源,擁有容錯機制,數據集可以被緩存,並且支持並行操作,能夠很好的地用於數據挖掘和機器學習。
Spark是專門為海量數據處理而設計的快速且通用的計算引擎,支持多種編程語言(如Java、Scala、Python等),並且擁有更快的計算速度。

提示:
據Spark官方數據統計,通過利用內存進行數據計算,Spark的計算速度比Hadoop中的MapReduce的計算速度快100倍左右。

另外,Spark提供了大量的庫,其中包含Spark SQL、Spark Streaming、MLlib、GraphX等。在項目開發的過程當中,可以在同一個應用程序中輕鬆地組合使用這些類庫,如下圖所示:

 

 

 

 2.2 Spark SQL

Spark SQL是Spark處理結構化數據的一個模塊,。與Spark的RDD應用接口不同,Spark SQL提供的接口更加偏向於處理結構化的數據。在使用相同的執行引擎時,不同的應用接口或者編程語言在做計算時都是相互獨立的,。這意味着,用戶在使用時,可以很方便的地在不同的應用接口或編程語言之間進行切換。
Spark SQL很重要的一個優勢就是,可以通過SQL語句來實現業務功能,。Spark SQL可以讀取不同的存儲介質,例如Kafka、Hive、HDFS等。
在使用編程語言執行一個Spark SQL語句時,執行後的結果會返回一個數據集,用戶可以通過使用命令行、JDBC、ODBC的方式與Spark SQL進行數據交互。

提示:
JDBC是一個面向對象的應用程序接口,通過它可以訪問各類關係型數據庫。
ODBC是微軟公司開放服務結構中有關數據庫的一個組成部分,它制定並提供了一套訪問數據庫的應用接口。

2.3 Spark Streaming

Spark Streaming是Spark核心應用接口的一種擴展,它可以用於進行大規模數據處理、高吞吐量處理、容錯處理等場景。同時,Spark Streaming支持從不同的數據源中讀取數據,並且能夠使用聚合函數、窗口函數等這類複雜算法來處理數據。
處理後的數據結果可以保存到本地文件系統(如文本)、分佈式文件系統(如HDFS)、關係型數據庫(如MySQL)、非關係型數據庫(如HBase)等存儲介質中。

2.4 MLlib

MLlib是Spark的機器學習(Machine Learning)類庫,目的在於簡化機器學習的可操作性和易擴展性。
MLlib由一些通用的學習算法和工具組成,其內容包含分類、回歸、聚類、協同過濾等。

2.5 GraphX

GraphX是構建在Spark之上的圖計算框架,它使用RDD來存儲圖數據,並提供了實用的圖操作方法。
由於RDD的特性,GraphX高效的地實現了圖的分佈式存儲和處理,可以應用於社交網絡這類大規模的圖計算場景。

3.操作Spark命令

在$SPARK_HOME/bin目錄中,提供了一系列的腳本,例如spark-shell、spark-submit等。

進入到Hadoop集群,準備好數據源並將數據源上傳Hadoop分佈式文件系統(HDFS)中。然後使用Spark Shell的方式讀取HDFS上的數據,並統計單詞出現的頻率,具體操作步驟如下。

1.準備數據源

(1)在本地創建一個文本文件,並在該文本文件中添加待統計的數據,具體操作命令如下。

# 新建文本文件
[hadoop@dn1 tmp]$ vi wordcount.txt

(2)然後,在wordcount.txt文件中添加待統計的單詞,內容如下。

kafka spark
hadoop spark
kafka hadoop
kafka hbase

2.上傳數據源到HDFS

(1)將本地準備好的wordcount.txt文件上傳到HDFS中,具體操作命令如下。

# 在HDFS上創建一個目錄
[hadoop@dn1 tmp]$ hdfs dfs -mkdir -p /data/spark
# 上傳wordcount.txt到HDFS指定目錄
[hadoop@dn1 tmp]$ hdfs dfs -put wordcount.txt /data/spark

(2)然後,執行HDFS查看命令,驗證本地文件是否上傳成功,具體操作命令如下。

# 查看上傳的文件是否成功
[hadoop@dn1 tmp]$ hdfs dfs -cat /data/spark/wordcount.txt

若查看命令執行成功,輸出結果如圖所示:

 

 

 3.使用Spark Shell統計單詞出現頻

(1)進入到$SPARK_HOME/bin目錄,然後運行./spark-shell腳本進入到Spark Shell控制台。

提示:
如果直接執行該腳本,則表示以本地模式單線程方式啟動。
如果執行./spark-shell local[n]命令,則表示以多線程方式啟動,其中變量n代表線程數。

(2)通過本地模式運行,等待Spark加載配置文件,加載完成後,輸出結果

(3)統計單詞出現的頻率,具體實現如下:

val wc = sc.textFile("hdfs://nna:9000/data/spark/wordcount.txt")
val stats=wc.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
stats.collect()

提示:
第一行代碼表示,讀取HDFS上待統計單詞的原始數據;
第二行代碼表示,實現統計單詞出現頻率的具體業務邏輯;
第三行代碼表示,從彈性分佈式數據集(RDD)中獲取數據,並以數組的形式展示統計結果。

(4)執行上述代碼後,Spark Shell控制台輸出結果如圖所示:

 

 

 4.案例實踐

Kafka是一種實時消息隊列技術,通過Kafka中間件,可以構建實時消息處理平台來滿足企業的實時類需求。
本案例以Kafka為核心中間件,以Spark作為實時計算引擎,來完成對遊戲明細數據的實時統計。
以本項目為例,需要實時描繪當天遊戲用戶的行為軌跡,例如用戶訂單、用戶分佈、新增用戶等指標數據。針對這類需求,可以將遊戲用戶實時產生的業務數據上報到Kafka消息隊列系統進行存儲,然後通過Spark流計算的方式來統計應用指標。最後,將統計後的業務結果形成報表或者趨勢圖進行展示,為製作數據方案者提供數據支持。

4.1 背景和價值

1. 背景

在實時應用場景中,與離線統計任務有所不同。它對時延的要求比較高,需要縮短業務數據計算的時間。對於離線任務來說,通常是計算前一天或者更早的業務數據。
現實業務場景中,很多業務場景需要實時查看統計結果。流計算能夠很好的彌補這一不足之處,對於當天變化的流數據可以通過流計算(比如Flink、Spark Streaming、Storm等)後,及時呈現報表數據或趨勢圖。

2. 價值

這樣一個實時計算項目能夠實時掌握遊戲用戶的行為軌跡、活躍度。具體涉及的內容如下:

  1. 通過對遊戲用戶實時產生的業務數據進行實時統計,可以分析出遊戲用戶在各個業務模塊下的活躍度、停留時間等。將這些結果形成報表或者趨勢圖,讓以便能夠實時地準確的掌握遊戲用戶的行為軌跡;
  2. 按小時維度將當天的實時業務數據進行統計,那麼可以知道遊戲用戶在哪個時間段具有最高的訪問量。利用這些數據可以針對這個時間段做一些推廣活動,例如道具「秒殺」活動、打折優惠等,從而刺激遊戲用戶去充值消費。
  3. 將實時計算產生的結果,去發揮它應有的價值。在高峰時間段推廣一些優惠活動後,通過實時統計的數據結果分析活動的效果,例如促銷的「秒殺」活動、道具打折等這些活動是否受到遊戲用戶的喜愛。針對這些反饋效果,可以做出快速合理的反應。

4.2 實現流程

架構體系可以分為數據源、數據採集、數據存儲、流計算、結果持久化、服務接口、數據可視化等,實現流程圖如圖所示:

 

 

1. 數據源

遊戲用戶通過移動設備或者瀏覽器操作遊戲產生的記錄,會實時上報到日誌服務器進行存儲,數據格式會封裝成JSON對象進行上報,便於後續消費解析。

2. 數據採集

在日誌服務器中部署Flume Agent來實時監控上報的業務日誌數據,。當業務日誌數據有更新(可通過文件MD5值、文件日期等來判斷文件的變動)時,由Flume Agent啟動採集任務,通過Flume Sink組件配置Kafka集群連接地址進行數據傳輸。

3. 數據存儲

利用Kafka的消息隊列特性來存儲消息記錄。將接收的數據按照業務進行區分,以不同的Topic來存儲各種類型的業務數據。

4. 流計算

Spark擁有實時計算的能力,使用Spark Streaming將Spark和Kafka關聯起來。
通過消費Kafka集群中指定的Topic來獲取業務數據,並將獲取的業務數據利用Spark集群來做實時計算。

5. 結果持久化

通過Spark計算引擎,將統計後的結果存儲到數據庫,方便可視化系統查詢展示。
選用Redis和MySQL來作為持久化的存儲介質,在Spark代碼邏輯中使用對應的編程接口(如Java Redis API或Java MySQL API)將計算後的結果存儲到數據庫。

6. 數據接口

數據庫中存儲的統計結果需要對外共享,可以通過統一的接口服務對外提供訪問。
可以選擇Thrift框架來實現數據接口,編寫RPC服務供外界訪問。

提示:
Apache Thrift是一個軟件框架,用來進行可擴展且跨編程語言服務的開發工作。
Apache Thrift結合了功能強大的軟件堆棧和代碼生成引擎,可以與Java、Go、Python、Ruby等編程語言進行無縫連接。

7. 可視化

從RPC服務中獲取數據庫中存儲的統計結果。然後,在瀏覽器中將這些結果進行渲染,以報表和趨勢圖表的形式進行呈現。

5.核心邏輯實現

通過讀取Kafka系統Topic中的流數據,對平台號進行分組統計。每隔10秒鐘,將相同平台號下用戶金額進行累加計算,並將統計後的結果寫入到MySQL數據庫。

5.1 MySQL工具類實現

/**
 * 實現一個MySQL工具類.
 * 
 * @author smartloli.
 *
 *         Created by Jul 15, 2022
 */
public class MySQLPool {
    private static LinkedList<Connection> queues;         // 聲明一個連接隊列

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");     // 加載MySQL驅動
        } catch (ClassNotFoundException e) {
            e.printStackTrace();                        // 打印異常信息
        }
    }

    /** 初始化MySQL連接對象. */
    public synchronized static Connection getConnection() {
        try {
            if (queues == null) {                        // 判斷連接隊列是否為空
                queues = new LinkedList<Connection>();    // 實例化連接隊列
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager
                         .getConnection("jdbc:mysql://nna:3306/game", "root", "123456");
                    queues.push(conn);                    // 初始化連接隊列
                }
            }
        } catch (Exception e) {
            e.printStackTrace();                        // 打印異常信息
        }
        return queues.poll();                            // 返回最新的連接對象

    }

    /** 釋放MySQL連接對象到連接隊列. */
    public static void release(Connection conn) {
        queues.push(conn);                                // 將連接對象放回到連接隊列
    }
}

5.2 Spark邏輯實現

實現按平台號分組統計用戶金額,具體實現見代碼:

/**
 * 使用Spark引擎來統計用戶訂單主題中的金額.
 * 
 * @author smartloli.
 *
 *         Created by Jul 14, 2022
 */
public class UserOrderStats {

    public static void main(String[] args) throws Exception {

        // 設置數據源輸入參數
        if (args.length < 1) { 
            System.err.println("Usage: GroupId <file>");            // 打印提示信息
            System.exit(1);                                            // 退出進程
        }

        String bootStrapServers = "dn1:9092,dn2:9092,dn3:9092";        // 指定Kafka連接地址
        String topic = "user_order_stream";                            // 指定Kafka主題名
        String groupId = args[0];                                    // 動態獲取消費者組名
        SparkConf sparkConf = new SparkConf()
                     .setMaster("yarn-client")
                 .setAppName("UserOrder");                        // 實例化Spark配置對象
        // 實例化一個SparkContext對象, 用來打印日誌信息到控制台, 便於調試
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        sc.setLogLevel("WARN");

        // 創建一個流對象, 設置窗口時間為10秒
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
        JavaInputDStream<ConsumerRecord<Object, Object>> streams =
         KafkaUtils.createDirectStream(jssc,
             LocationStrategies.PreferConsistent(),
             ConsumerStrategies.Subscribe(Arrays.asList(topic),
             configure(groupId, bootStrapServers)));                    // 獲取流數據集

        // 將Kafka主題(user_order_stream)中的消息轉化成鍵值對(key/value)形式
        JavaPairDStream<Integer, Long> moneys =
             streams.mapToPair(new PairFunction<ConsumerRecord<Object, Object>,
             Integer, Long>() {
            /** 序列號ID. */
            private static final long serialVersionUID = 1L;

            /** 執行回調函數來處理業務邏輯. */
            @Override
            public Tuple2<Integer, Long> call(ConsumerRecord<Object, Object> t)
                 throws Exception {
                JSONObject object = JSON.parseObject(t.value().toString());
                return new Tuple2<Integer, Long>(object.getInteger("plat"),
                 object.getLong("money"));
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            /** 序列號ID. */
            private static final long serialVersionUID = 1L;

            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;                 // 通過平台號(plat)進行分組聚合
            }
        });

        // 將統計結果存儲到MySQL數據庫
        moneys.foreachRDD(rdd -> {
            Connection connection = MySQLPool.getConnection();    // 實例化MySQL連接對象
            Statement stmt = connection.createStatement();        // 創建一個操作MySQL的實例
            rdd.collect().forEach(line -> {
                int plat = line._1.intValue();                    // 獲取平台號
                long total = line._2.longValue();                // 獲取用戶總金額
                // 將寫入到MySQL的數據,封裝成SQL語句
                String sql = String.format("insert into `user_order` (`plat`, `total`)
                 values (%s, %s)", plat, total);
                try {
                    // 調用MySQL工具類, 將統計結果組裝成SQL語句寫入到MySQL數據庫
                    stmt.executeUpdate(sql); 
                } catch (SQLException e) {
                    e.printStackTrace();    // 打印異常信息
                }
            });

            MySQLPool.release(connection);     // 是否MySQL連接對象到連接隊列
        });

        jssc.start();                        // 開始計算
        try {
            jssc.awaitTermination();         // 等待計算結束
        } catch (Exception ex) {
            ex.printStackTrace();            // 打印異常信息
        } finally {
            jssc.close();                    // 發生異常, 關閉流操作對象
        }
    }

    /** 初始化Kafka集群信息. */
    private static Map<String, Object> configure(String group, String brokers) {
        Map<String, Object> props = new HashMap<>();    // 實例化一個配置對象
        props.put("bootstrap.servers", brokers);        // 指定Kafka集群地址
        props.put("group.id", group);                    // 指定消費者組
        props.put("enable.auto.commit", "true");        // 開啟自動提交
        props.put("auto.commit.interval.ms", "1000");    // 自動提交的時間間隔
        // 反序列化消息主鍵
        props.put("key.deserializer",
             "org.apache.kafka.common.serialization.StringDeserializer");
        // 反序列化消費記錄
        props.put("value.deserializer",
             "org.apache.kafka.common.serialization.StringDeserializer");
        return props;                                    // 返回配置對象
    }

}

5.3 執行提交

將打包好的應用程序上傳到Spark集群的其中一個節點,然後通過spark-submit腳本來調度應用程序,具體操作命令如下。

# 執行應用程序
[hadoop@dn1 bin]$ ./spark-submit --master yarn-client --class org.smartloli.kafka.game.x.book_11.jubas.UserOrderStats --executor-memory 512MB --total-executor-cores 2 /data/soft/new/UserOrder.jar ke6

5.4 結果預覽

 

 6.結束語

這篇博客就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。