Maison >développement back-end >Tutoriel Python >Tutoriel gRPC utilisant le langage Python pour implémenter la transmission de messages

Tutoriel gRPC utilisant le langage Python pour implémenter la transmission de messages

PHPz
PHPzavant
2023-04-27 17:13:171607parcourir

1. Installation du package open source grpc

# conda
$ conda create -n grpc_env python=3.9
 
# install grpc
$ pip install grpc -i https://pypi.doubanio.com/simple
$ pip install grpc-tools -i https://pypi.doubanio.com/simple
 
# 有时proto生成py文件不对就是得换换grpc两个包的版本

2. Utilisation de grpc pour transmettre des messages

Structure globale, client.py server.py et example.proto dans le répertoire proto

Tutoriel gRPC utilisant le langage Python pour implémenter la transmission de messages

1) Définir dans l'exemple. proto Transmission body

// 声明
syntax = "proto3";
package proto;
 
// service创建
service HelloService{
  rpc Hello(Request) returns (Response) {}  // 单单传送消息
}
 
// 请求参数消息体 1、2是指参数顺序
message Request {
  string data = 1;
}
 
// 返回参数消息体
message Response {
  int32 ret = 1;    //返回码
  string data = 2;
}
 
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2) Utilisez la commande pour générer des fichiers py dans l'environnement virtuel

$ conda activate grpc_env
$ f:
$ cd F:examples
$ python -m grpc_tools.protoc -I ./ -- python_out=. / --grpc_python_out=./ ./example.proto

Deux fichiers py seront générés dans le répertoire proto, comme indiqué ci-dessous :

Tutoriel gRPC utilisant le langage Python pour implémenter la transmission de messages

3) Modifiez client.py et server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
 
    data = "hello 123"
    request = example_pb2.Request(data=data)
    res = cli.Hello(request)
    print(f"ret:{res.ret}, data:{res.data}")
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)

3 . Configuration de la taille du transfert de données pour une utilisation grpc

Par défaut, gRPC limite les messages entrants à 4 Mo. Il n'y a aucune restriction sur les messages sortants.

1) La définition de example.proto reste inchangée

2) Modifiez client.py et server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
 
    data = "hello 123" * 1024 * 1024
    request = example_pb2.Request(data=data)
    res = cli.Hello(request)
    print(f"ret:{res.ret}, data:{res.data}")
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)

4 Configuration du délai d'attente pour l'utilisation de grpc

1) La définition de example.proto reste inchangée

2) Modifier le client.py et server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = "hello 123"
        request = example_pb2.Request(data=data)
        res = cli.Hello(request, timeout=1)  # timeout 单位:秒
        print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)

Résultats de l'opération :

grpc.RpcError Deadline Exceeded

5. Transmission de flux de fichiers volumineux grpc

1) Redéfinir le corps de transmission dans example.proto

// 声明
syntax = "proto3";
package proto;
 
// service创建
service HelloService{
  rpc Hello(Request) returns (Response) {}  // 单单传送消息
  rpc ClientTOServer(stream UpFileRequest) returns (Response) {}  // 流式上传文件
  rpc ServerTOClient(Request) returns (stream UpFileRequest) {}  // 流式下载文件
}
 
// 请求参数消息体 1、2是指参数顺序
message Request {
  string data = 1;
}
 
// 返回参数消息体
message Response {
  int32 ret = 1;    //返回码
  string data = 2;
}
 
message UpFileRequest {
  string filename = 1;
  int64 sendsize = 2;
  int64 totalsize = 3;
  bytes data = 4;
}
 
 
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2) Dans. Utilisez des commandes pour générer des fichiers py dans l'environnement virtuel, reportez-vous à 2. 2)

3) Modifiez client.py et server.py

# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
    def ClientTOServer(self, request_iterator, context):
        """上传文件"""
        data = bytearray()
        for UpFileRequest in request_iterator:
            file_name = UpFileRequest.filename
            file_size = UpFileRequest.totalsize
            file_data = UpFileRequest.data
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(file_data)  # 拼接两个bytes
            print(f"已接收长度:{len(data)}")
        if len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            (ret, res) = (0, file_name)
        else:
            print(f"{file_name=} 下载失败")
            (ret, res) = (-1, file_name)
        return example_pb2.Response(ret=ret, data=res)
 
    def ServerTOClient(self, request, context):
        """下载文件"""
        fp = request.data
        print(f"下载文件:{fp=}")
        # 获取文件名和文件大小
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)  # 获取文件大小
        # 发送文件内容
        part_size = 1024 * 1024  # 每次读取1MB数据
        count = 1
 
        with open(fp, "rb") as fr:
            while True:
                try:
                    if count == 1:
                        count += 1
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                    else:
                        context = fr.read(part_size)
                        if context:
                            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                            sendsize=len(context),
                                                            data=context)
                        else:
                            print(f"发送完毕")
                            return 0
                except Exception as es:
                    print(es)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def send_stream_data(fp: str):
    """迭代器发送大文件"""
    # 获取文件名和文件大小
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)  # 获取文件大小
    # 发送文件内容
    part_size = 1024 * 1024  # 每次读取1MB数据
    count = 1
 
    with open(fp, "rb") as fr:
        while True:
            try:
                if count == 1:
                    count += 1
                    yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                else:
                    context = fr.read(part_size)
                    if context:
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context),
                                                        data=context)
                    else:
                        print(f"发送完毕")
                        return 0
            except Exception as es:
                print(es)
 
 
def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = "hello 123"
        request = example_pb2.Request(data=data)
        res = cli.Hello(request, timeout=1)  # timeout 单位:秒
        print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
def client_to_server(ip: str, port: int, fp: str):
    """
    流式上传数据。
    """
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        request = send_stream_data(fp=fp)
        res = cli.ClientTOServer(request, timeout=600)  # timeout 单位:秒
        print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
def server_to_client(ip: str, port: int, fp: str):
    """
    流式上传数据。
    """
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = bytearray()
        request = example_pb2.Request(data=fp)
        filename = ""
        for res in cli.ServerTOClient(request, timeout=300):
            filename = res.filename
            total_size = res.totalsize
            data += res.data
        if total_size == len(data):
            with open("242_1.mp3", "wb") as fw:
                fw.write(data)
            print(f"{filename=} : {total_size=} 下载完成!")
        else:
            print(f"{filename=} 下载失败!")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
if __name__ == '__main__':
    # client("127.0.0.1", 8000)
    # client_to_server("127.0.0.1", 8000, "242.mp3")
    server_to_client("127.0.0.1", 8000, "242.mp3")

6 la transmission asynchrone asynchrone du flux de fichiers volumineux de grpc

# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
import asyncio
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
    def ClientTOServer(self, request_iterator, context):
        """上传文件"""
        data = bytearray()
        for UpFileRequest in request_iterator:
            file_name = UpFileRequest.filename
            file_size = UpFileRequest.totalsize
            file_data = UpFileRequest.data
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(file_data)  # 拼接两个bytes
            print(f"已接收长度:{len(data)}")
        if len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            (ret, res) = (0, file_name)
        else:
            print(f"{file_name=} 下载失败")
            (ret, res) = (-1, file_name)
        return example_pb2.Response(ret=ret, data=res)
 
    def ServerTOClient(self, request, context):
        """下载文件"""
        fp = request.data
        print(f"下载文件:{fp=}")
        # 获取文件名和文件大小
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)  # 获取文件大小
        # 发送文件内容
        part_size = 1024 * 1024  # 每次读取1MB数据
        count = 1
 
        with open(fp, "rb") as fr:
            while True:
                try:
                    if count == 1:
                        count += 1
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                    else:
                        context = fr.read(part_size)
                        if context:
                            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                            sendsize=len(context),
                                                            data=context)
                        else:
                            print(f"发送完毕")
                            return 0
                except Exception as es:
                    print(es)
 
 
async def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")
    await server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        await server.wait_for_termination()
    except Exception as es:
        print(es)
        await server.stop(None)
 
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([server("127.0.0.1", 8000)]))
    loop.close()
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
import asyncio
 
 
def send_stream_data(fp: str):
    """迭代器发送大文件"""
    # 获取文件名和文件大小
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)  # 获取文件大小
    # 发送文件内容
    part_size = 1024 * 1024  # 每次读取1MB数据
    count = 1
 
    with open(fp, "rb") as fr:
        while True:
            try:
                if count == 1:
                    count += 1
                    yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                else:
                    context = fr.read(part_size)
                    if context:
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context),
                                                        data=context)
                    else:
                        print(f"发送完毕")
                        return 0
            except Exception as es:
                print(es)
 
 
async def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    async with grpc.aio.insecure_channel(target, options=options) as channel:  # 连接rpc服务器
        cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
        try:
            data = "hello 123"
            request = example_pb2.Request(data=data)
            res = await cli.Hello(request, timeout=3)  # timeout 单位:秒
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
        except Exception as es:
            print(es)
        finally:
            sys.exit(-1)
 
 
async def client_to_server(ip: str, port: int, fp: str):
    """
    流式上传数据。
    """
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    async with grpc.aio.insecure_channel(target, options=options) as channel:  # 连接rpc服务器
        cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
        try:
            request = send_stream_data(fp=fp)
            res = await cli.ClientTOServer(request, timeout=600)  # timeout 单位:秒
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
        except Exception as es:
            print(es)
        finally:
            sys.exit(-1)
 
 
def server_to_client(ip: str, port: int, fp: str):
    """
    流式上传数据。
    """
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = bytearray()
        request = example_pb2.Request(data=fp)
        filename = ""
        for res in cli.ServerTOClient(request, timeout=300):
            filename = res.filename
            total_size = res.totalsize
            data += res.data
        if total_size == len(data):
            with open("242_1.mp3", "wb") as fw:
                fw.write(data)
            print(f"{filename=} : {total_size=} 下载完成!")
        else:
            print(f"{filename=} 下载失败!")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
if __name__ == '__main__':
    # asyncio.run(client("127.0.0.1", 8000))
    asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))
    # server_to_client("127.0.0.1", 8000, "242.mp3")
.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer