用Python进行gRPC接口测试(二)
- 2020 年 2 月 21 日
- 筆記
各位被困在家中的小伙伴们,大家新年好~今天将继续为大家带来用Python进行gRPC接口测试的续集,上次主要讲了一下前期准备工作和简单RPC通信方式的实现,这次我们将着眼于另一类gRPC接口的通信形式——流式RPC。
上期回顾:用Python进行gRPC接口测试
一、流式RPC的三种具体形式
流式RPC不同于简单RPC只有“单发单收“一种形式,而是可以分为三种不同的形式——“应答流式RPC”,“请求流式RPC”,“双向流式RPC”。对于这三种不同的形式,python有不同的请求及接收方式,下面就让我们来具体了解一下。(对于下面操作有疑问的同学可以去看上一期的内容)
首先接口协议是有区别的,我们来看三种形式的接口定义:
应答流式RPC:
rpc ListFeatures(Rectangle) returns (stream Feature) {}
请求流式RPC:
rpc RecordRoute(stream Point) returns (RouteSummary) {}
双向流式RPC:
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
可以看到,请求和响应参数中流式内容的前面会有一个stream标识,代表这是一个流式的内容。应答流式RPC只有返回是流式的,请求流式RPC只有请求是流式的,而双向流式RPC请求和返回都是流式的。
一个包含接口的完整proto协议文件(route_guide.proto)内容如下:
syntax = "proto3"; option java_multiple_files = true; option java_package = "io.grpc.examples.routeguide"; option java_outer_classname = "RouteGuideProto"; option objc_class_prefix = "RTG"; package routeguide; service RouteGuide { rpc ListFeatures(Rectangle) returns (stream Feature) {} rpc RecordRoute(stream Point) returns (RouteSummary) {} rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} } message Point { int32 latitude = 1; int32 longitude = 2; } message Rectangle { Point lo = 1; Point hi = 2; } message Feature { string name = 1; Point location = 2; } message RouteNote { Point location = 1; string message = 2; } message RouteSummary { int32 point_count = 1; int32 feature_count = 2; int32 distance = 3; int32 elapsed_time = 4; }
根据协议文件生成route_guide_pb2.py、route_guide_pb2_grpc.py两个必要的模块文件,然后就可以根据他们来创建客户端了。

二、客户端实现
1、应答流式RPC
应答流式RPC返回的内容为流式,一次请求,n次返回。我们可以用for循环来接收返回的内容:
def guide_list_features(stub): rectangle = route_guide_pb2.Rectangle( lo=route_guide_pb2.Point(latitude=400000000, longitude=-750000000), hi=route_guide_pb2.Point(latitude=420000000, longitude=-730000000)) print("Looking for features between 40, -75 and 42, -73") features = stub.ListFeatures(rectangle) for feature in features: print("Feature called %s at %s" % (feature.name, feature.location))
2、请求流式RPC
请求流式RPC请求的内容为流式,n次请求,一次返回。我们可以用迭代器来发送若干份请求数据:
def generate_route(feature_list): for _ in range(0, 10): random_feature = feature_list[random.randint(0, len(feature_list) - 1)] print("Visiting point %s" % random_feature.location) yield random_feature.location def guide_record_route(stub): feature_list = route_guide_resources.read_route_guide_database() route_iterator = generate_route(feature_list) route_summary = stub.RecordRoute(route_iterator) print("Finished trip with %s points " % route_summary.point_count) print("Passed %s features " % route_summary.feature_count) print("Travelled %s meters " % route_summary.distance) print("It took %s seconds " % route_summary.elapsed_time)
其中route_iterator为一个迭代器。
3、双向流式RPC
双向流式RPC请求的内容为流式,返回内容也为流式,n次请求,n次返回。我们可以用迭代器来发送若干份请求数据,通过for循环来接收返回结果:
def generate_messages(): messages = [ make_route_note("First message", 0, 0), make_route_note("Second message", 0, 1), make_route_note("Third message", 1, 0), make_route_note("Fourth message", 0, 0), make_route_note("Fifth message", 1, 0), ] for msg in messages: print("Sending %s at %s" % (msg.message, msg.location)) yield msg def guide_route_chat(stub): responses = stub.RouteChat(generate_messages()) for response in responses: print("Received message %s at %s" % (response.message, response.location))

三、实际应用
在录音笔项目中,需要对转写后的文本进行分段语义整理,由于文本内容可能较多,服务端需要采用流式的方式进行接收,并通过流式的方式将结果返给客户端,于是这里采用了双向流式RPC形式的接口。
接口协议如下(仅为演示需要,只展示部分内容):
syntax = "proto3"; package sogou.parrot.inner.semantic.v1; import "google/protobuf/duration.proto"; import "record.proto"; option go_package = "git.speech.sogou/semantic/v1;semantic"; service discourse_understand{ rpc UnderstandFullText(stream UnderstandFullTextRequest) returns(stream UnderstandFullTextResponse); } message UnderstandFullTextRequest{ repeated SubSentence sub_sentences = 1; repeated sogou.parrot.record.v1.NonSpeechSoundInfo sound_infos = 2; repeated sogou.parrot.record.v1.AIMark ai_marks = 3; } message UnderstandFullTextResponse{ UnderstandFullTextResult result = 2; }
实现客户端的关键代码如下:
def gen_iterator(request): for r in [request]: yield r def get_understand_full_textresponse(stub, ai_marks, sound_infos, sub_sentences): request = UnderstandFullTextRequest() request.sub_sentences.extend(sub_sentences) request.sound_infos.extend(sound_infos) request.ai_marks.extend(ai_marks) request_iter = gen_iterator(request) try: resps = stub.UnderstandFullText(request_iter) for resp in resps: resp_str = json.dumps(json.loads(MessageToJson(resp)),indent=4, ensure_ascii=False) print(resp_str) except Exception as e: print (e) def run(): ai_marks, sound_infos, sub_sentences = extract_data() with grpc.insecure_channel(sys.argv[2]) as channel: stub = discourse_understandStub(channel) print("-------------- UnderstandFullText --------------") get_understand_full_textresponse(stub, ai_marks, sound_infos, sub_sentences) if __name__ == '__main__': run()
运行客户端,可以成功返回结果:

进一步,如果需要对接口进行并发下的稳定性测试,依然可以将客户端编译成可执行程序或利用shell脚本,再结合jmeter等自动化测试工具进行测试,以编译可执行程序的方法为例:
首先利用pyinstaller工具将脚本编译为可执行程序,接下来用jmeter编写自动化测试脚本,在线程组下添加OS Process Sampler,传入所需参数(下面的三个参数值为:文本,地址,句子起始编号):

运行脚本,即可自动化进行测试并得到结果,从中可以得到性能、稳定性相关指标:

此外还可以结合jmeter的参数化功能和随机功能设置一些参数值,比如文本文件和句子起始id,从而更加全面地对接口进行测试:

小结
本文介绍了用python实现其他三种形式gRPC接口测试的方法,这样四种形式的gRPC接口我们就都可以比较方便地进行测试了,对于今后需要测试gRPC接口的同学可以提供一些借鉴,当然有更好地方法欢迎大家一起讨论交流。好了,本期就到这里,我们下期再见~