尋找5億次訪問中,訪問次數最多的人

  • 2020 年 2 月 10 日
  • 筆記

場景描述:這是一個Spark的實戰題目,也是在面試中經常出現的一類題目。

問題描述

對於一個大型網站,用戶訪問量嘗嘗高達數十億。對於數十億是一個什麼樣的概念,我們這裡可以簡單的計算一下。對於一個用戶,單次訪問,我們通常會記錄下哪些數據呢?

  • 1、用戶的id
  • 2、用戶訪問的時間
  • 3、用戶逗留的時間
  • 4、用戶執行的操作
  • 5、用戶的其餘數據(比如IP等等)

我們單單從用戶id來說,比如10011802330414,這個ID,那麼我們一個id差不多就是一個long類型,因為在大量數據存儲的時候,我們都是採用文本存儲。因此對於5億個用戶ID,完全存儲在磁碟當中,大概是5G的大小,對於這個大小,並不能算是大數據。但是對於一個案例來說,已經非常足夠了。

我們會產生一個5億條ID的數據集,我們上面說到,這個數據集大小為5G(不壓縮的情況下),因此我不會在GitHub上上傳這樣一個數據集,但是我們提供一個方法,來生成一個5億條數據。

當然要解決這個問題,你可以依然在local模式下運行項目,但是你得有足夠的磁碟空間和記憶體空間,大概8G磁碟空間(因為除了數據本身,spark運行過程還要產生一些臨時數據),5G記憶體(要進行reduceByKey)。為了真正展示spark的特性,我們這個案例,將會運行在spark集群上。

關於如何搭建集群,我準備在後續的章節補上。但是在網上有大量的集群搭建教程,其中不乏一些詳細優秀的教程。當然,這節我們不講如何搭建集群,但是我們仍然可以開始我們的案例。

問題分析

那麼現在我們擁有了一個5億條數據(實際上這個數據並不以文本存儲,而是在運行的時候生成),從五億條數據中,找出訪問次數最多的人,這看起來並不難。但實際上我們想要通過這個案例了解spark的真正優勢。

5億條ID數據,首先可以用map將其快取到RDD中,然後對RDD進行reduceByKey,最後找出出現最多的ID。思路很簡單,因此程式碼量也不會很多。

實現

scala實現

首先是ID生成方法:

RandomId.class

import org.apache.spark.{SparkConf, SparkContext}    object ActiveVisitor {        def main(args: Array[String]): Unit = {      val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")        val sc = new SparkContext(conf)        //生成一個0-9999的列表      val list = 1 until 10000        val id =new RandomId()        //這裡記錄最大的次數      var max = 0        //這裡記錄最大次數的ID      var maxId = 0L        val lastNum = sc.parallelize(list)        //第一步生成5億條數據        .flatMap(num => {        //遍歷list列表        //總共遍歷1萬次每次生成5萬個ID        var list2 = List(id.next())        for (i <- 1 to 50000){          list2 = id.next() :: list2        }        //這裡記錄當前生成ID的百分比        println(num/1000.0 +"%")          //返回生成完成後的list        //每次循環裡面都包含5萬個ID        list2      })        //遍歷5億條數據        //為每條數據出現標記1        .map((_,1))        //對標記後的數據進行處理        //得到每個ID出現的次數,即(ID,Count)        .reduceByKey(_+_)        //遍歷處理後的數據        .foreach(x => {        //將最大值存儲在max中        if (x._2 > max){          max = x._2          maxId = x._1          //若X比之前記錄的值大,則輸出該id和次數          //最後一次輸出結果,則是出現次數最多的的ID和以及其出現的次數          //當然出現次數最多的可能有多個ID          //這裡只輸出一個          println(x)        }      })    }    }

‍然後是用它生成5億條數據

import org.apache.spark.{SparkConf, SparkContext}    object ActiveVisitor {      def main(args: Array[String]): Unit = {      val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")        val sc = new SparkContext(conf)        val list = 1 until 100000        val id =new RandomId()        var max = 0        var maxId = 0L        val lastNum = sc.parallelize(list).flatMap(num => {        var list2 = List(id.next())        for (i <- 1 to 50000){          list2 = id.next() :: list2        }        println(num +"%")        list2      }).map((_,1)).reduceByKey(_+_).foreach(x => {        if (x._2 > max){          max = x._2          maxId = x._1          println(x)        }      })    }  }

‍處理5億條數據

import org.apache.spark.{SparkConf, SparkContext}    object ActiveVisitor {      def main(args: Array[String]): Unit = {      val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")        val sc = new SparkContext(conf)        //生成一個0-9999的列表      val list = 1 until 10000        val id =new RandomId()        //這裡記錄最大的次數      var max = 0        //這裡記錄最大次數的ID      var maxId = 0L        val lastNum = sc.parallelize(list)        //第一步生成5億條數據        .flatMap(num => {        //遍歷list列表        //總共遍歷1萬次每次生成5萬個ID        var list2 = List(id.next())        for (i <- 1 to 50000){          list2 = id.next() :: list2        }        //這裡記錄當前生成ID的百分比        println(num/1000.0 +"%")          //返回生成完成後的list        //每次循環裡面都包含5萬個ID        list2      })        //遍歷5億條數據        //為每條數據出現標記1        .map((_,1))        //對標記後的數據進行處理        //得到每個ID出現的次數,即(ID,Count)        .reduceByKey(_+_)        //遍歷處理後的數據        .foreach(x => {        //將最大值存儲在max中        if (x._2 > max){          max = x._2          maxId = x._1          //若X比之前記錄的值大,則輸出該id和次數          //最後一次輸出結果,則是出現次數最多的的ID和以及其出現的次數          //當然出現次數最多的可能有多個ID          //這裡只輸出一個          println(x)        }      })    }  }

運行得到結果

將其提交到spark上運行,觀察日誌

1%  5000%  2%  5001%  3%  5002%  4%  5003%  5%  5004%  6%  5005%  7%  5006%  8%  5007%  9%  5008%  10%  5009%  11%  5010%  12%  5011%  5012%  13%  5013%  14%  15%  5014%    ...  ...  ...  

這裡是輸出的部分日誌,從日誌中,我們顯然發現,程式是並行的。我採用的集群由四個節點組成,每個節點提供5G的記憶體空間,集群在不同節點中運行,有節點分配到的分區是從1開始,而有節點則是從5000開始,因此程式並沒有按照我們所想的從1%-9999%。好在未按照順序執行,也並不影響最終結果,畢竟最終要進行一個reduceByKey,才是我們真正需要得到結果的地方。

再看日誌另一部分:

5634%  5635%  5636%  5637%  5638%  5639%  5640%  5641%  5642%  5643%  5644%  5645%  2019-03-05 11:52:14 INFO  ExternalSorter:54 - Thread 63 spilling in-memory map of 1007.3 MB to disk (2 times so far)  647%  648%  649%  650%  651%  652%  653%  654%  655%  656%

‍注意到這裡,spilling in-memory map of 1007.3 MB to disk,spilling操作將map中的 1007.3 MB的數據溢寫到磁碟中。這是由於spark在處理的過程中,由於數據量過於龐大,因此將多的數據溢寫到磁碟,當再次用到時,會從磁碟讀取。對於實時性操作的程式來說,多次、大量讀寫磁碟是絕對不被允許的。但是在處理大數據中,溢寫到磁碟是非常常見的操作。

事實上,在完整的日誌中,我們可以看到有相當一部分日誌是在溢寫磁碟的時候生成的,大概49次(這是我操作過程中的總數)

如圖:

總共出現49條溢寫操作的日誌,每次大概是1G,這也印證了我們5億條數據,佔據空間5G的一個說法。事實上,我曾將這5億條數據存儲在磁碟中,的確其佔據的空間是5G左右。

結果

最終,我們可以在日誌中看到結果。

整個過程持續了將近47min,當然在龐大的集群中,時間能夠大大縮短,要知道,我們現在只採用了4個節點。

我們看到了次數2、4、6、8居然分別出現了兩次,這並不奇怪,因為集群並行運行,非同步操作,出現重複結果十分正常,當然我們也可以用並發機制,去處理這個現象。這個在後續的案例中,我們會繼續優化結果。

從結果上看,我們發現5億條數據中,出現最多的ID也僅僅出現了8次,這說明了在大量數據中,很多ID可能只出現了1次、2次。這也就是為什麼最後我採用的是foreach方法去尋找最大值,而不採用如下的方法

import org.apache.spark.{SparkConf, SparkContext}    object ActiveVisitor {        def main(args: Array[String]): Unit = {      val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")        val sc = new SparkContext(conf)        //生成一個0-9999的列表      val list = 1 until 10000        val id =new RandomId()        //這裡記錄最大的次數      var max = 0        //這裡記錄最大次數的ID      var maxId = 0L        val lastNum = sc.parallelize(list)        //第一步生成5億條數據        .flatMap(num => {        //遍歷list列表        //總共遍歷1萬次每次生成5萬個ID        var list2 = List(id.next())        for (i <- 1 to 50000){          list2 = id.next() :: list2        }        //這裡記錄當前生成ID的百分比        println(num/1000.0 +"%")          //返回生成完成後的list        //每次循環裡面都包含5萬個ID        list2      })        //遍歷5億條數據        //為每條數據出現標記1        .map((_,1))        //對標記後的數據進行處理        //得到每個ID出現的次數,即(ID,Count)        .reduceByKey(_+_)        //為數據進行排序        //倒序        .sortByKey(false)        //次數最多的,在第一個,將其輸出      println(lastNum.first())    }  }

這個方法中,我們對reduceByKey結果進行排序,輸出排序結果的第一個,即次數最大的ID。這樣做似乎更符合我們的要求。但是實際上,為了得到同樣的結果,這樣做,會消耗更多的資源。如我們所說,很多ID啟其實只出現了一次,兩次,排序的過程中,仍然要對其進行排序。要知道,由於很多ID只出現一次,排序的數據集大小很有可能是數億的條目。

根據我們對排序演算法的了解,這樣一個龐大數據集進行排序,勢必要耗費大量資源。因此,我們能夠容忍輸出一些冗餘資訊,但不影響我們的得到正確結果。

至此,我們完成了5億數據中,找出最多出現次數的數據。如果感興趣,可以嘗試用這個方法解決50億條數據,出現最多的數據條目。但是這樣做的話,你得準備好50G的空間。儘管用上述的程式,屬於閱後即焚,但是50億數據仍然會耗費大量的時間。

作者:詩昭 鏈接:https://juejin.im/post/5c7e73115188251b89373146