Python使用gRPC协议通信
简单介绍
gRPC
是谷歌开源的通信协议,支持多开发语言,可以实现跨语言调用,函数调用的形式非常直观,需要编写Protobuf
文件,生成对应开发语言的模块文件。Protobuf
数据序列化传输是二进制协议传输,相对json、xml
等格式要更加轻量。是目前微服务最流行使用的协议。
安装
1 | pip install grpcio #安装grpc |
gRPC模式
gRPC
提供了四种服务模式:
一元
RPC
,其中客户端向服务端发送单个请求并获得 单响应返回,就像正常的函数调用一样。1
rpc SayHello(HelloRequest) returns (HelloResponse);
服务端流式
RPC
,其中客户端向服务端发送请求,并获得一个流来读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。gRPC
保证在单个RPC
调用中进行消息排序。1
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
客户端流式
RPC
,其中客户端写入一系列消息并将其发送到服务端,同样使用提供的流。一旦客户端完成了消息的写入,它将等待服务端读取它们并返回其响应。同样,gRPC
保证单个RPC
调用中的消息顺序。1
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
双向流式
RPC
,其中双方使用读写流发送一系列消息。这两个流是独立运行的,因此客户端和服务端可以按照他们喜欢的顺序读写: 例如,服务端可以在写响应之前等待接收所有客户端消息,或者它可以交替读消息然后写消息,或者其他读写组合。保留了每个流中消息的顺序1
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
Protobuf文件格式
常见关键字 | 解释 |
---|---|
syntax | 指定protobuf版本 |
package | 包名,可以不填 |
import | 导入一些插件,一般go用的比较多 |
message | 定义传输的数据结构 |
service | 定义服务 |
rpc | 定义服务中的方法 |
stream | 定义方法中数据的传输方式为流传输 |
常见数据类型 | 解释 |
---|---|
string | 默认值为空白字符, 字符串必须始终包含UTF-8编码或7位ASCII文本。 |
int32/int64 | 对应长短整型,默认值是0 |
bool | bool类型 |
float | 浮点型 |
repeated | 对应于python列表类型,但不完全一样,数据类型只能为一种,不能动态变换 |
map | 对应于python字典类型,但不完全一样,数据类型只能为一种,不能动态变换 |
bytes | 比特类型,默认值是空白字节,可能包含任何字节序列 |
一元模式
编写Protobuf文件
首先定义Protobuf
文件,通常以.proto
文件名结尾。如下example.proto
1 | syntax = "proto3"; // protobuf版本 |
编写完Protobuf
文件,需要使用grpc-tools
生成对应的python
代码,生成的代码供客户端和服务端调用。
1 | python -m grpc_tools.protoc -I./protos/ --python_out=. --pyi_out=. --grpc_python_out=. example.proto |
服务端
gRPC
常用于异步编程中,这里介绍异步和同步版本的调用分别如何使用。缩进代码预留,这里去除掉了导入Protobuf
文件生成的模块。
异步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18class Greeter(example_pb2_grpc.UserInfoServicer):
async def Info(self, request, context):
return example_pb2.Info(message='Hello, %s!' % request.name)
async def serve(port) -> None:
# port = '50051'
server = grpc.aio.server()
example_pb2_grpc.add_UserInfoServicer_to_server(UserInfo(), server)
listen_addr = '[::]:' + str(port)
server.add_insecure_port(listen_addr)
logging.info("Starting server on %s", listen_addr)
await server.start()
await server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.run(serve(50051))同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19class Greeter(example_pb2_grpc.UserInfoServicer):
def Info(self, request, context):
return example_pb2.Info(message='Hello, %s!' % request.name)
def serve():
port = '50051'
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
example_pb2_grpc.add_UserInfoServicer_to_server(UserInfo(), server)
server.add_insecure_port('[::]:' + port)
server.start()
print("Server started, listening on " + port)
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
客户端
异步
1
2
3
4
5
6
7
8
9
10
11
12async def run() -> None:
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = example_pb2_grpc.UserInfoStub(channel)
response: example_pb2.response = await stub.Info(example_pb2.request(
age=18,
name="daoji"
))
print("Greeter client received: " + response.message)
if __name__ == '__main__':
logging.basicConfig()
asyncio.run(run())同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14def run():
# NOTE(gRPC Python Team): .close() is possible on a channel and should be
# used in circumstances in which the with statement does not fit the needs
# of the code.
print("Will try to greet world ...")
with grpc.insecure_channel('localhost:50051') as channel:
stub = example_pb2_grpc.UserInfoStub(channel)
response = stub.Info(example_pb2.request(name='you'))
print("Greeter client received: " + response.message)
if __name__ == '__main__':
logging.basicConfig()
run()
服务端流式
Protobuf文件
1 | // .proto |
服务端
1 | class StreamTest(example_pb2_grpc.StreamTestServicer): |
客户端
1 | async def client_stram(stub: example_pb2_grpc.StreamTestStub) -> None: |
客户端流式
双向流式
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 盗计のBlog!
评论