java版gRPC實戰之六:客戶端動態獲取服務端地址

  • 2021 年 9 月 18 日
  • 筆記

歡迎訪問我的GitHub

//github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《java版gRPC實戰》全系列鏈接

  1. 用proto生成程式碼
  2. 服務發布和調用
  3. 服務端流
  4. 客戶端流
  5. 雙向流
  6. 客戶端動態獲取服務端地址
  7. 基於eureka的註冊發現

客戶端為什麼要動態獲取服務端地址

本文是《java版gRPC實戰》系列的第六篇,前面咱們在開發客戶端應用時,所需的服務端地址都是按如下步驟設置的:

  • 在application.yml中配置,如下圖:

在這裡插入圖片描述

  • 在用到gRPC的bean中,使用註解GrpcClient即可將Stub類注入到成員變數中:

在這裡插入圖片描述

  • 上述操作方式的優點是簡單易用好配置,缺點也很明顯:服務端的IP地址或者埠一旦有變化,就必須修改application.yml並重啟客戶端應用;

為什麼不用註冊中心

  • 您一定會想到解決上述問題最簡單的方法就是使用註冊中心,如nacos、eureka等,其實我也是這麼想的,直到有一天,由於工作原因,我要在一個已有的gRPC微服務環境部署自己的應用,這個微服務環境並非java技術棧,而是基於golang的,他們都使用了go-zero框架( 老扎心了),這個go-zero框架沒有提供java語言的SDK,因此,我只能服從go-zero框架的規則,從etcd中取得其他微服務的地址資訊,才能調用其他gRPC服務端,如下圖所示:

在這裡插入圖片描述

  • 如此一來,咱們之前那種在application.yml中配置服務端資訊的方法就用不上了,本篇咱們來開發一個新的gRPC客戶端應用,滿足以下需求:
  1. 創建Stub對象的時候,服務端的資訊不再來自註解GrpcClient,而是來自查詢etcd的結果;
  2. etcd上的服務端資訊有變化的時候,客戶端可以及時更新,而不用重啟應用;

本篇概覽

  • 本篇要開發名為get-service-addr-from-etcd的springboot應用,該應用從etcd取得local-server應用的IP和埠,然後調用local-server的sayHello介面,如下圖:

在這裡插入圖片描述

  1. 開發客戶端應用;
  2. 部署gRPC服務端應用;
  3. 部署etcd;
  4. 模擬go-zero的規則,將服務端應用的IP地址和埠寫入etcd;
  5. 啟動客戶端應用,驗證能否正常調用服務端的服務;
  6. 重啟服務端,重啟的時候修改埠;
  7. 修改etcd中服務端的埠資訊;
  8. 調用介面觸發客戶端重新實例化Stub對象;
  9. 驗證客戶端能否正常調用修改了埠的服務端服務;

源碼下載

名稱 鏈接 備註
項目主頁 //github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) //github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議
  • 這個git項目中有多個文件夾,《java版gRPC實戰》系列的源碼在grpc-tutorials文件夾下,如下圖紅框所示:

在這裡插入圖片描述

  • grpc-tutorials文件夾下有多個目錄,本篇文章對應的客戶端程式碼在get-service-addr-from-etcd目錄下,如下圖:

在這裡插入圖片描述

開發客戶端應用

  • 在父工程的build.gradle文件中新增一行,這是etcd相關的庫,如下圖紅框所示:

在這裡插入圖片描述

  • 在父工程grpc-turtorials下面新建名為get-service-addr-from-etcd的模組,其build.gradle內容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation 'io.etcd:jetcd-core'
    implementation project(':grpc-lib')
}
  • 配置文件application.yml,設置自己的web埠號和應用名,另外grpc.etcdendpoints是etcd集群的地址資訊:
server:
  port: 8084
spring:
  application:
    name: get-service-addr-from-etcd

grpc:
  # etcd的地址,從此處取得gRPC服務端的IP和埠
  etcdendpoints: '//192.168.72.128:2379,//192.168.50.239:2380,//192.168.50.239:2381'
  • 啟動類DynamicServerAddressDemoApplication.java的程式碼就不貼了,普通的springboot啟動類而已;

  • 新增StubWrapper.java文件,這是個spring bean,要重點關注的是simpleBlockingStub方法,當bean在spring註冊的時候simpleBlockingStub方法會被執行,這樣每當bean在spring註冊時,都會從etcd查詢gRPC服務端資訊,然後創建SimpleBlockingStub對象:

package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import static com.google.common.base.Charsets.UTF_8;

/**
 * @author will ([email protected])
 * @version 1.0
 * @description: 包裝了SimpleBlockingStub實例的類,發起gRPC請求時需要用到SimpleBlockingStub實例
 * @date 2021/5/8 19:34
 */
@Component("stubWrapper")
@Data
@Slf4j
@ConfigurationProperties(prefix = "grpc")
public class StubWrapper {

    /**
     * 這是etcd中的一個key,該key對應的值是grpc服務端的地址資訊
     */
    private static final String GRPC_SERVER_INFO_KEY = "/grpc/local-server";

    /**
     * 配置文件中寫好的etcd地址
     */
    private String etcdendpoints;

    private SimpleGrpc.SimpleBlockingStub simpleBlockingStub;

    /**
     * 從etcd查詢gRPC服務端的地址
     * @return
     */
    public String[] getGrpcServerInfo() {
        // 創建client類
        KV kvClient = Client.builder().endpoints(etcdendpoints.split(",")).build().getKVClient();

        GetResponse response = null;

        // 去etcd查詢/grpc/local-server這個key的值
        try {
            response = kvClient.get(ByteSequence.from(GRPC_SERVER_INFO_KEY, UTF_8)).get();
        } catch (Exception exception) {
            log.error("get grpc key from etcd error", exception);
        }

        if (null==response || response.getKvs().isEmpty()) {
            log.error("empty value of key [{}]", GRPC_SERVER_INFO_KEY);
            return null;
        }

        // 從response中取得值
        String rawAddrInfo = response.getKvs().get(0).getValue().toString(UTF_8);

        // rawAddrInfo是「192.169.0.1:8080」這樣的字元串,即一個IP和一個埠,用":"分割,
        // 這裡用":"分割成數組返回
        return null==rawAddrInfo ? null : rawAddrInfo.split(":");
    }

    /**
     * 每次註冊bean都會執行的方法,
     * 該方法從etcd取得gRPC服務端地址,
     * 用於實例化成員變數SimpleBlockingStub
     */
    @PostConstruct
    public void simpleBlockingStub() {
        // 從etcd獲取地址資訊
        String[] array = getGrpcServerInfo();

        log.info("create stub bean, array info from etcd {}", Arrays.toString(array));

        // 數組的第一個元素是gRPC服務端的IP地址,第二個元素是埠
        if (null==array || array.length<2) {
            log.error("can not get valid grpc address from etcd");
            return;
        }

        // 數組的第一個元素是gRPC服務端的IP地址
        String addr = array[0];
        // 數組的第二個元素是埠
        int port = Integer.parseInt(array[1]);

        // 根據剛才獲取的gRPC服務端的地址和埠,創建channel
        Channel channel = ManagedChannelBuilder
                .forAddress(addr, port)
                .usePlaintext()
                .build();

        // 根據channel創建stub
        simpleBlockingStub = SimpleGrpc.newBlockingStub(channel);
    }
}
  • GrpcClientService是封裝了StubWrapper的服務類:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.HelloReply;
import com.bolingcavalry.grpctutorials.lib.HelloRequest;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.grpc.StatusRuntimeException;
import lombok.Setter;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class GrpcClientService {

    @Autowired(required = false)
    @Setter
    private StubWrapper stubWrapper;

    public String sendMessage(final String name) {
        // 很有可能simpleStub對象為null
        if (null==stubWrapper) {
            return "invalid SimpleBlockingStub, please check etcd configuration";
        }

        try {
            final HelloReply response = stubWrapper.getSimpleBlockingStub().sayHello(HelloRequest.newBuilder().setName(name).build());
            return response.getMessage();
        } catch (final StatusRuntimeException e) {
            return "FAILED with " + e.getStatus().getCode().name();
        }
    }
}
  • 新增一個controller類GrpcClientController,提供一個http介面,裡面會調用GrpcClientService的方法,最終完成遠程gRPC調用:
package com.bolingcavalry.dynamicrpcaddr;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "will") String name) {
        return grpcClientService.sendMessage(name);
    }
}
  • 接下來新增一個controller類RefreshStubInstanceController,對外提供一個http介面refreshstub,作用是刪掉stubWrapper這個bean,再重新註冊一次,這樣每當外部調用refreshstub介面,就可以從etcd取得服務端資訊再重新實例化SimpleBlockingStub成員變數,這樣就達到了客戶端動態獲取服務端地址的效果:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RefreshStubInstanceController implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Autowired
    private GrpcClientService grpcClientService;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @RequestMapping("/refreshstub")
    public String refreshstub() {

        String beanName = "stubWrapper";

        //獲取BeanFactory
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

        // 刪除已有bean
        defaultListableBeanFactory.removeBeanDefinition(beanName);

        //創建bean資訊.
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(StubWrapper.class);

        //動態註冊bean.
        defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());

        // 更新引用關係(注意,applicationContext.getBean方法很重要,會觸發StubWrapper實例化操作)
        grpcClientService.setStubWrapper(applicationContext.getBean(StubWrapper.class));

        return "Refresh success";
    }
}
  • 編碼完成,開始驗證;

部署gRPC服務端應用

部署gRPC服務端應用很簡單,啟動local-server應用即可:

在這裡插入圖片描述

部署etcd

  • 為了簡化操作,我這裡的etcd集群是用docker部署的,對應的docker-compose.yml文件內容如下:
version: '3'
services:
  etcd1:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd1'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=//etcd1:2380'
      - '--listen-peer-urls=//0.0.0.0:2380'
      - '--listen-client-urls=//0.0.0.0:2379'
      - '--advertise-client-urls=//etcd1:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=//etcd1:2380,etcd2=//etcd2:2380,etcd3=//etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2379:2379
    volumes:
      - ./store/etcd1/data:/etcd_data
  etcd2:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd2'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=//etcd2:2380'
      - '--listen-peer-urls=//0.0.0.0:2380'
      - '--listen-client-urls=//0.0.0.0:2379'
      - '--advertise-client-urls=//etcd2:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=//etcd1:2380,etcd2=//etcd2:2380,etcd3=//etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2380:2379
    volumes:
      - ./store/etcd2/data:/etcd_data
  etcd3:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd3'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=//etcd3:2380'
      - '--listen-peer-urls=//0.0.0.0:2380'
      - '--listen-client-urls=//0.0.0.0:2379'
      - '--advertise-client-urls=//etcd3:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=//etcd1:2380,etcd2=//etcd2:2380,etcd3=//etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2381:2379
    volumes:
      - ./store/etcd3/data:/etcd_data
  • 準備好上述文件後,執行docker-compose up -d即可創建集群;

將服務端應用的IP地址和埠寫入etcd

  • 我這邊local-server所在伺服器IP是192.168.50.5,埠9898,所以執行以下命令將local-server資訊寫入etcd:
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9898

啟動客戶端應用

  • 打開DynamicServerAddressDemoApplication.java,點擊下圖紅框位置,即可啟動客戶端應用:

在這裡插入圖片描述

  • 注意下圖紅框中的日誌,該日誌證明客戶端應用從etcd獲取服務端資訊成功:

在這裡插入圖片描述

  • 瀏覽器訪問應用get-service-addr-from-etcd的http介面,成功收到響應,證明gRPC調用成功:

在這裡插入圖片描述

  • 去看local-server的控制台,如下圖紅框,證明遠程調用確實執行了:

在這裡插入圖片描述

重啟服務端,重啟的時候修改埠

  • 為了驗證動態獲取服務端資訊是否有效,咱們先把local-server應用的埠改一下,如下圖紅框,改成9899

在這裡插入圖片描述

  • 改完重啟local-server,如下圖紅框,可見gRPC埠已經改為9899:

在這裡插入圖片描述

  • 這時候再訪問get-service-addr-from-etcd的http介面,由於get-service-addr-from-etcd不知道local-server的監聽埠發生了改變,因此還是去訪問9898埠,毫無意外的返回了失敗:

在這裡插入圖片描述

修改etcd中服務端的埠資訊

現在執行以下命令,將etcd中的服務端資訊改為正確的:

docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9899

調用介面觸發客戶端重新實例化Stub對象

  • 聰明的您一定知道接下來要做的事情了:讓StubWrapper的bean重新在spring環境註冊,也就是調用RefreshStubInstanceController提供的http介面refreshstub:

在這裡插入圖片描述

  • 查看get-service-addr-from-etcd應用的控制台,如下圖紅框,StubWrapper已經重新註冊了,並且從etcd取得了最新的服務端資訊:

在這裡插入圖片描述

驗證客戶端能否正常調用修改了埠的服務端服務

  • 再次訪問get-service-addr-from-etcd應用的web介面,如下圖,gRPC調用成功:

在這裡插入圖片描述

  • 至此,在不修改配置不重啟服務的情況下,客戶端也可以適應服務端的變化了,當然了,本文只是提供基本的操作參考,實際上的微服務環境會更複雜,例如refreshstub介面可能被其他服務調用,這樣服務端有了變化可以更加及時地被更新,還有客戶端本身也肯能是gRPC服務提供方,那也要把自己註冊到etcd上去,還有利用etcd的watch功能監控指定的服務端是否一直存活,以及同一個gRPC服務的多個實例如何做負載均衡,等等,這些都要根據您的實際情況來訂製;

  • 本篇內容過多,可見對於這些官方不支援的微服務環境,咱們自己去做註冊發現的適配很費時費力的,如果設計和選型能自己做主,我們更傾向於使用現成的註冊中心,接下來的文章,咱們就一起嘗試使用eureka為gRPC提供註冊發現服務;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜索「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界…
//github.com/zq2599/blog_demos