微服務通訊方式——gRPC

 

微服務設計的原則是單一職責、輕量級通訊、服務粒度適當,而說到服務通訊,我們熟知的有MQ通訊,還有REST、Dubbo和Thrift等,這次我來說說gRPC,

Google開發的一種數據交換格式,說不定哪天就需要上了呢,多學習總是件好事。

準備:

Idea2019.03/Gradle6.0.1/Maven3.6.3/JDK11.0.4/Proto3.0/gRPC1.29.0

難度: 新手–戰士–老兵–大師

目標:

  1. 實現四種模式下的gRPC通訊

說明:

為了遇見各種問題,同時保持時效性,我盡量使用最新的軟體版本。源碼地址,其中的day28://github.com/xiexiaobiao/dubbo-project

1 介紹

先引用一小段官方介紹:

Protocol Buffers – Google’s data interchange format. Protocol Buffers (a.k.a., protobuf) are Google’s language-neutral, platform-neutral, extensible

mechanism for serializing structured data.

Protocol Buffers,即protobuf,類比如JSON,XML等數據格式,實現了對結構化數據序列化時跨語言,跨平台,可擴展的機制。通過預定義.proto文件,

來協商通訊雙方的數據交換格式、介面方法,這即是「Protocol」的原譯。.proto文件能編譯為多種語言對應的程式碼,從而實現跨平台。

grpc使用http2.0作為通訊方式,先說http2.0,它是對1.0的增強,而不是替代,特點:

  • 二進位傳輸:相比1.0版的文本,2.0使用二進位幀傳輸數據;
  • 多路復用:一個TCP連接中包含多個組成的,再解析為不同的請求,從而可同時發送多個請求,避免1.0版的隊頭阻塞;
  • 首部壓縮:1.0版的Header部分資訊很多,2.0使用HPACK壓縮格式減少數據量;
  • 服務端推送:服務端預先主動推送客戶端必要的資訊,從而減少延遲;

適用場景:訂製化介面及數據交換格式,追求高性能,通訊對寬頻敏感

缺點:大多數HTTP Server尚不支援http2.0,Nginx目前只能將其降級為1.0處理;沒有連接池、服務發現和負載均衡的實現;

2 實戰步驟

A 普通模式

2.1 建立gradle類型項目,命名為GPRC-project,我上傳的Git程式碼中還包含了maven類型的項目,按照官方說明製作,運行方法略異,見其中.txt文件說明。

2.2 編寫.proto文件,GPRC-project\src\main\proto\helloworld.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.biao.grpc.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
} 

以上程式碼解析,幾個java相關參數:

  1. java_multiple_files 是否單獨生成在.proto文件中定義的頂級message、enum、和service,否則只生成一個包裝了內部類的外部類; java_package 編譯生成的java類文件的包位置;java_outer_classname 外部類名稱;java_generic_services是否生成各語言版本的基類(已過時);

  2. service Greeter {...} 定義服務和包含的方法;

  3. message 定義消息體結構,這裡定義了一個 String類型,且只有一個字元串類型的value成員,該成員編號為1來代替名字,這也是protobuf體積小的原因之一,別的數據描述語言(json、xml)都是通過成員名字標識,而Portobuf通過唯一編號,只是不便於查閱。

 

2.3 編寫GPRC-project\build.gradle,包含依賴引入和gradle編譯配置:

buildscript {
    repositories {
        mavenCentral()
        maven{ url '//maven.aliyun.com/nexus/content/groups/public/'}
    }
    dependencies {
        // protobuf 編譯插件,會在右側gradle--other中,添加Proto相關的任務(共6個)
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.12'
    }
}

//plugins {
//    id 'java'
//    id 'com.google.protobuf'
//    id 'com.google.protobuf' version '0.8.8'
//}

apply plugin: 'java'
apply plugin: 'com.google.protobuf'


group 'com.biao.grpc'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven{ url '//maven.aliyun.com/nexus/content/groups/public/'}
}

dependencies {

    //這裡必須引入lib目錄的j2ee相關jar,否則即使每次手動加入jar依賴,但啟動應用時gradle會reimport,
    // 導致一直提示因少依賴而無法解析,這也是gradle引入第三方jar的方式
    compile fileTree(dir: "lib", include: "*.jar")

    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'io.grpc:grpc-netty-shaded:1.29.0'
    implementation 'io.grpc:grpc-protobuf:1.29.0'
    implementation 'io.grpc:grpc-stub:1.29.0'
}

sourceSets {
    main {
        proto {
            // .proto文件目錄
            srcDir 'src/main/proto'
        }
        java {
            // include self written and generated code, 源程式碼生成到一個單獨的目錄
            srcDirs 'src/main/java','generated-sources/main/java'
        }
    }
    // remove the test configuration - at least in your example you don't have a special test proto file
/*    test {
        proto {
            srcDir 'src/test/proto'
        }
        proto {
            srcDir 'src/test/java'
        }
    }*/
}

protobuf {
    // Configure the protoc executable
    protoc {
        // Download from repositories ,從倉庫下載,
        artifact = "com.google.protobuf:protoc:3.11.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.29.0'
        }
    }
    //'src' 改為'generated-sources',則會變更.proto文件對應的java類文件生成目錄
    generateProtoTasks.generatedFilesBaseDir = 'src/main/java'

    generateProtoTasks {
        // all() returns the collection of all protoc tasks
        all()*.plugins {
            grpc {}
        }

        // In addition to all(),you may get the task collection by various
        // criteria:

        // (Java only) returns tasks for a sourceSet
        ofSourceSet('main')
    }
}

以上程式碼解析:

  1. Moduleapply plugin: ‘com.google.protobuf’ 引入protobuf-gradle-plugin,作為protobuf 編譯插件,會在右側gradle–other中,自動添加proto相關的任務(共6個)
  2. compile fileTree(dir: “lib”, include: “*.jar”) 這裡必須引入lib目錄的j2ee相關jar,否則即使每次手動加入jar依賴,但啟動應用時gradle會reimport,導致一直提示因少依賴而無法解析,這也是gradle引入第三方jar的正確方式
  3. implementation ‘io.grpc:grpc-protobuf:1.29.0’,gradle新版語法,implementation 僅僅對當前的Module提供介面,對外隱藏不必要的介面,而compile(新版升級為 api )依賴的庫將會完全參與編譯和打包,
  4. protobuf {…}中則聲明protoc-gen-grpc-java插件來源的.proto文件源目錄及生成目標目錄,

 

2.4 運行右側 task :gradle --> other --> generateProto,則自動生成類文件和介面文件(XXXGrpc),並且很貼心的是,如果原.proto文件有注釋,

生成的文件中會自動帶上原注釋內容。可以看到,helloworld.proto 和 stream.proto 生成的對應的文件,前者為 6 個,後者為 1 個,因java_multiple_files參數不同。

生成文件後,將文件移動到src/main/java對應的包下面,並將build.gradle與自動生成文件的部分注釋掉,否則啟動應用時,又會自動生成,導致IDE提示類重複

 

2.5 看下源碼,包com.biao.grpc.helloworld下面生成了對應於helloworld.proto的類和介面,包括服務、請求消息結構體和響應消息結構體。

com.biao.grpc.helloworld.GreeterGrpcgetSayHelloMethod 方法即約定了RPC的方法、請求/響應數據類型,並獲取方法全名:

@io.grpc.stub.annotations.RpcMethod(
      fullMethodName = SERVICE_NAME + '/' + "SayHello",
      requestType = com.biao.grpc.helloworld.HelloRequest.class,
      responseType = com.biao.grpc.helloworld.HelloReply.class,
      methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
  public static io.grpc.MethodDescriptor<com.biao.grpc.helloworld.HelloRequest,
      com.biao.grpc.helloworld.HelloReply> getSayHelloMethod() {
    io.grpc.MethodDescriptor<com.biao.grpc.helloworld.HelloRequest, com.biao.grpc.helloworld.HelloReply> getSayHelloMethod;
    if ((getSayHelloMethod = GreeterGrpc.getSayHelloMethod) == null) {
      synchronized (GreeterGrpc.class) {
        if ((getSayHelloMethod = GreeterGrpc.getSayHelloMethod) == null) {
          GreeterGrpc.getSayHelloMethod = getSayHelloMethod =
              io.grpc.MethodDescriptor.<com.biao.grpc.helloworld.HelloRequest, com.biao.grpc.helloworld.HelloReply>newBuilder()
              .setType(io.grpc.MethodDescriptor.MethodType.UNARY)
              .setFullMethodName(generateFullMethodName(SERVICE_NAME, "SayHello"))
              .setSampledToLocalTracing(true)
              .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
                  com.biao.grpc.helloworld.HelloRequest.getDefaultInstance()))
              .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
                  com.biao.grpc.helloworld.HelloReply.getDefaultInstance()))
              .setSchemaDescriptor(new GreeterMethodDescriptorSupplier("SayHello"))
              .build();
        }
      }
    }
    return getSayHelloMethod;
  }
 

2.6 建立server端:

public class HelloWorld_Server {
    private static final Logger logger = Logger.getLogger(HelloWorld_Server.class.getName());


    private int port = 50051;
    private Server server;

    private void start() throws IOException{
        server = ServerBuilder.forPort(port)
                .addService(new GreeterImpl())
                .build()
                .start();
        logger.info("Server started, listening on "+ port);

        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run(){

                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                HelloWorld_Server.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }

    private void stop(){
        if (server != null){
            server.shutdown();
        }
    }

    // block 一直到退出程式
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null){
            server.awaitTermination();
        }
    }


    public  static  void main(String[] args) throws IOException, InterruptedException {

        final HelloWorld_Server server = new HelloWorld_Server();
        server.start();
        server.blockUntilShutdown();
    }


    // 實現 定義一個實現服務介面的類
    private class GreeterImpl extends com.biao.grpc.helloworld.GreeterGrpc.GreeterImplBase {

        @Override
        public void sayHello(com.biao.grpc.helloworld.HelloRequest req, 
                             StreamObserver<com.biao.grpc.helloworld.HelloReply> responseObserver){
            com.biao.grpc.helloworld.HelloReply reply = com.biao.grpc.helloworld.HelloReply.
                    newBuilder()
                    .setMessage(("Hello "+req.getName()))
                    .build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
            System.out.println("Message from gRPC-Client:" + req.getName());
        }
    }
}

以上程式碼解析:通過GreeterImpl擴展GreeterGrpc.GreeterImplBase具體實現了gRPC服務的方法,作為服務端響應請求的業務邏輯。

 

2.7 建立client端:

public class HelloWorld_Client {
    private final ManagedChannel channel;
    private final com.biao.grpc.helloworld.GreeterGrpc.GreeterBlockingStub blockingStub;
    private static final Logger logger = Logger.getLogger(HelloWorld_Client.class.getName());

    public HelloWorld_Client(String host,int port){
        channel = ManagedChannelBuilder.forAddress(host,port)
                .usePlaintext()
                .build();

        blockingStub = com.biao.grpc.helloworld.GreeterGrpc.newBlockingStub(channel);
    }


    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    public  void greet(String name){
        com.biao.grpc.helloworld.HelloRequest request = com.biao.grpc.helloworld.HelloRequest
                .newBuilder()
                .setName(name)
                .build();
        com.biao.grpc.helloworld.HelloReply response;
        try{
            response = blockingStub.sayHello(request);
        } catch (StatusRuntimeException e)
        {
            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
            return;
        }
        logger.info("Message from gRPC-Server: "+response.getMessage());
    }

    public static void main(String[] args) throws InterruptedException {
        HelloWorld_Client client = new HelloWorld_Client("127.0.0.1",50051);
        try{
            String user = "world";
            if (args.length > 0){
                user = args[0];
            }
            client.greet(user);
        }finally {
            client.shutdown();
        }
    }
}

以上程式碼解析:在greet方法中,引用HelloRequest和HelloReply,並發起gRPC業務請求。

 

2.8 先運行com.biao.grpc.helloworld.HelloWorld_Server,再運行com.biao.grpc.helloworld.HelloWorld_Client, 輸出以下為成功:

B 流模式

gRPC有四種通訊模式:

  • 普通模式:一次請求對應一次響應,和普通方法請求一樣;
  • 客戶端流模式:客戶端使用流模式傳入多個請求對象,服務端返回一個響應結果;
  • 服務端流模式:一個請求對象,服務端使用流模式傳回多個結果對象;
  • 雙向流模式:客戶端流式和服務端流式組合;

前面有說過,gRPC使用http/2通訊,數據傳輸使用二進位幀,幀是HTTP2.0通訊的最小單位,而消息由一或多個幀組成,流是比消息更大的通訊單位,

是TCP連接中的一個虛擬通道。每個數據流以消息的形式發送,消息中的幀可以亂序發送,然後再根據每個幀首部的流標識符重新組裝為流。

前面的helloworld算第1種,我這裡寫了後 3 種模式,根據stream.proto開發,使用同一個server端:com.biao.grpc.stream.StreamServer,其中實現

了客戶端流模式、服務端流模式和雙向流模式3種通訊模式的具體方法實現,

public class StreamServer {

    private static int port = 8883;
    private static io.grpc.Server server;

    public void run() {
        ServiceImpl serviceImpl = new ServiceImpl();
        server = io.grpc.ServerBuilder.forPort(port).addService(serviceImpl).build();
        try {
            server.start();
            System.out.println("Server start success on port:" + port);
            server.awaitTermination();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 實現 定義一個實現服務介面的類
    private static class ServiceImpl extends StreamServiceGrpc.StreamServiceImplBase {

        @Override
        public void serverSideStreamFun(Stream.RequestData request, StreamObserver<Stream.ResponseData> responseObserver) {
            // TODO Auto-generated method stub
            System.out.println("請求參數:" + request.getText());
            for (int i = 0; i < 10; i++) {
                responseObserver.onNext(Stream.ResponseData.newBuilder()
                        .setText("你好" + i)
                        .build());
            }
            responseObserver.onCompleted();
        }

        @Override
        public StreamObserver<Stream.RequestData> clientSideStreamFun(StreamObserver<Stream.ResponseData> responseObserver) {
            // TODO Auto-generated method stub
            return new StreamObserver<Stream.RequestData>() {
                private Stream.ResponseData.Builder builder = Stream.ResponseData.newBuilder();

                @Override
                public void onNext(Stream.RequestData value) {
                    // TODO Auto-generated method stub
                    System.out.println("請求參數:" + value.getText());

                }

                @Override
                public void onError(Throwable t) {
                    // TODO Auto-generated method stub

                }

                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub
                    builder.setText("數據接收完成");
                    responseObserver.onNext(builder.build());
                    responseObserver.onCompleted();
                }
            };
        }

        @Override
        public StreamObserver<Stream.RequestData> twoWayStreamFun(StreamObserver<Stream.ResponseData> responseObserver) {
            // TODO Auto-generated method stub
            return new StreamObserver<Stream.RequestData>() {

                @Override
                public void onNext(Stream.RequestData value) {
                    // TODO Auto-generated method stub
                    System.out.println("請求參數:" + value.getText());
                    responseObserver.onNext(Stream.ResponseData.newBuilder()
                            .setText(value.getText() + ",歡迎你的加入")
                            .build());
                }

                @Override
                public void onError(Throwable t) {
                    // TODO Auto-generated method stub
                    t.printStackTrace();
                }

                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub
                    responseObserver.onCompleted();
                }
            };
        }
    }

    public static void main(String[] args) {
        StreamServer server = new StreamServer();
        server.run();
    }

}
 

另外我寫了3個client端,程式碼分析,略!運行後即可看到效果,我這裡給個雙向流的結果例子:

附:手動編譯

為了加強動手能力,我這裡也做個手動編譯生成java程式碼的步驟:

Java程式碼生成編譯器下載: //repo.maven.apache.org/maven2/io/grpc/protoc-gen-grpc-java/

下載安裝protobuf,//github.com/protocolbuffers/protobuf/releases?after=v3.0.0-alpha-4

To install protobuf, you need to install the protocol compiler (used to compile .proto files) and the protobuf runtime for your chosen programming language.

要使用protobuf,需先安裝協議編譯器(protocol compiler),用於編譯.proto文件,並且作為protobuf的運行時環境。其實安裝protobuf等價於安裝其編譯環境protoc。

 

環境變數設置,將protoc的解壓目錄添加到Path下:

 

CMD下使用protoc --version命令輸出如下即為成功:

手動編譯:進入 .proto 文件所在目錄, 使用protoc.exe生成消息結構體,下圖中標號 2

protoc <待編譯文件> --java_out=<輸出文件保存路徑>

使用protoc-gen-grpc-java生成服務介面:下圖中標號 3

protoc *.proto --plugin=protoc-gen-grpc-java=C:\protobuf-3.0-beta\protoc-gen-grpc-java-1.9.1-windows-x86_64.exe --grpc-java_out=./

 

全文完!


我的其他文章:

       只寫原創,敬請關注