Mediapipe 在RK3399PRO上的初探(二)(自定義Calculator)
PS:要轉載請註明出處,本人版權所有。
PS: 這個只是基於《我自己》的理解,
如果和你的原則及想法相衝突,請諒解,勿噴。
前置說明
本文作為本人csdn blog的主站的備份。(BlogID=104)
環境說明
- Ubuntu 18.04
- gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)
- RK3399PRO 板卡
前言
本文有一篇前置文章為《Mediapipe 在RK3399PRO上的初探(一)(編譯、運行CPU和GPU Demo, RK OpenglES 填坑,編譯bazel)》 //blog.csdn.net/u011728480/article/details/115838306 ,本文是在前置文章上的一點點探索與發現。
在前置文章中,我們介紹了一些編譯和使用Mediapipe的注意事項,也初步跑起來了一點點demo,算是對這個框架有了一個初步的認知。但是前置文章也說了,了解這個框架的意義是替換我們小組現有的框架,而且能夠支撐我們的板卡產品。於是我們還需要一個最最最最最最重要的功能就是「Custormer Calculator」,就是自定義計算節點,因為這個框架的核心就是計算節點。下文我們將會講到這個框架的一些基本概念,這些概念都是來至於官方文檔的機翻+我自己的理解。同時,我會給出一個我定義的計算節點的實例,算是給大家一個感性認知。
Mediapipe的一些概念(本小節基本來至於官方文檔的機翻+我自己的理解,不感興趣,請直接看下一小節)
本小節內容,主要參考 //github.com/google/mediapipe/tree/master/docs/framework_concepts 的幾個介紹概念的md文件。
MediaPipe 感覺中文直譯為「媒體管道」,為啥會有這個名字呢?因為它把數據+處理組合成一個計算節點,然後把一個一個節點連接起來,用數據來驅動整個核心邏輯。如果大家對Caffe、ncnn類似的框架源碼有一點點了解的話,就會覺得他們非常像,但是又不像。像是說,都是通過配置文件定義計算節點邏輯圖,然後通過運算,得到我們想得到的邏輯圖中節點的數據。不像的話,就是說的是Mediapipe的調度機制了,極大的增加了節點計算的並行功能,而那些框架是按照圖節點的上下順序進行執行的。
上面這段話可能有點抽象,我想表達的就是 Mediapipe 就是把任何一個「操作」都可以變為一個Calculator,因為我們的每一個項目的邏輯抽象出來都是 Calculator0+Data0->Calculator1+Data1->Calculator2+Data2->… …,然後,Mediapipe 做的是基於這種calculator的調度和執行。這裡我舉個栗子:
- 人臉圖+檢測演算法=人臉檢測結果,這是一個Calculator
- RTSP流+解碼模組=解碼之後的圖片,這是一個Calculator
好了,其他的就不過分的解讀了,下面就使用MediaPipe的helloworld example( //github.com/google/mediapipe/blob/master/mediapipe/examples/desktop/hello_world/hello_world.cc )為例,簡單的說說以下幾個概念。
Packet
Packet,是mediapipe中的數據單元,它可以接收任意類型的數據。也是mediapipe中的數據流動單元。就是在mediapipe中,我們設計的Graph中,所有的邏輯流動都是通過packet流動來實現的。
實例程式碼片段:
//MakePacket<std::string>("Hello World!") 創建一個packet,順帶說一句,我不喜歡這裡的宏,不利於維護
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in", MakePacket<std::string>("Hello World!").At(Timestamp(i))));
//從packet中獲取數據
mediapipe::Packet packet;
packet.Get<std::string>();
Graph
Graph是由各個Calculator組成的,可以直接把Calculator理解為數據結構中圖的節點。而Graph直接把他當做圖就行了。Graph是我們定義的邏輯流程的具體載體,也就是說我們的業務邏輯是什麼樣子的,那麼Graph裡面就會有相應的邏輯流程。可以具備多輸入輸出。
實例程式碼片段:
CalculatorGraph graph;
Caculator(Node)
上面不是介紹了Graph和Packet嘛,這裡的Calculator就是Graph裡面的節點,也是處理Packet的具體單元。可以具備多輸入輸出。
實例程式碼片段:
//比如mediapipe/calculators/core/pass_through_calculator.h裡面的定義,這個Calculator被Helloworld這個例子使用,作用就是把輸入的數據直接傳遞到輸出,不做任何處理,類似NOP
class PassThroughCalculator : public CalculatorBase
Stream
Stream 就是 Caculator 之間的連接起來後,形成的一個數據流動路徑。
Side packets
Side packets 可以直接理解為一些靜態的數據packet,在graph創建之後就不會改變的數據。
… …
自定義實現Calculator
Talk is cheap, show me the code.
/*
* @Description:
* @Author: Sky
* @Date:
* @LastEditors: Sky
* @LastEditTime:
* @Github:
*/
#include <cstdio>
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/parse_text_proto.h"
#include "mediapipe/framework/port/status.h"
//customer calculator
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/canonical_errors.h"
class CustomerDataType{
public:
CustomerDataType(int i, float f, bool b, const std::string & str):
val_i(i),
val_f(f),
val_b(b),
s_str(str)
{}
int val_i = 1;
float val_f = 11.f;
bool val_b = true;
std::string s_str = "customer str.";
};
namespace mediapipe {
class MyStringProcessCalculator : public CalculatorBase {
public:
/*
Calculator authors can specify the expected types of inputs and outputs of a calculator in GetContract().
When a graph is initialized, the framework calls a static method to verify if the packet types of the connected inputs and outputs match the information in this specification.
*/
static absl::Status GetContract(CalculatorContract* cc) {
/*
class InputStreamShard;
typedef internal::Collection<InputStreamShard> InputStreamShardSet;
class OutputStreamShard;
typedef internal::Collection<OutputStreamShard> OutputStreamShardSet;
*/
//cc->Inputs().NumEntries() returns the number of input streams
// if (!cc->Inputs().TagMap()->SameAs(*cc->Outputs().TagMap())) {
// return absl::InvalidArgumentError("Input and output streams's TagMap can't be same.");
// }
//set stream
// for (CollectionItemId id = cc->Inputs().BeginId(); id < cc->Inputs().EndId(); ++id) {
// cc->Inputs().Get(id).SetAny();
// cc->Outputs().Get(id).SetSameAs(&cc->Inputs().Get(id));
// }
cc->Inputs().Index(0).SetAny();
cc->Inputs().Index(1).Set<CustomerDataType>();
cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0));
//set stream package
// for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
// cc->InputSidePackets().Get(id).SetAny();
// }
// cc->InputSidePackets().Index(0).SetAny();
// cc->InputSidePackets().Index(1).Set<CustomerDataType>();//set customer data-type
if (cc->OutputSidePackets().NumEntries() != 0) {
// if (!cc->InputSidePackets().TagMap()->SameAs(*cc->OutputSidePackets().TagMap())) {
// return absl::InvalidArgumentError("Input and output side packets's TagMap can't be same.");
// }
// for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
// cc->OutputSidePackets().Get(id).SetSameAs(&cc->InputSidePackets().Get(id));
// }
cc->OutputSidePackets().Index(0).SetSameAs(&cc->InputSidePackets().Index(0));
}
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
for (CollectionItemId id = cc->Inputs().BeginId();id < cc->Inputs().EndId(); ++id) {
if (!cc->Inputs().Get(id).Header().IsEmpty()) {
cc->Outputs().Get(id).SetHeader(cc->Inputs().Get(id).Header());
}
}
if (cc->OutputSidePackets().NumEntries() != 0) {
for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
cc->OutputSidePackets().Get(id).Set(cc->InputSidePackets().Get(id));
}
}
// Sets this packet timestamp offset for Packets going to all outputs.
// If you only want to set the offset for a single output stream then
// use OutputStream::SetOffset() directly.
cc->SetOffset(TimestampDiff(0));
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
if (cc->Inputs().NumEntries() == 0) {
return tool::StatusStop();
}
//get node input data
mediapipe::Packet _data0 = cc->Inputs().Index(0).Value();
mediapipe::Packet _data1 = cc->Inputs().Index(1).Value();
//not safety.
char _tmp_buf[1024];
::memset(_tmp_buf, 0, 1024);
snprintf(_tmp_buf, 1024, _data0.Get<std::string>().c_str(), _data1.Get<CustomerDataType>().val_i, _data1.Get<CustomerDataType>().val_f, _data1.Get<CustomerDataType>().val_b, _data1.Get<CustomerDataType>().s_str.c_str());
std::string _out_data = _tmp_buf;
cc->Outputs().Index(0).AddPacket(MakePacket<std::string>(_out_data).At(cc->InputTimestamp()));
return absl::OkStatus();
}
absl::Status Close(CalculatorContext* cc) final {
return absl::OkStatus();
}
};
REGISTER_CALCULATOR(MyStringProcessCalculator);
}
namespace mediapipe {
absl::Status RunMyGraph() {
// Configures a simple graph, which concatenates 2 PassThroughCalculators.
CalculatorGraphConfig config = ParseTextProtoOrDie<CalculatorGraphConfig>(R"(
input_stream: "in"
input_stream: "customer_in"
output_stream: "out"
node {
calculator: "PassThroughCalculator"
input_stream: "in"
output_stream: "out1"
}
node {
calculator: "MyStringProcessCalculator"
input_stream: "out1"
input_stream: "customer_in"
output_stream: "out2"
}
node {
calculator: "PassThroughCalculator"
input_stream: "out2"
output_stream: "out"
}
)");
LOG(INFO)<<"parse graph cfg-str done ... ...";
CalculatorGraph graph;
MP_RETURN_IF_ERROR(graph.Initialize(config));
LOG(INFO)<<"init graph done ... ...";
ASSIGN_OR_RETURN(OutputStreamPoller poller,
graph.AddOutputStreamPoller("out"));
LOG(INFO)<<"add out-node to output-streampoller done ... ...";
MP_RETURN_IF_ERROR(graph.StartRun({}));
LOG(INFO)<<"start run graph done ... ...";
// Give 10 input packets that contains the same std::string "Hello World!".
for (int i = 0; i < 10; ++i) {
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"in", MakePacket<std::string>("CustomerCalculator: val_i %d, val_f %f, val_b %d, val_str %s").At(Timestamp(i))));
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"customer_in", MakePacket<CustomerDataType>(i, i + 1.f, i%2==0, "s" + std::to_string(i)).At(Timestamp(i))));
}
// Close the input stream "in".
MP_RETURN_IF_ERROR(graph.CloseInputStream("in"));
MP_RETURN_IF_ERROR(graph.CloseInputStream("customer_in"));
mediapipe::Packet packet;
// Get the output packets std::string.
while (poller.Next(&packet)) {
LOG(INFO) << packet.Get<std::string>();
}
LOG(INFO)<<"RunGraph Done";
return graph.WaitUntilDone();
}
} // namespace mediapipe
int main(int argc, char** argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
FLAGS_minloglevel = 0;
FLAGS_stderrthreshold = 0;
FLAGS_alsologtostderr = 1;
google::InitGoogleLogging(argv[0]);
LOG(INFO) << "glog init success ... ...";
absl::Status run_status = mediapipe::RunMyGraph();
if (!run_status.ok())
LOG(ERROR) << "Failed to run the graph: " << run_status.message();
google::ShutdownGoogleLogging();
return 0;
}
下面簡單介紹這段程式碼。
自定義Calculator:MyStringProcessCalculator
這裡自定義了一個Calculator,主要作用就是傳入snprintf的fmt字元串和fmt字元串所需要的數據。所以可以看到有兩個輸入,一個是string,一個是我自定義的data-type。輸出是一個格式化之後的字元串,所以輸出是string。
自定義Calculator主要還是實現4個介面,分別是GetContract,Open,Process,Close。其中GetContract是Graph初始化的時候,檢查Calculator用的。Open介面是在Graph開始後,對Calculator做一些初始化工作,例如設定一些Calculator初始狀態等。Process是實際的Calculator功能。
namespace mediapipe {
class MyStringProcessCalculator : public CalculatorBase {
public:
/*
Calculator authors can specify the expected types of inputs and outputs of a calculator in GetContract().
When a graph is initialized, the framework calls a static method to verify if the packet types of the connected inputs and outputs match the information in this specification.
*/
static absl::Status GetContract(CalculatorContract* cc) {
/*
class InputStreamShard;
typedef internal::Collection<InputStreamShard> InputStreamShardSet;
class OutputStreamShard;
typedef internal::Collection<OutputStreamShard> OutputStreamShardSet;
*/
//cc->Inputs().NumEntries() returns the number of input streams
// if (!cc->Inputs().TagMap()->SameAs(*cc->Outputs().TagMap())) {
// return absl::InvalidArgumentError("Input and output streams's TagMap can't be same.");
// }
//set stream
// for (CollectionItemId id = cc->Inputs().BeginId(); id < cc->Inputs().EndId(); ++id) {
// cc->Inputs().Get(id).SetAny();
// cc->Outputs().Get(id).SetSameAs(&cc->Inputs().Get(id));
// }
cc->Inputs().Index(0).SetAny();
cc->Inputs().Index(1).Set<CustomerDataType>();
cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0));
//set stream package
// for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
// cc->InputSidePackets().Get(id).SetAny();
// }
// cc->InputSidePackets().Index(0).SetAny();
// cc->InputSidePackets().Index(1).Set<CustomerDataType>();//set customer data-type
if (cc->OutputSidePackets().NumEntries() != 0) {
// if (!cc->InputSidePackets().TagMap()->SameAs(*cc->OutputSidePackets().TagMap())) {
// return absl::InvalidArgumentError("Input and output side packets's TagMap can't be same.");
// }
// for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
// cc->OutputSidePackets().Get(id).SetSameAs(&cc->InputSidePackets().Get(id));
// }
cc->OutputSidePackets().Index(0).SetSameAs(&cc->InputSidePackets().Index(0));
}
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
for (CollectionItemId id = cc->Inputs().BeginId();id < cc->Inputs().EndId(); ++id) {
if (!cc->Inputs().Get(id).Header().IsEmpty()) {
cc->Outputs().Get(id).SetHeader(cc->Inputs().Get(id).Header());
}
}
if (cc->OutputSidePackets().NumEntries() != 0) {
for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {
cc->OutputSidePackets().Get(id).Set(cc->InputSidePackets().Get(id));
}
}
// Sets this packet timestamp offset for Packets going to all outputs.
// If you only want to set the offset for a single output stream then
// use OutputStream::SetOffset() directly.
cc->SetOffset(TimestampDiff(0));
return absl::OkStatus();
}
//這裡是整個Calculator的核心,就是調用snprintf
absl::Status Process(CalculatorContext* cc) final {
if (cc->Inputs().NumEntries() == 0) {
return tool::StatusStop();
}
//get node input data
mediapipe::Packet _data0 = cc->Inputs().Index(0).Value();
mediapipe::Packet _data1 = cc->Inputs().Index(1).Value();
//not safety.
char _tmp_buf[1024];
::memset(_tmp_buf, 0, 1024);
snprintf(_tmp_buf, 1024, _data0.Get<std::string>().c_str(), _data1.Get<CustomerDataType>().val_i, _data1.Get<CustomerDataType>().val_f, _data1.Get<CustomerDataType>().val_b, _data1.Get<CustomerDataType>().s_str.c_str());
std::string _out_data = _tmp_buf;
cc->Outputs().Index(0).AddPacket(MakePacket<std::string>(_out_data).At(cc->InputTimestamp()));
return absl::OkStatus();
}
absl::Status Close(CalculatorContext* cc) final {
return absl::OkStatus();
}
};
REGISTER_CALCULATOR(MyStringProcessCalculator);
}
然後開始編譯運行得到結果
編譯。
# 注意,這裡的--check_visibility=false 為了關閉bazel關於target之間的可見性檢查,因為我的Calculator自定義放在我自己的目錄的,有一個target對這個目錄不可見,編譯會報錯。
bazel build -c dbg --define MEDIAPIPE_DISABLE_GPU=1 --copt -DMESA_EGL_NO_X11_HEADERS --copt -DEGL_NO_X11 my_target --check_visibility=false --verbose_failures --local_cpu_resources=1
然後運行。得到如下圖的結果。

後記
好了,一個超級簡單的自定義calculator已經實現了,相信你已經明白了吧。本系列也就此終結吧,以後隨緣更新。

PS: 請尊重原創,不喜勿噴。
PS: 要轉載請註明出處,本人版權所有。
PS: 有問題請留言,看到後我會第一時間回復。