Rumah >pembangunan bahagian belakang >Tutorial Python >Tutorial gRPC menggunakan bahasa Python untuk melaksanakan penghantaran mesej

Tutorial gRPC menggunakan bahasa Python untuk melaksanakan penghantaran mesej

PHPz
PHPzke hadapan
2023-04-27 17:13:17 | 1607 semak imbas

1. Pemasangan pakej sumber terbuka grpc

# conda
$ conda create -n grpc_env python=3.9
 
# install grpc
$ pip install grpcio -i https://pypi.doubanio.com/simple
$ pip install grpcio-tools -i https://pypi.doubanio.com/simple
 
# 有时proto生成py文件不对就是得换换grpc两个包的版本

2. Penggunaan grpc untuk menghantar mesej

Struktur keseluruhan, client.py server.py dan example.proto dalam direktori proto

Tutorial gRPC menggunakan bahasa Python untuk melaksanakan penghantaran mesej

1) Tentukan badan penghantaran dalam example.proto

// Declarations
syntax = "proto3";
package proto;
 
// Service definition
service HelloService{
  rpc Hello(Request) returns (Response) {}  // unary call: one request, one response
}
 
// Request message; the numbers 1, 2 are the field numbers
message Request {
  string data = 1;
}
 
// Response message
message Response {
  int32 ret = 1;    // return code
  string data = 2;
}
 
// Generate the Python modules with:
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2) Gunakan arahan untuk menjana fail py dalam persekitaran maya

$ conda activate grpc_env
$ f:
$ cd F:examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

Dua fail py akan dijana dalam direktori proto, seperti yang ditunjukkan di bawah:

Tutorial gRPC menggunakan bahasa Python untuk melaksanakan penghantaran mesej

3) Edit client.py dan server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer that implements the RPC methods declared by HelloService."""

    def Hello(self, request, context):
        """Print the request payload and echo it back prefixed by "Response:".

        The reply always carries ``ret=0``.
        """
        payload = request.data
        print(payload)
        return example_pb2.Response(ret=0, data="Response:" + payload)
 
 
def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` and block until stopped.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.

    The function blocks in a sleep loop. The server is stopped on every way
    out of that loop, including Ctrl+C.
    """
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # thread pool of size 10
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        # Ctrl+C raises KeyboardInterrupt, which is not an Exception subclass;
        # catching only Exception would skip the shutdown below.
        pass
    except Exception as es:
        print(es)
    finally:
        grpc_server.stop(0)
 
 
if __name__ == "__main__":
    # Script entry point: serve on localhost, port 8000.
    server(ip="127.0.0.1", port=8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    """Send one Hello request to the server at ``ip:port`` and print the reply.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    target = f"{ip}:{port}"
    # The channel is used as a context manager so it is closed when the call
    # is done; the original code never closed it.
    with grpc.insecure_channel(target) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub

        request = example_pb2.Request(data="hello 123")
        res = cli.Hello(request)
        print(f"ret:{res.ret}, data:{res.data}")
 
 
if __name__ == "__main__":
    # Script entry point: call the server on localhost, port 8000.
    client(ip="127.0.0.1", port=8000)

3. Konfigurasi saiz pemindahan data untuk kegunaan grpc

Secara lalai, gRPC mengehadkan mesej masuk kepada 4 MB. Tiada sekatan pada mesej keluar.

1) Takrif example.proto kekal tidak berubah

2) Edit client.py dan server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer that implements the RPC methods declared by HelloService."""

    def Hello(self, request, context):
        """Print the request payload and return it prefixed by "Response:".

        The reply always carries ``ret=0``.
        """
        received = request.data
        print(received)
        reply_text = "Response:" + received
        return example_pb2.Response(data=reply_text, ret=0)
 
 
def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` with 1 GiB message limits.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.

    The function blocks in a sleep loop. The server is stopped on every way
    out of that loop, including Ctrl+C.
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # thread pool of size 10
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        # Ctrl+C raises KeyboardInterrupt, which is not an Exception subclass;
        # catching only Exception would skip the shutdown below.
        pass
    except Exception as es:
        print(es)
    finally:
        grpc_server.stop(0)
 
 
if __name__ == "__main__":
    # Run the demo server on localhost, port 8000.
    server(ip="127.0.0.1", port=8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    """Send one large Hello request to ``ip:port`` and print the reply.

    The payload is the string "hello 123" repeated 1024 * 1024 times, and the
    channel is configured with 1 GiB send and receive limits.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = f"{ip}:{port}"
    # The channel is used as a context manager so it is closed when the call
    # is done; the original code never closed it.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub

        data = "hello 123" * 1024 * 1024
        request = example_pb2.Request(data=data)
        res = cli.Hello(request)
        print(f"ret:{res.ret}, data:{res.data}")
 
 
if __name__ == "__main__":
    # Run the demo client against localhost, port 8000.
    client(ip="127.0.0.1", port=8000)

4. Konfigurasi tamat masa untuk penggunaan grpc

1) Takrif example.proto kekal tidak berubah

2) Edit client.py dan server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer that implements the RPC methods declared by HelloService."""

    def Hello(self, request, context):
        """Print the request payload, sleep two seconds, then echo it back.

        The reply carries ``ret=0`` and the payload prefixed by "Response:".
        """
        payload = request.data
        print(payload)
        time.sleep(2)  # delay the reply by two seconds
        return example_pb2.Response(ret=0, data="Response:" + payload)
 
 
def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` with 1 GiB message limits.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.

    The function blocks in a sleep loop. The server is stopped on every way
    out of that loop, including Ctrl+C.
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # thread pool of size 10
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        # Ctrl+C raises KeyboardInterrupt, which is not an Exception subclass;
        # catching only Exception would skip the shutdown below.
        pass
    except Exception as es:
        print(es)
    finally:
        grpc_server.stop(0)
 
 
if __name__ == "__main__":
    # Start the server on localhost, port 8000.
    server(ip="127.0.0.1", port=8000)
# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    """Send one Hello request with a 1-second timeout and print the reply.

    Args:
        ip: Server address.
        port: Server TCP port.

    Raises:
        SystemExit: With status -1 when the call fails. A successful call
            returns normally (the original exited with -1 unconditionally).
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = f"{ip}:{port}"
    failed = False
    # The context manager closes the channel before the process may exit.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = example_pb2.Request(data="hello 123")
            res = cli.Hello(request, timeout=1)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            failed = True
        except Exception as es:
            print(es)
            failed = True
    if failed:
        sys.exit(-1)
 
 
if __name__ == "__main__":
    # Call the server on localhost, port 8000.
    client(ip="127.0.0.1", port=8000)

Hasil berjalan:

grpc.RpcError Deadline Exceeded

5. Strim penghantaran fail besar dalam grpc

1) Takrifkan semula badan penghantaran dalam example.proto

// Declarations
syntax = "proto3";
package proto;
 
// Service definition
service HelloService{
  rpc Hello(Request) returns (Response) {}  // unary call: one request, one response
  rpc ClientTOServer(stream UpFileRequest) returns (Response) {}  // streaming file upload (client streams)
  rpc ServerTOClient(Request) returns (stream UpFileRequest) {}  // streaming file download (server streams)
}
 
// Request message; the numbers 1, 2 are the field numbers
message Request {
  string data = 1;
}
 
// Response message
message Response {
  int32 ret = 1;    // return code
  string data = 2;
}
 
// One message of a streamed file transfer (used for both upload and download)
message UpFileRequest {
  string filename = 1;
  int64 sendsize = 2;
  int64 totalsize = 3;
  bytes data = 4;
}
 
 
// Generate the Python modules with:
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2) Gunakan arahan untuk menjana fail py dalam persekitaran maya; rujuk bahagian 2, langkah 2)

3) Edit client.py dan server.py

# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer implementing the unary and streaming RPCs of HelloService."""

    def Hello(self, request, context):
        """Print the request payload, sleep two seconds, then echo it back.

        Returns:
            ``Response`` with ``ret=0`` and the payload prefixed by "Response:".
        """
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)

    def ClientTOServer(self, request_iterator, context):
        """Receive a client-streamed file and save it as ``242_copy.mp3``.

        The ``data`` of every message is appended to one buffer; ``filename``
        and ``totalsize`` are taken from the last message received. The file is
        written only when at least one message arrived and the buffered length
        equals ``totalsize``.

        Returns:
            ``Response`` with ``ret=0`` on success or ``ret=-1`` on failure;
            ``data`` carries the file name.
        """
        data = bytearray()
        # Defaults keep these names bound when the client sends an empty
        # stream; the original raised UnboundLocalError in that case.
        file_name = ""
        file_size = 0
        received_any = False
        for chunk_msg in request_iterator:
            received_any = True
            file_name = chunk_msg.filename
            file_size = chunk_msg.totalsize
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(chunk_msg.data)  # append this message's bytes
            print(f"已接收长度:{len(data)}")
        if received_any and len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            (ret, res) = (0, file_name)
        else:
            print(f"{file_name=} 下载失败")
            (ret, res) = (-1, file_name)
        return example_pb2.Response(ret=ret, data=res)

    def ServerTOClient(self, request, context):
        """Stream the file named by ``request.data`` back to the client.

        The first message carries only metadata (``sendsize=0``, empty
        ``data``); each following message carries up to 1 MB of file content.
        Errors while reading now propagate instead of being printed and
        retried forever inside the loop.

        NOTE(review): the path comes straight from the client and is opened
        unchecked, so any file readable by the server process can be
        requested. Restrict it before using this outside a demo.
        """
        fp = request.data
        print(f"下载文件:{fp=}")
        # File name and total size are sent with every message.
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024  # read 1 MB per message

        with open(fp, "rb") as fr:
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # Named ``chunk`` so the gRPC ``context`` parameter is not overwritten.
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                sendsize=len(chunk),
                                                data=chunk)
            print("发送完毕")
 
 
def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` with 1 GiB message limits.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.

    The function blocks in a sleep loop. The server is stopped on every way
    out of that loop, including Ctrl+C.
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # thread pool of size 10
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        # Ctrl+C raises KeyboardInterrupt, which is not an Exception subclass;
        # catching only Exception would skip the shutdown below.
        pass
    except Exception as es:
        print(es)
    finally:
        grpc_server.stop(0)
 
 
if __name__ == "__main__":
    # Serve the streaming demo on localhost, port 8000.
    server(ip="127.0.0.1", port=8000)
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def send_stream_data(fp: str):
    """Yield the file at ``fp`` as a sequence of ``UpFileRequest`` messages.

    The first message carries only metadata (``sendsize=0``, empty ``data``);
    each following message carries up to 1 MB of file content. Every message
    repeats the file name and the total size.

    Args:
        fp: Path of the file to send.

    Raises:
        OSError: If the file cannot be stat-ed, opened or read. The original
            printed read errors and retried forever inside its loop.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024  # read 1 MB per message

    with open(fp, "rb") as fr:
        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        # An empty read means end of file and ends the generator.
        while chunk := fr.read(part_size):
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(chunk),
                                            data=chunk)
        print("发送完毕")
 
 
def client(ip: str, port: int) -> None:
    """Send one Hello request with a 1-second timeout and print the reply.

    Args:
        ip: Server address.
        port: Server TCP port.

    Raises:
        SystemExit: With status -1 when the call fails. A successful call
            returns normally (the original exited with -1 unconditionally).
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = f"{ip}:{port}"
    failed = False
    # The context manager closes the channel before the process may exit.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = example_pb2.Request(data="hello 123")
            res = cli.Hello(request, timeout=1)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            failed = True
        except Exception as es:
            print(es)
            failed = True
    if failed:
        sys.exit(-1)
 
 
def client_to_server(ip: str, port: int, fp: str):
    """Upload the file at ``fp`` to the server through a client-streaming RPC.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the local file to upload.

    Raises:
        SystemExit: With status -1 when the upload call fails. A successful
            call returns normally (the original exited with -1
            unconditionally).
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = f"{ip}:{port}"
    failed = False
    # The context manager closes the channel before the process may exit.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = send_stream_data(fp=fp)
            res = cli.ClientTOServer(request, timeout=600)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            failed = True
        except Exception as es:
            print(es)
            failed = True
    if failed:
        sys.exit(-1)
 
 
def server_to_client(ip: str, port: int, fp: str):
    """Download a file from the server through a server-streaming RPC.

    The ``data`` of every streamed message is appended to one buffer. When the
    buffered length equals the announced total size, the buffer is written to
    ``242_1.mp3``.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the file on the server, sent as the request payload.

    Raises:
        SystemExit: With status -1 when the call fails or the sizes do not
            match. A successful download returns normally (the original
            exited with -1 unconditionally).
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = f"{ip}:{port}"
    failed = False
    # The context manager closes the channel before the process may exit.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            data = bytearray()
            request = example_pb2.Request(data=fp)
            filename = ""
            # None marks "no message received"; the original left the name
            # unbound and failed with UnboundLocalError on an empty stream.
            total_size = None
            for res in cli.ServerTOClient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                data += res.data
            if total_size is not None and total_size == len(data):
                with open("242_1.mp3", "wb") as fw:
                    fw.write(data)
                print(f"{filename=} : {total_size=} 下载完成!")
            else:
                print(f"{filename=} 下载失败!")
                failed = True
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            failed = True
        except Exception as es:
            print(es)
            failed = True
    if failed:
        sys.exit(-1)
 
 
if __name__ == "__main__":
    # Alternative demo calls, left disabled:
    #   client("127.0.0.1", 8000)
    #   client_to_server("127.0.0.1", 8000, "242.mp3")
    server_to_client(ip="127.0.0.1", port=8000, fp="242.mp3")

6. Penghantaran tak segerak (async) dengan grpc

# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
import asyncio
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer implementing the unary and streaming RPCs of HelloService."""

    def Hello(self, request, context):
        """Print the request payload, sleep two seconds, then echo it back.

        Returns:
            ``Response`` with ``ret=0`` and the payload prefixed by "Response:".
        """
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)

    def ClientTOServer(self, request_iterator, context):
        """Receive a client-streamed file and save it as ``242_copy.mp3``.

        The ``data`` of every message is appended to one buffer; ``filename``
        and ``totalsize`` are taken from the last message received. The file is
        written only when at least one message arrived and the buffered length
        equals ``totalsize``.

        Returns:
            ``Response`` with ``ret=0`` on success or ``ret=-1`` on failure;
            ``data`` carries the file name.
        """
        data = bytearray()
        # Defaults keep these names bound when the client sends an empty
        # stream; the original raised UnboundLocalError in that case.
        file_name = ""
        file_size = 0
        received_any = False
        for chunk_msg in request_iterator:
            received_any = True
            file_name = chunk_msg.filename
            file_size = chunk_msg.totalsize
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(chunk_msg.data)  # append this message's bytes
            print(f"已接收长度:{len(data)}")
        if received_any and len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            (ret, res) = (0, file_name)
        else:
            print(f"{file_name=} 下载失败")
            (ret, res) = (-1, file_name)
        return example_pb2.Response(ret=ret, data=res)

    def ServerTOClient(self, request, context):
        """Stream the file named by ``request.data`` back to the client.

        The first message carries only metadata (``sendsize=0``, empty
        ``data``); each following message carries up to 1 MB of file content.
        Errors while reading now propagate instead of being printed and
        retried forever inside the loop.

        NOTE(review): the path comes straight from the client and is opened
        unchecked, so any file readable by the server process can be
        requested. Restrict it before using this outside a demo.
        """
        fp = request.data
        print(f"下载文件:{fp=}")
        # File name and total size are sent with every message.
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024  # read 1 MB per message

        with open(fp, "rb") as fr:
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # Named ``chunk`` so the gRPC ``context`` parameter is not overwritten.
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                sendsize=len(chunk),
                                                data=chunk)
            print("发送完毕")
 
 
async def server(ip: str, port: int) -> None:
    """Start an insecure asyncio gRPC server on ``ip:port`` and wait for it to end.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.

    The server is stopped when waiting fails with an exception and also when
    this coroutine is cancelled.
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # thread pool of size 10
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    await grpc_server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        await grpc_server.wait_for_termination()
    except asyncio.CancelledError:
        # CancelledError derives from BaseException since Python 3.8, so the
        # ``except Exception`` branch below never sees a cancellation.
        await grpc_server.stop(None)
        raise
    except Exception as es:
        print(es)
        await grpc_server.stop(None)
 
 
if __name__ == '__main__':
    # asyncio.run creates the event loop, runs the coroutine to completion and
    # closes the loop. The previous form passed a bare coroutine to
    # asyncio.wait(), which was deprecated in Python 3.8 and is rejected since
    # Python 3.11.
    asyncio.run(server("127.0.0.1", 8000))
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
import asyncio
 
 
def send_stream_data(fp: str):
    """Yield the file at ``fp`` as a sequence of ``UpFileRequest`` messages.

    The first message carries only metadata (``sendsize=0``, empty ``data``);
    each following message carries up to 1 MB of file content. Every message
    repeats the file name and the total size.

    Args:
        fp: Path of the file to send.

    Raises:
        OSError: If the file cannot be stat-ed, opened or read. The original
            printed read errors and retried forever inside its loop.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024  # read 1 MB per message

    with open(fp, "rb") as fr:
        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        # An empty read means end of file and ends the generator.
        while chunk := fr.read(part_size):
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(chunk),
                                            data=chunk)
        print("发送完毕")
 
 
async def client(ip: str, port: int) -> None:
    """Send one Hello request over an asyncio channel and print the reply.

    Args:
        ip: Server address.
        port: Server TCP port.

    Raises:
        SystemExit: With status -1 when the call fails. A successful call
            returns normally (the original exited with -1 unconditionally).
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = f"{ip}:{port}"
    failed = False
    async with grpc.aio.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = example_pb2.Request(data="hello 123")
            res = await cli.Hello(request, timeout=3)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            failed = True
        except Exception as es:
            print(es)
            failed = True
    # Exit only after the channel has been closed, and only on failure.
    if failed:
        sys.exit(-1)
 
 
async def client_to_server(ip: str, port: int, fp: str):
    """Upload the file at ``fp`` through a client-streaming RPC on an asyncio channel.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the local file to upload.

    Raises:
        SystemExit: With status -1 when the upload call fails. A successful
            call returns normally (the original exited with -1
            unconditionally).
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = f"{ip}:{port}"
    failed = False
    async with grpc.aio.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = send_stream_data(fp=fp)
            res = await cli.ClientTOServer(request, timeout=600)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            failed = True
        except Exception as es:
            print(es)
            failed = True
    # Exit only after the channel has been closed, and only on failure.
    if failed:
        sys.exit(-1)
 
 
def server_to_client(ip: str, port: int, fp: str):
    """Download a file from the server through a server-streaming RPC.

    The ``data`` of every streamed message is appended to one buffer. When the
    buffered length equals the announced total size, the buffer is written to
    ``242_1.mp3``.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the file on the server, sent as the request payload.

    Raises:
        SystemExit: With status -1 when the call fails or the sizes do not
            match. A successful download returns normally (the original
            exited with -1 unconditionally).
    """
    # Message size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = f"{ip}:{port}"
    failed = False
    # The context manager closes the channel before the process may exit.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            data = bytearray()
            request = example_pb2.Request(data=fp)
            filename = ""
            # None marks "no message received"; the original left the name
            # unbound and failed with UnboundLocalError on an empty stream.
            total_size = None
            for res in cli.ServerTOClient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                data += res.data
            if total_size is not None and total_size == len(data):
                with open("242_1.mp3", "wb") as fw:
                    fw.write(data)
                print(f"{filename=} : {total_size=} 下载完成!")
            else:
                print(f"{filename=} 下载失败!")
                failed = True
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            failed = True
        except Exception as es:
            print(es)
            failed = True
    if failed:
        sys.exit(-1)
 
 
if __name__ == "__main__":
    # Disabled alternative: asyncio.run(client("127.0.0.1", 8000))
    asyncio.run(client_to_server(ip="127.0.0.1", port=8000, fp="242.mp3"))
    # Disabled alternative: server_to_client("127.0.0.1", 8000, "242.mp3")

Atas ialah kandungan terperinci Tutorial gRPC menggunakan bahasa Python untuk melaksanakan penghantaran mesej. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam