Mediapipe 在RK3399PRO上的初探(二)(自定義Calculator)

PS:要轉載請註明出處,本人版權所有。

PS: 這個只是基於《我自己》的理解,

如果和你的原則及想法相衝突,請諒解,勿噴。

前置說明

  本文作為本人csdn blog的主站的備份。(BlogID=104)

環境說明
  • Ubuntu 18.04
  • gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)
  • RK3399PRO 板卡

前言


  本文有一篇前置文章為《Mediapipe 在RK3399PRO上的初探(一)(編譯、運行CPU和GPU Demo, RK OpenglES 填坑,編譯bazel)》 //blog.csdn.net/u011728480/article/details/115838306 ,本文是在前置文章上的一點點探索與發現。

  在前置文章中,我們介紹了一些編譯和使用Mediapipe的注意事項,也初步跑起來了一點點demo,算是對這個框架有了一個初步的認知。但是前置文章也說了,了解這個框架的意義是替換我們小組現有的框架,而且能夠支撐我們的板卡產品。於是我們還需要一個最最最最最最重要的功能就是「Custormer Calculator」,就是自定義計算節點,因為這個框架的核心就是計算節點。下文我們將會講到這個框架的一些基本概念,這些概念都是來至於官方文檔的機翻+我自己的理解。同時,我會給出一個我定義的計算節點的實例,算是給大家一個感性認知。

Mediapipe的一些概念(本小節基本來至於官方文檔的機翻+我自己的理解,不感興趣,請直接看下一小節)


  本小節內容,主要參考 //github.com/google/mediapipe/tree/master/docs/framework_concepts 的幾個介紹概念的md文件。
  MediaPipe 感覺中文直譯為「媒體管道」,為啥會有這個名字呢?因為它把數據+處理組合成一個計算節點,然後把一個一個節點連接起來,用數據來驅動整個核心邏輯。如果大家對Caffe、ncnn類似的框架源碼有一點點了解的話,就會覺得他們非常像,但是又不像。像是說,都是通過配置文件定義計算節點邏輯圖,然後通過運算,得到我們想得到的邏輯圖中節點的數據。不像的話,就是說的是Mediapipe的調度機制了,極大的增加了節點計算的並行功能,而那些框架是按照圖節點的上下順序進行執行的。
  上面這段話可能有點抽象,我想表達的就是 Mediapipe 就是把任何一個「操作」都可以變為一個Calculator,因為我們的每一個項目的邏輯抽象出來都是 Calculator0+Data0->Calculator1+Data1->Calculator2+Data2->… …,然後,Mediapipe 做的是基於這種calculator的調度和執行。這裡我舉個栗子:

  • 人臉圖+檢測演算法=人臉檢測結果,這是一個Calculator
  • RTSP流+解碼模組=解碼之後的圖片,這是一個Calculator

好了,其他的就不過分的解讀了,下面就使用MediaPipe的helloworld example( //github.com/google/mediapipe/blob/master/mediapipe/examples/desktop/hello_world/hello_world.cc )為例,簡單的說說以下幾個概念。

Packet

  Packet,是mediapipe中的數據單元,它可以接收任意類型的數據。也是mediapipe中的數據流動單元。就是在mediapipe中,我們設計的Graph中,所有的邏輯流動都是通過packet流動來實現的。

  實例程式碼片段:

//MakePacket<std::string>("Hello World!") 創建一個packet,順帶說一句,我不喜歡這裡的宏,不利於維護
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in", MakePacket<std::string>("Hello World!").At(Timestamp(i))));

//從packet中獲取數據
mediapipe::Packet packet;
packet.Get<std::string>();
Graph

  Graph是由各個Calculator組成的,可以直接把Calculator理解為數據結構中圖的節點。而Graph直接把他當做圖就行了。Graph是我們定義的邏輯流程的具體載體,也就是說我們的業務邏輯是什麼樣子的,那麼Graph裡面就會有相應的邏輯流程。可以具備多輸入輸出。

  實例程式碼片段:

CalculatorGraph graph;
Caculator(Node)

  上面不是介紹了Graph和Packet嘛,這裡的Calculator就是Graph裡面的節點,也是處理Packet的具體單元。可以具備多輸入輸出。

  實例程式碼片段:

//比如mediapipe/calculators/core/pass_through_calculator.h裡面的定義,這個Calculator被Helloworld這個例子使用,作用就是把輸入的數據直接傳遞到輸出,不做任何處理,類似NOP
class PassThroughCalculator : public CalculatorBase 
Stream

  Stream 就是 Caculator 之間的連接起來後,形成的一個數據流動路徑。

Side packets

  Side packets 可以直接理解為一些靜態的數據packet,在graph創建之後就不會改變的數據。

… …

自定義實現Calculator


  Talk is cheap, show me the code.

/*
 * @Description: 
 * @Author: Sky
 * @Date: 
 * @LastEditors: Sky
 * @LastEditTime: 
 * @Github: 
 */
#include <cstdio>
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/parse_text_proto.h"
#include "mediapipe/framework/port/status.h"

//customer calculator
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/canonical_errors.h"


class CustomerDataType{

  public:
  CustomerDataType(int i, float f, bool b, const std::string & str):
  val_i(i),
  val_f(f),
  val_b(b),
  s_str(str)
  {}
  int val_i = 1;
  float val_f = 11.f;
  bool val_b = true;
  std::string s_str = "customer str.";
};



namespace mediapipe {

  class MyStringProcessCalculator : public CalculatorBase {
  public:
    /*
    Calculator authors can specify the expected types of inputs and outputs of a calculator in GetContract(). 
    When a graph is initialized, the framework calls a static method to verify if the packet types of the connected inputs and outputs match the information in this specification.
    */
    static absl::Status GetContract(CalculatorContract* cc) {
      
      /*
      class InputStreamShard;
      typedef internal::Collection<InputStreamShard> InputStreamShardSet;
      class OutputStreamShard;
      typedef internal::Collection<OutputStreamShard> OutputStreamShardSet;
      */
      //cc->Inputs().NumEntries() returns the number of input streams
      // if (!cc->Inputs().TagMap()->SameAs(*cc->Outputs().TagMap())) {

      //   return absl::InvalidArgumentError("Input and output streams's TagMap can't be same.");
      // }

      //set stream
      // for (CollectionItemId id = cc->Inputs().BeginId(); id < cc->Inputs().EndId(); ++id) {

      //   cc->Inputs().Get(id).SetAny();
      //   cc->Outputs().Get(id).SetSameAs(&cc->Inputs().Get(id));
      // }
      cc->Inputs().Index(0).SetAny();
      cc->Inputs().Index(1).Set<CustomerDataType>();

      cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0));


      //set stream package
      // for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
        
      //   cc->InputSidePackets().Get(id).SetAny();
      // }
      // cc->InputSidePackets().Index(0).SetAny();
      // cc->InputSidePackets().Index(1).Set<CustomerDataType>();//set customer data-type


      if (cc->OutputSidePackets().NumEntries() != 0) {

        // if (!cc->InputSidePackets().TagMap()->SameAs(*cc->OutputSidePackets().TagMap())) {

        //   return absl::InvalidArgumentError("Input and output side packets's TagMap can't be same.");
        // }
        
        // for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {

        //   cc->OutputSidePackets().Get(id).SetSameAs(&cc->InputSidePackets().Get(id));
        // }

        cc->OutputSidePackets().Index(0).SetSameAs(&cc->InputSidePackets().Index(0));
      }      

      return absl::OkStatus();
    }

    absl::Status Open(CalculatorContext* cc) final {

      for (CollectionItemId id = cc->Inputs().BeginId();id < cc->Inputs().EndId(); ++id) {

        if (!cc->Inputs().Get(id).Header().IsEmpty()) {

          cc->Outputs().Get(id).SetHeader(cc->Inputs().Get(id).Header());
        }
      }

      if (cc->OutputSidePackets().NumEntries() != 0) {

        for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
          
          cc->OutputSidePackets().Get(id).Set(cc->InputSidePackets().Get(id));
        }
      }

      // Sets this packet timestamp offset for Packets going to all outputs.
      // If you only want to set the offset for a single output stream then
      // use OutputStream::SetOffset() directly.
      cc->SetOffset(TimestampDiff(0));

      return absl::OkStatus();
    }

    absl::Status Process(CalculatorContext* cc) final {

      if (cc->Inputs().NumEntries() == 0) {
        return tool::StatusStop();
      }

      //get node input data
      mediapipe::Packet  _data0 = cc->Inputs().Index(0).Value();
      mediapipe::Packet  _data1 = cc->Inputs().Index(1).Value();

      //not safety.
      char _tmp_buf[1024];
      
      ::memset(_tmp_buf, 0, 1024);

      snprintf(_tmp_buf, 1024, _data0.Get<std::string>().c_str(), _data1.Get<CustomerDataType>().val_i, _data1.Get<CustomerDataType>().val_f, _data1.Get<CustomerDataType>().val_b, _data1.Get<CustomerDataType>().s_str.c_str());

      std::string _out_data = _tmp_buf;
      cc->Outputs().Index(0).AddPacket(MakePacket<std::string>(_out_data).At(cc->InputTimestamp()));

      return absl::OkStatus();
    }

    absl::Status Close(CalculatorContext* cc) final { 

      return absl::OkStatus(); 
    }
  };

  REGISTER_CALCULATOR(MyStringProcessCalculator);
}




namespace mediapipe {

  absl::Status  RunMyGraph() {

    // Configures a simple graph, which concatenates 2 PassThroughCalculators.
    CalculatorGraphConfig config = ParseTextProtoOrDie<CalculatorGraphConfig>(R"(
      input_stream: "in"
      input_stream: "customer_in"
      output_stream: "out"
      node {
        calculator: "PassThroughCalculator"
        input_stream: "in"
        output_stream: "out1"
      }
      node {
        calculator: "MyStringProcessCalculator"
        input_stream: "out1"
        input_stream: "customer_in"
        output_stream: "out2"        
      }      
      node {
        calculator: "PassThroughCalculator"
        input_stream: "out2"
        output_stream: "out"
      }
    )");
    LOG(INFO)<<"parse graph cfg-str done ... ...";

    CalculatorGraph graph;
    MP_RETURN_IF_ERROR(graph.Initialize(config));
    LOG(INFO)<<"init graph done ... ...";

    ASSIGN_OR_RETURN(OutputStreamPoller poller,
                    graph.AddOutputStreamPoller("out"));
    LOG(INFO)<<"add out-node to output-streampoller done ... ...";


    MP_RETURN_IF_ERROR(graph.StartRun({}));
    LOG(INFO)<<"start run graph done ... ...";
    

    // Give 10 input packets that contains the same std::string "Hello World!".
    for (int i = 0; i < 10; ++i) {

      MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
          "in", MakePacket<std::string>("CustomerCalculator: val_i %d, val_f %f, val_b %d, val_str %s").At(Timestamp(i))));

      MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
          "customer_in", MakePacket<CustomerDataType>(i, i + 1.f, i%2==0, "s" + std::to_string(i)).At(Timestamp(i))));
    }
    // Close the input stream "in".
    MP_RETURN_IF_ERROR(graph.CloseInputStream("in"));
    MP_RETURN_IF_ERROR(graph.CloseInputStream("customer_in"));

    mediapipe::Packet packet;
    // Get the output packets std::string.
    while (poller.Next(&packet)) {
      LOG(INFO) << packet.Get<std::string>();
    }

    LOG(INFO)<<"RunGraph Done";
    return graph.WaitUntilDone();
  }

}  // namespace mediapipe



int main(int argc, char** argv) {
  
  gflags::ParseCommandLineFlags(&argc, &argv, true);

  FLAGS_minloglevel = 0;
  FLAGS_stderrthreshold = 0;
  FLAGS_alsologtostderr = 1;

  google::InitGoogleLogging(argv[0]);

  LOG(INFO) << "glog init success ... ...";

  absl::Status run_status = mediapipe::RunMyGraph();
  if (!run_status.ok())
    LOG(ERROR) << "Failed to run the graph: " << run_status.message();

  google::ShutdownGoogleLogging();
  return 0;
}

下面簡單介紹這段程式碼。

自定義Calculator:MyStringProcessCalculator

   這裡自定義了一個Calculator,主要作用就是傳入snprintf的fmt字元串和fmt字元串所需要的數據。所以可以看到有兩個輸入,一個是string,一個是我自定義的data-type。輸出是一個格式化之後的字元串,所以輸出是string。

   自定義Calculator主要還是實現4個介面,分別是GetContract,Open,Process,Close。其中GetContract是Graph初始化的時候,檢查Calculator用的。Open介面是在Graph開始後,對Calculator做一些初始化工作,例如設定一些Calculator初始狀態等。Process是實際的Calculator功能。

namespace mediapipe {

  class MyStringProcessCalculator : public CalculatorBase {
  public:
    /*
    Calculator authors can specify the expected types of inputs and outputs of a calculator in GetContract(). 
    When a graph is initialized, the framework calls a static method to verify if the packet types of the connected inputs and outputs match the information in this specification.
    */
    static absl::Status GetContract(CalculatorContract* cc) {
      
      /*
      class InputStreamShard;
      typedef internal::Collection<InputStreamShard> InputStreamShardSet;
      class OutputStreamShard;
      typedef internal::Collection<OutputStreamShard> OutputStreamShardSet;
      */
      //cc->Inputs().NumEntries() returns the number of input streams
      // if (!cc->Inputs().TagMap()->SameAs(*cc->Outputs().TagMap())) {

      //   return absl::InvalidArgumentError("Input and output streams's TagMap can't be same.");
      // }

      //set stream
      // for (CollectionItemId id = cc->Inputs().BeginId(); id < cc->Inputs().EndId(); ++id) {

      //   cc->Inputs().Get(id).SetAny();
      //   cc->Outputs().Get(id).SetSameAs(&cc->Inputs().Get(id));
      // }
      cc->Inputs().Index(0).SetAny();
      cc->Inputs().Index(1).Set<CustomerDataType>();

      cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0));


      //set stream package
      // for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
        
      //   cc->InputSidePackets().Get(id).SetAny();
      // }
      // cc->InputSidePackets().Index(0).SetAny();
      // cc->InputSidePackets().Index(1).Set<CustomerDataType>();//set customer data-type


      if (cc->OutputSidePackets().NumEntries() != 0) {

        // if (!cc->InputSidePackets().TagMap()->SameAs(*cc->OutputSidePackets().TagMap())) {

        //   return absl::InvalidArgumentError("Input and output side packets's TagMap can't be same.");
        // }
        
        // for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {

        //   cc->OutputSidePackets().Get(id).SetSameAs(&cc->InputSidePackets().Get(id));
        // }

        cc->OutputSidePackets().Index(0).SetSameAs(&cc->InputSidePackets().Index(0));
      }      

      return absl::OkStatus();
    }

    absl::Status Open(CalculatorContext* cc) final {

      for (CollectionItemId id = cc->Inputs().BeginId();id < cc->Inputs().EndId(); ++id) {

        if (!cc->Inputs().Get(id).Header().IsEmpty()) {

          cc->Outputs().Get(id).SetHeader(cc->Inputs().Get(id).Header());
        }
      }

      if (cc->OutputSidePackets().NumEntries() != 0) {

        for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
          
          cc->OutputSidePackets().Get(id).Set(cc->InputSidePackets().Get(id));
        }
      }

      // Sets this packet timestamp offset for Packets going to all outputs.
      // If you only want to set the offset for a single output stream then
      // use OutputStream::SetOffset() directly.
      cc->SetOffset(TimestampDiff(0));

      return absl::OkStatus();
    }
    //這裡是整個Calculator的核心,就是調用snprintf
    absl::Status Process(CalculatorContext* cc) final {

      if (cc->Inputs().NumEntries() == 0) {
        return tool::StatusStop();
      }

      //get node input data
      mediapipe::Packet  _data0 = cc->Inputs().Index(0).Value();
      mediapipe::Packet  _data1 = cc->Inputs().Index(1).Value();

      //not safety.
      char _tmp_buf[1024];
      
      ::memset(_tmp_buf, 0, 1024);

      snprintf(_tmp_buf, 1024, _data0.Get<std::string>().c_str(), _data1.Get<CustomerDataType>().val_i, _data1.Get<CustomerDataType>().val_f, _data1.Get<CustomerDataType>().val_b, _data1.Get<CustomerDataType>().s_str.c_str());

      std::string _out_data = _tmp_buf;
      cc->Outputs().Index(0).AddPacket(MakePacket<std::string>(_out_data).At(cc->InputTimestamp()));

      return absl::OkStatus();
    }

    absl::Status Close(CalculatorContext* cc) final { 

      return absl::OkStatus(); 
    }
  };

  REGISTER_CALCULATOR(MyStringProcessCalculator);
}
然後開始編譯運行得到結果

   編譯。


# 注意,這裡的--check_visibility=false 為了關閉bazel關於target之間的可見性檢查,因為我的Calculator自定義放在我自己的目錄的,有一個target對這個目錄不可見,編譯會報錯。

bazel build -c dbg --define MEDIAPIPE_DISABLE_GPU=1 --copt -DMESA_EGL_NO_X11_HEADERS --copt -DEGL_NO_X11 my_target --check_visibility=false --verbose_failures  --local_cpu_resources=1

   然後運行。得到如下圖的結果。

result

後記


  好了,一個超級簡單的自定義calculator已經實現了,相信你已經明白了吧。本系列也就此終結吧,以後隨緣更新。


打賞、訂閱、收藏、丟香蕉、硬幣,請關注公眾號(攻城獅的搬磚之路)
qrc_img

PS: 請尊重原創,不喜勿噴。

PS: 要轉載請註明出處,本人版權所有。

PS: 有問題請留言,看到後我會第一時間回復。