Heim >Backend-Entwicklung >Python-Tutorial >gRPC-Tutorial für die Nachrichtenübermittlung mithilfe der Python-Sprache

gRPC-Tutorial für die Nachrichtenübermittlung mithilfe der Python-Sprache

PHPz
PHPznach vorne
2023-04-27 17:13:171604Durchsuche

1. Installation des grpc-Open-Source-Pakets

# conda
$ conda create -n grpc_env python=3.9
 
# install grpc
$ pip install grpc -i https://pypi.doubanio.com/simple
$ pip install grpc-tools -i https://pypi.doubanio.com/simple
 
# 有时proto生成py文件不对就是得换换grpc两个包的版本

2. Verwendung von grpc zur Übertragung von Nachrichten

Gesamtstruktur, client.py server.py und example.proto im Proto-Verzeichnis

gRPC-Tutorial für die Nachrichtenübermittlung mithilfe der Python-Sprache

1) Im Beispiel definieren. proto Transmission body

// 声明
syntax = "proto3";
package proto;
 
// service创建
service HelloService{
  rpc Hello(Request) returns (Response) {}  // 单单传送消息
}
 
// 请求参数消息体 1、2是指参数顺序
message Request {
  string data = 1;
}
 
// 返回参数消息体
message Response {
  int32 ret = 1;    //返回码
  string data = 2;
}
 
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2) Verwenden Sie den Befehl, um Py-Dateien in der virtuellen Umgebung zu generieren

$ conda activate grpc_env
$ f:
$ cd F:examples
$ python -m grpc_tools.protoc -I ./ -- python_out=. / --grpc_python_out=./ ./example.proto

Zwei py-Dateien werden im Proto-Verzeichnis generiert, wie in der Abbildung unten gezeigt:

gRPC-Tutorial für die Nachrichtenübermittlung mithilfe der Python-Sprache

3) Bearbeiten Sie client.py und server. py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
 
    data = "hello 123"
    request = example_pb2.Request(data=data)
    res = cli.Hello(request)
    print(f"ret:{res.ret}, data:{res.data}")
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)

3 . Konfiguration der Datenübertragungsgröße für die GRPC-Nutzung

Standardmäßig begrenzt gRPC eingehende Nachrichten auf 4 MB. Es gibt keine Einschränkungen für ausgehende Nachrichten.

1) Die Definition von example.proto bleibt unverändert

2) Bearbeiten Sie client.py und server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
 
    data = "hello 123" * 1024 * 1024
    request = example_pb2.Request(data=data)
    res = cli.Hello(request)
    print(f"ret:{res.ret}, data:{res.data}")
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)

4. Timeout-Konfiguration für grpc-Nutzung

1) Die Definition von example.proto bleibt unverändert

2) Bearbeiten Sie client.py und server.py.

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = "hello 123"
        request = example_pb2.Request(data=data)
        res = cli.Hello(request, timeout=1)  # timeout 单位:秒
        print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)
Verwenden Sie Befehle, um py-Dateien in der virtuellen Umgebung zu generieren, siehe 2. 2)

3) Bearbeiten Sie client.py und server.py
// 声明
syntax = "proto3";
package proto;
 
// service创建
service HelloService{
  rpc Hello(Request) returns (Response) {}  // 单单传送消息
  rpc ClientTOServer(stream UpFileRequest) returns (Response) {}  // 流式上传文件
  rpc ServerTOClient(Request) returns (stream UpFileRequest) {}  // 流式下载文件
}
 
// 请求参数消息体 1、2是指参数顺序
message Request {
  string data = 1;
}
 
// 返回参数消息体
message Response {
  int32 ret = 1;    //返回码
  string data = 2;
}
 
message UpFileRequest {
  string filename = 1;
  int64 sendsize = 2;
  int64 totalsize = 3;
  bytes data = 4;
}
 
 
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
    def ClientTOServer(self, request_iterator, context):
        """上传文件"""
        data = bytearray()
        for UpFileRequest in request_iterator:
            file_name = UpFileRequest.filename
            file_size = UpFileRequest.totalsize
            file_data = UpFileRequest.data
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(file_data)  # 拼接两个bytes
            print(f"已接收长度:{len(data)}")
        if len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            (ret, res) = (0, file_name)
        else:
            print(f"{file_name=} 下载失败")
            (ret, res) = (-1, file_name)
        return example_pb2.Response(ret=ret, data=res)
 
    def ServerTOClient(self, request, context):
        """下载文件"""
        fp = request.data
        print(f"下载文件:{fp=}")
        # 获取文件名和文件大小
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)  # 获取文件大小
        # 发送文件内容
        part_size = 1024 * 1024  # 每次读取1MB数据
        count = 1
 
        with open(fp, "rb") as fr:
            while True:
                try:
                    if count == 1:
                        count += 1
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                    else:
                        context = fr.read(part_size)
                        if context:
                            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                            sendsize=len(context),
                                                            data=context)
                        else:
                            print(f"发送完毕")
                            return 0
                except Exception as es:
                    print(es)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)

6

Das obige ist der detaillierte Inhalt vongRPC-Tutorial für die Nachrichtenübermittlung mithilfe der Python-Sprache. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen