Heim >Backend-Entwicklung >Python-Tutorial >gRPC-Tutorial für die Nachrichtenübermittlung mithilfe der Python-Sprache
# conda
$ conda create -n grpc_env python=3.9
# install grpc (the PyPI packages are named grpcio and grpcio-tools)
$ pip install grpcio -i https://pypi.doubanio.com/simple
$ pip install grpcio-tools -i https://pypi.doubanio.com/simple
# If protoc generates incorrect .py files, try different versions of the two grpc packages
Gesamtstruktur: client.py und server.py im Projektverzeichnis sowie example.proto im Verzeichnis proto
1) Den Aufbau der übertragenen Nachrichten in example.proto definieren
// Declaration
syntax = "proto3";
package proto;

// Service definition
service HelloService{
  rpc Hello(Request) returns (Response) {}  // plain message exchange only
}

// Request message; the numbers 1, 2 give the order of the fields
message Request {
  string data = 1;
}

// Response message
message Response {
  int32 ret = 1;  // return code
  string data = 2;
}

//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
2) Verwenden Sie den Befehl, um Py-Dateien in der virtuellen Umgebung zu generieren
$ conda activate grpc_env
$ f:
$ cd F:examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
Zwei py-Dateien werden im Proto-Verzeichnis generiert, wie in der Abbildung unten gezeigt:
3) Bearbeiten Sie client.py und server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService interface."""

    def Hello(self, request, context):
        """Print the received text and echo it back prefixed with "Response:"."""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Serve HelloService on ip:port until the process is interrupted.

    Args:
        ip: Address to bind to.
        port: TCP port to listen on (insecure, no TLS).
    """
    # Thread pool with 10 workers handles the incoming RPCs.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    try:
        print(f"server is started! ip:{ip} port:{port}")
        # start() does not block, so keep the main thread alive.
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the demo server.
        pass
    finally:
        # Always shut the server down, whatever ended the loop.
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)


# client.py
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Send one Hello request to the server at ip:port and print the reply.

    Args:
        ip: Server address.
        port: Server port.
    """
    target = f"{ip}:{port}"
    # The with-block closes the channel once the call has finished.
    with grpc.insecure_channel(target) as channel:  # connect to the rpc server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        request = example_pb2.Request(data="hello 123")
        res = cli.Hello(request)
    print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)
Standardmäßig begrenzt gRPC eingehende Nachrichten auf 4 MB. Es gibt keine Einschränkungen für ausgehende Nachrichten.
1) Die Definition von example.proto bleibt unverändert
2) Bearbeiten Sie client.py und server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService interface."""

    def Hello(self, request, context):
        """Print the received text and echo it back prefixed with "Response:"."""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Serve HelloService on ip:port with enlarged message-size limits.

    Args:
        ip: Address to bind to.
        port: TCP port to listen on (insecure, no TLS).
    """
    # Message size configuration: allow up to 1G in both directions.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Thread pool with 10 workers handles the incoming RPCs.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    try:
        print(f"server is started! ip:{ip} port:{port}")
        # start() does not block, so keep the main thread alive.
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the demo server.
        pass
    finally:
        # Always shut the server down, whatever ended the loop.
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Send one large Hello request to ip:port and print the reply.

    Args:
        ip: Server address.
        port: Server port.
    """
    # Message size configuration: allow up to 1G in both directions.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The with-block closes the channel once the call has finished.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the rpc server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        # The 9-character text repeated 1024 * 1024 times gives a payload of about 9 MB.
        data = "hello 123" * 1024 * 1024
        request = example_pb2.Request(data=data)
        res = cli.Hello(request)
    print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)
1) Die Definition von example.proto bleibt unverändert
2) Bearbeiten Sie client.py und server.py.
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService interface."""

    def Hello(self, request, context):
        """Print the received text, wait two seconds, then echo it back."""
        data = request.data
        print(data)
        # Artificial delay so that a client using a shorter timeout gives up first.
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Serve HelloService on ip:port with enlarged message-size limits.

    Args:
        ip: Address to bind to.
        port: TCP port to listen on (insecure, no TLS).
    """
    # Message size configuration: allow up to 1G in both directions.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Thread pool with 10 workers handles the incoming RPCs.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    try:
        print(f"server is started! ip:{ip} port:{port}")
        # start() does not block, so keep the main thread alive.
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the demo server.
        pass
    finally:
        # Always shut the server down, whatever ended the loop.
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Send one Hello request with a one-second deadline and print the reply.

    Exits the process with status -1 if the call fails (for example because
    the deadline is exceeded); returns normally on success.

    Args:
        ip: Server address.
        port: Server port.
    """
    # Message size configuration: allow up to 1G in both directions.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The with-block closes the channel on success and on sys.exit alike.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the rpc server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        request = example_pb2.Request(data="hello 123")
        try:
            res = cli.Hello(request, timeout=1)  # timeout is given in seconds
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            # Report failure through the exit status; success must not reach this.
            sys.exit(-1)
    print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)

# Verwenden Sie Befehle, um py-Dateien in der virtuellen Umgebung zu generieren, siehe 2. 2)
# 3) Bearbeiten Sie client.py und server.py
// 声明 syntax = "proto3"; package proto; // service创建 service HelloService{ rpc Hello(Request) returns (Response) {} // 单单传送消息 rpc ClientTOServer(stream UpFileRequest) returns (Response) {} // 流式上传文件 rpc ServerTOClient(Request) returns (stream UpFileRequest) {} // 流式下载文件 } // 请求参数消息体 1、2是指参数顺序 message Request { string data = 1; } // 返回参数消息体 message Response { int32 ret = 1; //返回码 string data = 2; } message UpFileRequest { string filename = 1; int64 sendsize = 2; int64 totalsize = 3; bytes data = 4; } //python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto# server.py import os import time import grpc from concurrent import futures from proto import example_pb2_grpc, example_pb2 class ServiceBack(example_pb2_grpc.HelloServiceServicer): """接口的具体功能实现""" def Hello(self, request, context): """hello""" data = request.data print(data) time.sleep(2) ret_data = "Response:" + data return example_pb2.Response(ret=0, data=ret_data) def ClientTOServer(self, request_iterator, context): """上传文件""" data = bytearray() for UpFileRequest in request_iterator: file_name = UpFileRequest.filename file_size = UpFileRequest.totalsize file_data = UpFileRequest.data print(f"文件名称:{file_name}, 文件总长度:{file_size}") data.extend(file_data) # 拼接两个bytes print(f"已接收长度:{len(data)}") if len(data) == file_size: with open("242_copy.mp3", "wb") as fw: fw.write(data) print(f"{file_name=} 下载完成") (ret, res) = (0, file_name) else: print(f"{file_name=} 下载失败") (ret, res) = (-1, file_name) return example_pb2.Response(ret=ret, data=res) def ServerTOClient(self, request, context): """下载文件""" fp = request.data print(f"下载文件:{fp=}") # 获取文件名和文件大小 file_name = os.path.basename(fp) file_size = os.path.getsize(fp) # 获取文件大小 # 发送文件内容 part_size = 1024 * 1024 # 每次读取1MB数据 count = 1 with open(fp, "rb") as fr: while True: try: if count == 1: count += 1 yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"") else: context = fr.read(part_size) if context: yield 
example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context), data=context) else: print(f"发送完毕") return 0 except Exception as es: print(es) def server(ip: str, port: int) -> None: # 数据传输大小配置 max_message_length = 1024 * 1024 * 1024 # 1G options = [('grpc.max_send_message_length', max_message_length), ('grpc.max_receive_message_length', max_message_length), ('grpc.enable_retries', 1), ] server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options) # ⼤⼩为10的线程池 ai_servicer = ServiceBack() example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server) server.add_insecure_port(f"{ip}:{port}") server.start() try: print(f"server is started! ip:{ip} port:{str(port)}") while True: time.sleep(60 * 60) except Exception as es: print(es) server.stop(0) if __name__ == '__main__': server("127.0.0.1", 8000)6
Das Obige ist der detaillierte Inhalt von „gRPC-Tutorial für die Nachrichtenübermittlung mithilfe der Python-Sprache“. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!