Alink漫談(二十二) :源碼分析之聚類評估

Alink漫談(二十二) :源碼分析之聚類評估

0x00 摘要

Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平台,是業界首個同時支持批式算法、流式算法的機器學習平台。本文和上文將帶領大家來分析Alink中 聚類評估 的實現。

0x01 背景概念

1.1 什麼是聚類

聚類(Clustering),用通俗的話來說,就是物以類聚,人以群分。

聚類是觀察式學習,而不是示例式的學習。聚類能夠作為一個獨立的工具獲得數據的分佈狀況,觀察每一簇數據的特徵,集中對特定的聚簇集合作進一步地分析。

聚類分析還可以作為其他數據挖掘任務(如分類、關聯規則)的預處理步驟。

1.2 聚類分析的方法

聚類分析可以大致分為如下方法:

劃分方法

  • Construct various partitions and then evaluate them by some criterion,e.g.,minimizing the sum of square errors
  • Typical methods:k-means,k-medoids,CLARANS

層次方法:

  • Create a hierarchical decomposition of the set of data (or objects) using some criterion
  • Typical methods: Diana,Agnes,BIRCH,CAMELEON

基於密度的方法:

  • Based on connectivity and density functions
  • Typical methods: DBSCAN,OPTICS,DenClue

基於網格的方法:

  • Based on multiple-level granularity structure
  • Typical methods: STING,WaveCluster,CLIQUE

基於模型的方法:

  • A model is hypothesized for each of the clusters and tries to find the best fit of that model to each other
  • Typical methods: EM,SOM,COBWEB

基於頻繁模式的方法:

  • Based on the analysis of frequent patterns
  • Typical methods: p-Cluster

基於約束的方法:

  • Clustering by considering user-specified or application-specific constraints
  • Typical methods: COD(obstacles),constrained clustering

基於鏈接的方法:

  • Objects are often linked together in various ways
  • Massive links can be used to cluster objects: SimRank,LinkClus

1.3 聚類評估

聚類評估估計在數據集上進行聚類的可行性和被聚類方法產生的結果的質量。聚類評估主要包括:估計聚類趨勢、確定數據集中的簇數、測定聚類質量。

估計聚類趨勢:對於給定的數據集,評估該數據集是否存在非隨機結構。盲目地在數據集上使用聚類方法將返回一些簇,所挖掘的簇可能是誤導。數據集上的聚類分析是有意義的,僅當數據中存在非隨機結構。

聚類趨勢評估確定給定的數據集是否具有可以導致有意義的聚類的非隨機結構。一個沒有任何非隨機結構的數據集,如數據空間中均勻分佈的點,儘管聚類算法可以為該數據集返回簇,但這些簇是隨機的,沒有任何意義。聚類要求數據的非均勻分佈。

測定聚類質量:在數據集上使用聚類方法之後,需要評估結果簇的質量。

具體有兩類方法:外在方法和內在方法

  • 外在方法:有監督的方法,需要基準數據。用一定的度量評判聚類結果與基準數據的符合程度。
  • 內在方法:無監督的方法,無需基準數據。類內聚集程度和類間離散程度。

0x02 Alink支持的評估指標

Alink文檔中如下:聚類評估是對聚類算法的預測結果進行效果評估,支持下列評估指標。但是實際從其測試代碼中可以發現更多。

Compactness(CP), CP越低意味着類內聚類距離越近

\[\overline{CP_i}=\dfrac{1}{|C_i|}\sum_{x \in C_i}\|x_i-u_i\|
\]

\[\overline{CP}=\dfrac{1}{k}\sum_{i=1}^{k}\overline{CP_k}
\]

Seperation(SP), SP越高意味類間聚類距離越遠

\[SP=\dfrac{2}{k^2-k}\sum_{i=1}^{k}\sum_{j=i+1}^{k}\|u_i-u_j\|
\]

Davies-Bouldin Index(DB), DB越小意味着類內距離越小 同時類間距離越大

\[DB=\dfrac{1}{k}\sum_{i=1}^{k}max(\dfrac{\overline{CP_i}+\overline{CP_j}}{\|u_i-u_j\|}), i \not= j
\]

Calinski-Harabasz Index(VRC), VRC越大意味着聚類質量越好

\[SSB=\sum_{i=1}^{k}n_i\|u_i-u\|^2
\]

\[SSW=\sum_{i=1}^{k}\sum_{x \in C_i}\|x_i-u_i\|
\]

\[VRC=\dfrac{SSB}{SSW}*\dfrac{N-k}{k-1}
\]

從其測試代碼中,我們可以發現更多指標:

Assert.assertEquals(metrics.getCalinskiHarabaz(), 12150.00, 0.01);
Assert.assertEquals(metrics.getCompactness(), 0.115, 0.01);
Assert.assertEquals(metrics.getCount().intValue(), 6);
Assert.assertEquals(metrics.getDaviesBouldin(), 0.014, 0.01);
Assert.assertEquals(metrics.getSeperation(), 15.58, 0.01);
Assert.assertEquals(metrics.getK().intValue(), 2);
Assert.assertEquals(metrics.getSsb(), 364.5, 0.01);
Assert.assertEquals(metrics.getSsw(), 0.119, 0.01);
Assert.assertEquals(metrics.getPurity(), 1.0, 0.01);
Assert.assertEquals(metrics.getNmi(), 1.0, 0.01);
Assert.assertEquals(metrics.getAri(), 1.0, 0.01);
Assert.assertEquals(metrics.getRi(), 1.0, 0.01);
Assert.assertEquals(metrics.getSilhouetteCoefficient(), 0.99,0.01);

我們需要介紹幾個指標

2.1 輪廓係數(silhouette coefficient):

對於D中的每個對象o,計算:

  • a(o) : o與o所屬的簇內其他對象之間的平均距離a(o) 。
  • b(o) : 是o到不包含o的所有簇的最小平均距離。

得到輪廓係數定義為:

\[s(o)=\dfrac{b(o)-a(o)}{max\{a(o),b(o)\}}
\]

輪廓係數的值在-1和1之間。

a(o)的值反映o所屬的簇的緊湊性。該值越小,簇越緊湊。

b(o)的值捕獲o與其他簇的分離程度。b(o)的值越大,o與其他簇越分離。

當o的輪廓係數值接近1時,包含o的簇是緊湊的,並且o遠離其他簇,這是一種可取的情況。

當輪廓係數的值為負時,這意味在期望情況下,o距離其他簇的對象比距離與自己同在簇的對象更近,許多情況下,這很糟糕,應當避免。

2.2 Calinski-Harabaz(CH)

CH指標通過計算類中各點與類中心的距離平方和來度量類內的緊密度,通過計算各類中心點與數據集中心點距離平方和來度量數據集的分離度,CH指標由分離度與緊密度的比值得到。從而,CH越大代表着類自身越緊密,類與類之間越分散,即更優的聚類結果。

CH和輪廓係數適用於實際類別信息未知的情況。

2.3 Davies-Bouldin指數(Dbi)

戴維森堡丁指數(DBI),又稱為分類適確性指標,是由大衛戴維斯和唐納德·Bouldin提出的一種評估聚類算法優劣的指標。

這個DBI就是計算類內距離之和與類外距離之比,來優化k值的選擇,避免K-means算法中由於只計算目標函數Wn而導致局部最優的情況。

2.4 Rand index(蘭德指數)(RI) 、Adjusted Rand index(調整蘭德指數)(ARI)

img

其中C表示實際類別信息,K表示聚類結果,a表示在C與K中都是同類別的元素對數,b表示在C與K中都是不同類別的元素對數。

RI取值範圍為[0,1],值越大意味着聚類結果與真實情況越吻合。RI越大表示聚類效果準確性越高 同時每個類內的純度越高

為了實現「在聚類結果隨機產生的情況下,指標應該接近零」,調整蘭德係數(Adjusted rand index)被提出,它具有更高的區分度:

img

ARI取值範圍為[−1,1],值越大意味着聚類結果與真實情況越吻合。從廣義的角度來講,ARI衡量的是兩個數據分佈的吻合程度。

0x03 示例代碼

聚類評估示例代碼如下:

public class EvalClusterBatchOpExp {
    public static void main(String[] args) throws Exception {
        Row[] rows = new Row[] {
                Row.of(0, "0,0,0"),
                Row.of(0, "0.1,0.1,0.1"),
                Row.of(0, "0.2,0.2,0.2"),
                Row.of(1, "9,9,9"),
                Row.of(1, "9.1,9.1,9.1"),
                Row.of(1, "9.2,9.2,9.2")
        };

        MemSourceBatchOp inOp = new MemSourceBatchOp(Arrays.asList(rows), new String[] {"label", "Y"});

        KMeans train = new KMeans()
                .setVectorCol("Y")
                .setPredictionCol("pred")
                .setK(2);

        ClusterMetrics metrics = new EvalClusterBatchOp()
                .setPredictionCol("pred")
                .setVectorCol("Y")
                .setLabelCol("label")
                .linkFrom(train.fit(inOp).transform(inOp))
                .collectMetrics();

        System.out.println(metrics.getCalinskiHarabaz());
        System.out.println(metrics.getCompactness());
        System.out.println(metrics.getCount());
        System.out.println(metrics.getDaviesBouldin());
        System.out.println(metrics.getSeperation());
        System.out.println(metrics.getK());
        System.out.println(metrics.getSsb());
        System.out.println(metrics.getSsw());
        System.out.println(metrics.getPurity());
        System.out.println(metrics.getNmi());
        System.out.println(metrics.getAri());
        System.out.println(metrics.getRi());
        System.out.println(metrics.getSilhouetteCoefficient());
    }
}

輸出為:

12150.000000000042
0.11547005383792497
6
0.014814814814814791
15.588457268119896
2
364.5
0.1199999999999996
1.0
1.0
1.0
1.0
0.9997530305375205

0x04 總體邏輯

代碼整體邏輯如下:

  • label 相關指標計算操作
    • 使用 calLocalPredResult 對每個分區操作
      • flatMap 1 是打散Row,得到 Label y
      • flatMap 2 是打散Row,得到 y_hat,所以前兩步是得到 y 和 y_hat 的映射 map。這兩個會廣播給 CalLocalPredResult 使用。
      • 調用 CalLocalPredResult 建立混淆矩陣
    • 使用 reduce 歸併這些分區操作結果。
    • 使用 extractParamsFromConfusionMatrix 根據混淆矩陣計算 purity, NMI等指標
  • Vector相關指標計算操作
    • 對數據按照類別進行分組
    • 分組歸併,調用 CalcClusterMetricsSummary分佈式計算向量相關的指標
      • 遍歷 rows,累積到 sumVector
      • 循環,計算出若干統計信息
    • 調用 ReduceBaseMetrics,再歸併,形成一個BaseMetricsSummary
    • 調用 calSilhouetteCoefficient 來計算 SilhouetteCoefficient
    • 把數據存儲為Params
  • 合併輸出
    • 做了一個 union,把 labelMetrics 和 vectorMetrics 聯合起來,再歸併輸出到最後的表中
    • 分組歸併
    • 輸出到最後表

具體代碼如下:

public EvalClusterBatchOp linkFrom(BatchOperator<?>... inputs) {
    BatchOperator in = checkAndGetFirst(inputs);
    String labelColName = this.getLabelCol();
    String predResultColName = this.getPredictionCol();
    String vectorColName = this.getVectorCol();
    DistanceType distanceType = getDistanceType();
    ContinuousDistance distance = distanceType.getFastDistance();

    DataSet<Params> empty = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment().fromElements(
        new Params());
    DataSet<Params> labelMetrics = empty, vectorMetrics;

    if (null != labelColName) { // 針對 label 操作
        // 獲取數據
        DataSet<Row> data = in.select(new String[] {labelColName, predResultColName}).getDataSet();
        // 使用 calLocalPredResult 對每個分區操作
        labelMetrics = calLocalPredResult(data)
            .reduce(new ReduceFunction<LongMatrix>() { // 使用 reduce 歸併這些分區操作結果
                @Override
                public LongMatrix reduce(LongMatrix value1, LongMatrix value2) {
                    value1.plusEqual(value2);
                    return value1;
                }
            })
            .map(new MapFunction<LongMatrix, Params>() { 
                @Override
                public Params map(LongMatrix value) {
                    // 使用  extractParamsFromConfusionMatrix 根據混淆矩陣計算 purity, NMI等指標
                    return ClusterEvaluationUtil.extractParamsFromConfusionMatrix(value);
                }
            });
    }
    if (null != vectorColName) {
        // 獲取數據
        DataSet<Row> data = in.select(new String[] {predResultColName, vectorColName}).getDataSet();
      
        DataSet<BaseMetricsSummary> metricsSummary = data
            .groupBy(0) // 對數據按照類別進行分組
            .reduceGroup(new CalcClusterMetricsSummary(distance)) // 分佈式計算向量相關的指標
            .reduce(new EvaluationUtil.ReduceBaseMetrics());// 歸併
        DataSet<Tuple1<Double>> silhouetteCoefficient = data.map(  // 計算silhouette
            new RichMapFunction<Row, Tuple1<Double>>() {
                @Override
                public Tuple1<Double> map(Row value) {
                    List<BaseMetricsSummary> list = getRuntimeContext().getBroadcastVariable(METRICS_SUMMARY);
                    return ClusterEvaluationUtil.calSilhouetteCoefficient(value,
                        (ClusterMetricsSummary)list.get(0));
                }
            }).withBroadcastSet(metricsSummary, METRICS_SUMMARY)
            .aggregate(Aggregations.SUM, 0);

        // 把數據存儲為Params
        vectorMetrics = metricsSummary.map(new ClusterEvaluationUtil.SaveDataAsParams()).withBroadcastSet( 
            silhouetteCoefficient, SILHOUETTE_COEFFICIENT);
    } else {
        vectorMetrics = in.select(predResultColName)
            .getDataSet()
            .reduceGroup(new BasicClusterParams());
    }

    DataSet<Row> out = labelMetrics
        .union(vectorMetrics) // 把 labelMetrics 和 vectorMetrics 聯合起來
        .reduceGroup(new GroupReduceFunction<Params, Row>() { // 分組歸併
            @Override
            public void reduce(Iterable<Params> values, Collector<Row> out) {
                Params params = new Params();
                for (Params p : values) {
                    params.merge(p);
                }
                out.collect(Row.of(params.toJson()));
            }
        });
    // 輸出到最後表
    this.setOutputTable(DataSetConversionUtil.toTable(getMLEnvironmentId(),
        out, new TableSchema(new String[] {EVAL_RESULT}, new TypeInformation[] {Types.STRING}) 
    ));
    return this;
}

0x05 針對 label 操作

5.1 calLocalPredResult

因為前面有 DataSet<Row> data = in.select(new String[] {labelColName, predResultColName}).getDataSet();,所以這裡處理的就是 y 和 y_hat。

有兩個 flatMap 串起來。

  • flatMap 1 是打散Row,得到 Label y
  • flatMap 2 是打散Row,得到 y_hat

兩個 flatMap 都接了 DistinctLabelIndexMap 和 project(0),DistinctLabelIndexMap 作用是 Give each label an ID, return a map of label and ID.,就是給每一個 ID 一個 label。project(0)就是提取出 label。

所以前兩步是得到 y 和 y_hat 的映射 map。這兩個會廣播給 CalLocalPredResult 使用。

第三步是調用 CalLocalPredResult 建立混淆矩陣。

具體代碼如下:

private static DataSet<LongMatrix> calLocalPredResult(DataSet<Row> data) {

    // 打散Row,得到 Label y
    DataSet<Tuple1<Map<String, Integer>>> labels = data.flatMap(new FlatMapFunction<Row, String>() {
        @Override
        public void flatMap(Row row, Collector<String> collector) {
            if (EvaluationUtil.checkRowFieldNotNull(row)) {
                collector.collect(row.getField(0).toString());
            }
        }
    }).reduceGroup(new EvaluationUtil.DistinctLabelIndexMap(false, null)).project(0);
    // 打散Row,得到 y_hat
    DataSet<Tuple1<Map<String, Integer>>> predictions = data.flatMap(new FlatMapFunction<Row, String>() {
        @Override
        public void flatMap(Row row, Collector<String> collector) {
            if (EvaluationUtil.checkRowFieldNotNull(row)) {
                collector.collect(row.getField(1).toString());
            }
        }
    }).reduceGroup(new EvaluationUtil.DistinctLabelIndexMap(false, null)).project(0);

    // 前兩步是得到 y 和 y_hat 的映射 map。這兩個會廣播給 CalLocalPredResult 使用
    // Build the confusion matrix.
    DataSet<LongMatrix> statistics = data
        .rebalance()
        .mapPartition(new CalLocalPredResult())
        .withBroadcastSet(labels, LABELS)
        .withBroadcastSet(predictions, PREDICTIONS);

    return statistics;
}

CalLocalPredResult 建立混淆矩陣。

  • open函數中,會從系統中獲取 y 和 y_hat。
  • mapPartition函數中,建立混淆矩陣。
matrix = {long[2][]@10707} 
 0 = {long[2]@10709} 
  0 = 0
  1 = 0
 1 = {long[2]@10710} 
  0 = 1
  1 = 0

代碼是:

static class CalLocalPredResult extends RichMapPartitionFunction<Row, LongMatrix> {
    private Map<String, Integer> labels, predictions;

    @Override
    public void open(Configuration parameters) throws Exception {
        List<Tuple1<Map<String, Integer>>> list = getRuntimeContext().getBroadcastVariable(LABELS);
        this.labels = list.get(0).f0;
        list = getRuntimeContext().getBroadcastVariable(PREDICTIONS);
        this.predictions = list.get(0).f0;
    }

    @Override
    public void mapPartition(Iterable<Row> rows, Collector<LongMatrix> collector) {
        long[][] matrix = new long[predictions.size()][labels.size()];
        for (Row r : rows) {
            if (EvaluationUtil.checkRowFieldNotNull(r)) {
                int label = labels.get(r.getField(0).toString());
                int pred = predictions.get(r.getField(1).toString());
                matrix[pred][label] += 1;
            }
        }
        collector.collect(new LongMatrix(matrix));
    }
}

5.2 extractParamsFromConfusionMatrix

extractParamsFromConfusionMatrix 這裡就是根據混淆矩陣計算 purity, NMI 等一系列指標。

public static Params extractParamsFromConfusionMatrix(LongMatrix longMatrix) {
    long[][] matrix = longMatrix.getMatrix();
    long[] actualLabel = longMatrix.getColSums();
    long[] predictLabel = longMatrix.getRowSums();
    long total = longMatrix.getTotal();

    double entropyActual = 0.0;
    double entropyPredict = 0.0;
    double mutualInfor = 0.0;
    double purity = 0.0;
    long tp = 0L;
    long tpFpSum = 0L;
    long tpFnSum = 0L;
    for (long anActualLabel : actualLabel) {
        entropyActual += entropy(anActualLabel, total);
        tpFpSum += combination(anActualLabel);
    }
    entropyActual /= -Math.log(2);
    for (long aPredictLabel : predictLabel) {
        entropyPredict += entropy(aPredictLabel, total);
        tpFnSum += combination(aPredictLabel);
    }
    entropyPredict /= -Math.log(2);
    for (int i = 0; i < matrix.length; i++) {
        long max = 0;
        for (int j = 0; j < matrix[0].length; j++) {
            max = Math.max(max, matrix[i][j]);
            mutualInfor += (0 == matrix[i][j] ? 0.0 :
                1.0 * matrix[i][j] / total * Math.log(1.0 * total * matrix[i][j] / predictLabel[i] / actualLabel[j]));
            tp += combination(matrix[i][j]);
        }
        purity += max;
    }
    purity /= total;
    mutualInfor /= Math.log(2);
    long fp = tpFpSum - tp;
    long fn = tpFnSum - tp;
    long totalCombination = combination(total);
    long tn = totalCombination - tp - fn - fp;
    double expectedIndex = 1.0 * tpFpSum * tpFnSum / totalCombination;
    double maxIndex = 1.0 * (tpFpSum + tpFnSum) / 2;
    double ri = 1.0 * (tp + tn) / (tp + tn + fp + fn);
    return new Params()
        .set(ClusterMetrics.NMI, 2.0 * mutualInfor / (entropyActual + entropyPredict))
        .set(ClusterMetrics.PURITY, purity)
        .set(ClusterMetrics.RI, ri)
        .set(ClusterMetrics.ARI, (tp - expectedIndex) / (maxIndex - expectedIndex));
}

0x06 Vector相關

前兩步是分佈式計算 以及 歸併:

DataSet<BaseMetricsSummary> metricsSummary = data
    .groupBy(0)
    .reduceGroup(new CalcClusterMetricsSummary(distance))
    .reduce(new EvaluationUtil.ReduceBaseMetrics());

6.1 CalcClusterMetricsSummary

調用了 ClusterEvaluationUtil.getClusterStatistics 來進行計算。

public static class CalcClusterMetricsSummary implements GroupReduceFunction<Row, BaseMetricsSummary> {
    private ContinuousDistance distance;

    public CalcClusterMetricsSummary(ContinuousDistance distance) {
        this.distance = distance;
    }

    @Override
    public void reduce(Iterable<Row> rows, Collector<BaseMetricsSummary> collector) {
        collector.collect(ClusterEvaluationUtil.getClusterStatistics(rows, distance));
    }
}

ClusterEvaluationUtil.getClusterStatistics如下

public static ClusterMetricsSummary getClusterStatistics(Iterable<Row> rows, ContinuousDistance distance) {
    List<Vector> list = new ArrayList<>();
    int total = 0;
    String clusterId;
    DenseVector sumVector;

    Iterator<Row> iterator = rows.iterator();
    Row row = null;
    while (iterator.hasNext() && !EvaluationUtil.checkRowFieldNotNull(row)) {
        // 取出第一個不為空的item
        row = iterator.next();
    }
    if (EvaluationUtil.checkRowFieldNotNull(row)) {
        clusterId = row.getField(0).toString(); // 取出 clusterId
        Vector vec = VectorUtil.getVector(row.getField(1)); // 取出 Vector
        sumVector = DenseVector.zeros(vec.size()); // 初始化
    } else {
        return null;
    }

    while (null != row) { // 遍歷 rows,累積到 sumVector
        if (EvaluationUtil.checkRowFieldNotNull(row)) {
            Vector vec = VectorUtil.getVector(row.getField(1));
            list.add(vec);
            if (distance instanceof EuclideanDistance) {
                sumVector.plusEqual(vec);
            } else {
                vec.scaleEqual(1.0 / vec.normL2());
                sumVector.plusEqual(vec);
            }
            total++;
        }
        row = iterator.hasNext() ? iterator.next() : null;
    }

    DenseVector meanVector = sumVector.scale(1.0 / total); // 取mean

// runtime變量,這裡示例是第二組的向量  
list = {ArrayList@10654}  size = 3
 0 = {DenseVector@10661} "9.0 9.0 9.0"
 1 = {DenseVector@10662} "9.1 9.1 9.1"
 2 = {DenseVector@10663} "9.2 9.2 9.2"  
  
    double distanceSum = 0.0;
    double distanceSquareSum = 0.0;
    double vectorNormL2Sum = 0.0;
    for (Vector vec : list) { // 循環,計算出幾個統計信息
        double d = distance.calc(meanVector, vec);
        distanceSum += d;
        distanceSquareSum += d * d;
        vectorNormL2Sum += vec.normL2Square();
    }
  
// runtime變量
sumVector = {DenseVector@10656} "27.3 27.3 27.3"
meanVector = {DenseVector@10657} "9.1 9.1 9.1"
distanceSum = 0.34641016151377424
distanceSquareSum = 0.059999999999999575
vectorNormL2Sum = 745.3499999999999  
  
    return new ClusterMetricsSummary(clusterId, total, distanceSum / total, distanceSquareSum, vectorNormL2Sum,
        meanVector, distance);
}

6.2 ReduceBaseMetrics

這裡是進行歸併,形成一個BaseMetricsSummary。

/**
 * Merge the BaseMetrics calculated locally.
 */
public static class ReduceBaseMetrics implements ReduceFunction<BaseMetricsSummary> {
    @Override
    public BaseMetricsSummary reduce(BaseMetricsSummary t1, BaseMetricsSummary t2) throws Exception {
        return null == t1 ? t2 : t1.merge(t2);
    }
}

6.3 calSilhouetteCoefficient

第三步是調用 calSilhouetteCoefficient 來計算 SilhouetteCoefficient。

vectorMetrics = metricsSummary.map(new ClusterEvaluationUtil.SaveDataAsParams()).withBroadcastSet(
        silhouetteCoefficient, SILHOUETTE_COEFFICIENT);

這裡就是和公式一樣的處理

public static Tuple1<Double> calSilhouetteCoefficient(Row row, ClusterMetricsSummary clusterMetricsSummary) {
    if (!EvaluationUtil.checkRowFieldNotNull(row)) {
        return Tuple1.of(0.);
    }
    String clusterId = row.getField(0).toString();
    Vector vec = VectorUtil.getVector(row.getField(1));
    double currentClusterDissimilarity = 0.0;
    double neighboringClusterDissimilarity = Double.MAX_VALUE;
    if (clusterMetricsSummary.distance instanceof EuclideanDistance) {
        double normSquare = vec.normL2Square();
        for (int i = 0; i < clusterMetricsSummary.k; i++) {
            double dissimilarity = clusterMetricsSummary.clusterCnt.get(i) * normSquare
                - 2 * clusterMetricsSummary.clusterCnt.get(i) * MatVecOp.dot(vec, clusterMetricsSummary.meanVector.get(i)) + clusterMetricsSummary.vectorNormL2Sum.get(i);
            if (clusterId.equals(clusterMetricsSummary.clusterId.get(i))) {
                if (clusterMetricsSummary.clusterCnt.get(i) > 1) {
                    currentClusterDissimilarity = dissimilarity / (clusterMetricsSummary.clusterCnt.get(i) - 1);
                }
            } else {
                neighboringClusterDissimilarity = Math.min(neighboringClusterDissimilarity,
                    dissimilarity / clusterMetricsSummary.clusterCnt.get(i));
            }
        }
    } else {
        for (int i = 0; i < clusterMetricsSummary.k; i++) {
            double dissimilarity = 1.0 - MatVecOp.dot(vec, clusterMetricsSummary.meanVector.get(i));
            if (clusterId.equals(clusterMetricsSummary.clusterId.get(i))) {
                if (clusterMetricsSummary.clusterCnt.get(i) > 1) {
                    currentClusterDissimilarity = dissimilarity * clusterMetricsSummary.clusterCnt.get(i) / (clusterMetricsSummary.clusterCnt.get(i) - 1);
                }
            } else {
                neighboringClusterDissimilarity = Math.min(neighboringClusterDissimilarity,
                    dissimilarity);
            }
        }
    }
    return Tuple1.of(currentClusterDissimilarity < neighboringClusterDissimilarity ?
        1 - (currentClusterDissimilarity / neighboringClusterDissimilarity) :
        (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1);
}

6.4 SaveDataAsParams

第四步是把數據存儲為Params

public static class SaveDataAsParams extends RichMapFunction<BaseMetricsSummary, Params> {
    @Override
    public Params map(BaseMetricsSummary t) throws Exception {
        Params params = t.toMetrics().getParams();
        List<Tuple1<Double>> silhouetteCoefficient = getRuntimeContext().getBroadcastVariable(
            EvalClusterBatchOp.SILHOUETTE_COEFFICIENT);
        params.set(ClusterMetrics.SILHOUETTE_COEFFICIENT,
            silhouetteCoefficient.get(0).f0 / params.get(ClusterMetrics.COUNT));
        return params;
    }
}

0x06 合併輸出

這一步做了一個 union,把 labelMetrics 和 vectorMetrics 聯合起來,再歸併輸出到最後的表中。

DataSet<Row> out = labelMetrics
    .union(vectorMetrics)
    .reduceGroup(new GroupReduceFunction<Params, Row>() {
        @Override
        public void reduce(Iterable<Params> values, Collector<Row> out) {
            Params params = new Params();
            for (Params p : values) {
                params.merge(p);
            }
            out.collect(Row.of(params.toJson()));
        }
    });

this.setOutputTable(DataSetConversionUtil.toTable(getMLEnvironmentId(),
    out, new TableSchema(new String[] {EVAL_RESULT}, new TypeInformation[] {Types.STRING})
));

0xFF 參考

聚類算法及其評估指標

[ML] 聚類評價指標

聚類結果的評價指標

聚類評價指標

如何評價聚類結果的好壞?

聚類評估算法-輪廓係數(Silhouette Coefficient )

聚類效果好壞的評價指標

ARI聚類效果評價指標

聚類算法評價指標——Davies-Bouldin指數(Dbi)

【每周一博】淺說Davies-Bouldin指數(DBI)

聚類算法評價指標

聚類模型性能評價指標