【譯】Using .NET for Apache Spark to Analyze Log Data
.NET for Spark可用於處理成批數據、實時流、機器學習和ad-hoc查詢。在這篇部落格文章中,我們將探討如何使用.NET for Spark執行一個非常流行的大數據任務,即日誌分析。
1 什麼是日誌分析?
日誌分析的目標是從這些日誌中獲得有關工具或服務的活動和性能的有意義的見解。NET for Spark使我們能夠快速高效地分析從兆位元組到千兆位元組的日誌數據!
在這篇文章中,我們將分析一組Apache日誌條目,這些條目表示用戶如何與web伺服器上的內容交互。您可以在這裡查看Apache日誌條目的示例。
2 編寫一個應用
日誌分析是Spark批量處理的一個例子。批處理是靜態數據的轉換,意味著源數據已經載入到數據存儲中。在我們的例子中,輸入文本文件已經填充了日誌,並且在處理時不會接收新的或更新的日誌。
在為Spark應用程式創建新的.NET時,我們只需要遵循以下幾個步驟,就可以開始從我們的數據中獲得這些有趣的見解:
-
-
- 創建Spark會話
- 讀取輸入數據,通常使用DataFrame
- 操作和分析輸入數據,通常使用Spark SQL
-
2.1 創建Spark會話
在任何Spark應用程式中,我們首先建立一個新的SparkSession,它是使用Spark編程的入口點:
SparkSession spark = SparkSession .Builder() .AppName("Apache User Log Processing") .GetOrCreate();
2.2 讀取輸入數據
我們將輸入數據存儲在DataFrame中,DataFrame是數據的分散式集合,它把數據組織為命名列的集合:
DataFrame generalDf = spark.Read().Text("<path to input data set>");
2.3 操縱和分析輸入的數據
重要的第一步是數據準備。數據準備包括以某種方式清理我們的數據。這可能包括刪除不完整的條目以避免以後計算中出現錯誤,或者刪除不相關的輸入以提高性能。
在我們的示例中,我們應該首先確保所有條目都是完整的日誌。我們可以通過將每個日誌條目與一個正則表達式進行匹配來實現這一點。
string s_apacheRx = "^(\S+) (\S+) (\S+) [([\w:/]+\s[+-]\d{4})] \"(\S+) (\S+) (\S+)\" (\d{3}) (\d+)";
我們如何對DataFrame的每一行執行計算,比如將每個日誌條目與上面的s_apacheRx進行匹配?答案是Spark SQL。
2.4 Spark SQL
Spark SQL為處理DataFrame中存儲的結構化數據提供了許多很棒的函數。Spark SQL最流行的特性之一是UDF(用戶自定義函數)。我們定義它們接受的輸入類型和產生的輸出類型,然後定義它們如何執行計算或篩選。
讓我們定義一個新的UDF GeneralReg,將每個日誌條目與s_apacheRx 進行匹配。我們的UDF需要一個Apache日誌條目,它是一個字元串,並根據日誌是否與s_apacheRx匹配返回true或false:
spark.Udf().Register<string, bool>("GeneralReg", log => Regex.IsMatch(log, s_apacheRx));
除了UDF之外,Spark SQL還提供了編寫SQL調用來分析我們的數據的能力,通常編寫一個SQL調用來將UDF應用於每一行數據:
DataFrame generalDf = spark.Sql("SELECT logs.value, GeneralReg(logs.value) FROM Logs");
這個SQL調用測試generalDf的每一行,以確定它是否是一個有效且完整的日誌。
我們可以使用.Filter()只在數據中保留完整的日誌條目,然後使用.Show()顯示新篩選的DataFrame:
generalDf = generalDf.Filter(generalDf["GeneralReg(value)"]); generalDf.Show();
現在我們已經完成了一些初始數據準備,我們可以繼續過濾和分析我們的數據。讓我們從以10開頭的IP地址中查找與垃圾郵件相關的日誌條目:
// Choose valid log entries that start with 10 spark.Udf().Register<string, bool>( "IPReg", log => Regex.IsMatch(log, "^(?=10)")); generalDf.CreateOrReplaceTempView("IPLogs"); // Apply UDF to get valid log entries starting with 10 DataFrame ipDf = spark.Sql( "SELECT iplogs.value FROM IPLogs WHERE IPReg(iplogs.value)"); ipDf.Show(); // Choose valid log entries that start with 10 and deal with spam spark.Udf().Register<string, bool>( "SpamRegEx", log => Regex.IsMatch(log, "\\b(?=spam)\\b")); ipDf.CreateOrReplaceTempView("SpamLogs"); // Apply UDF to get valid, start with 10, spam entries DataFrame spamDF = spark.Sql( "SELECT spamlogs.value FROM SpamLogs WHERE SpamRegEx(spamlogs.value)");
最後,讓我們計算最後清理的數據集中GET請求的數量。.NET for Spark的神奇之處在於,我們可以將其與其他流行的.NET功能結合起來編寫我們的應用程式。我們將使用LINQ分析Spark應用程式中的數據:
int numGetRequests = spamDF .Collect() .Where(r => ContainsGet(r.GetAs<string>("value"))) .Count();
在上面的程式碼中,ContainsGet()使用regex匹配檢查GET請求:
// Use regex matching to group data // Each group matches a column in our log schema // i.e. first group = first column = IP public static bool ContainsGet(string logLine) { Match match = Regex.Match(logLine, s_apacheRx); // Determine if valid log entry is a GET request if (match.Success) { Console.WriteLine("Full log entry: '{0}'", match.Groups[0].Value); // 5th column/group in schema is "method" if (match.Groups[5].Value == "GET") { return true; } } return false; }
作為最後一步,我們調用Spark.Stop()關閉底層的Spark會話和Spark上下文。
3 運行程式
需要使用spark-submit命令,該命令將提交您的應用程式以便在Apache Spark上運行。
spark-submit命令包括:
-
-
- –class,用於調用DotnetRunner
- –master, 用於定義是本地還是雲端的Spark提交
- Path,Microsoft.Spark jar的路徑
- 應用程式的其他參數或依賴項,例如輸入文件或包含UDF定義的dll的路徑。
-
運行應用程式的Windows命令示例如下:
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<version>.jar dotnet /path/to/netcoreapp<version>/LoggingApp.dll
4 .NET for Apache Spark總結
這裡只是一個簡單的介紹,.NET for Apache Spark是一個免費、開源、跨平台的大數據分析框架,更多的功能需要讀者進一步深入學習。
名詞解釋
Ad-hoc Query
Ad-hoc Query是一種「on-the-fly」的特殊查詢。也就是說,在每一個查詢操作被執行之前,查詢的目標對象是不明確的。
比如下面這樣一條語句:
var mySqlQuery = "SELECT * FROM table WHERE id = " + std_name;
每次執行這一條查詢的時候返回的結果都可能會不一樣,這取決於std_name的值。