关键词搜索

源码搜索 ×
×

Python Module — grpcio gRPC 远程调用示例程序

发布2022-02-18浏览1610次

详情内容

目录

Python gRPC demo

实现 2 个微服务,调用链关系如下图所示:

  1. Marketplace(图书集市):一个简单的 Web 程序,向用户展示所有图书列表。
  2. Recommendations(图书推荐):一个微服务程序,向用户展示推荐的图书列表。

项目结构:

py3_grpc_demo
├── docker-compose.yml
├── marketplace
│   ├── Dockerfile
│   ├── marketplace.py
│   ├── recommendations_pb2_grpc.py
│   ├── recommendations_pb2.py
│   ├── requirements.txt
│   └── templates
│       └── homepage.html
├── protobufs
│   └── recommendations.proto
└── recommendations
    ├── Dockerfile
    ├── recommendations_pb2_grpc.py
    ├── recommendations_pb2.py
    ├── recommendations.py
    └── requirements.txt

    在这里插入图片描述

    1、使用 Protocol Buffers 来定义 gRPC API

    • py3_grpc_demo/protobufs/recommendations.proto
    syntax = "proto3";
    
    enum BookCategory {
        MYSTERY = 0;
        SCIENCE_FICTION = 1;
        SELF_HELP = 2;
    }
    
    message BookRecommendation {
        int32 id = 1;
        string title = 2;
    }
    
    message RecommendationRequest {
        int32 user_id = 1;
        BookCategory category = 2;
        int32 max_results = 3;
    }
    
    message RecommendationResponse {
        repeated BookRecommendation recommendations = 1;
    }
    
    service Recommendations {
        rpc Recommend (RecommendationRequest) returns (RecommendationResponse);
    }
    
      19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • gRPC API Service:Recommendations
    • gRPC API Request Message:RecommendationRequest
    • gRPC API Response Message:RecommendationResponse

    2、实现 Recommendations 微服务

    安装依赖

    • py3_grpc_demo/recommendations/requirements.txt
    grpc-interceptor ~= 0.12.0
    grpcio-tools ~= 1.30
    pytest ~= 5.4
    
    • 1
    • 2
    • 3
    • 安装
    $ cd py3_grpc_demo/recommendations
    $ pip3 install -r requirements.txt
    
    • 1
    • 2

    基于 .proto 文件生成 gRPC 代码

    $ cd py3_grpc_demo/recommendations
    
    $ python3 -m grpc_tools.protoc -I ../protobufs --python_out=. --grpc_python_out=. ../protobufs/recommendations.proto
    
    $ ll
    ...
    -rw-r--r--. 1 root root 2531 128 02:34 recommendations_pb2_grpc.py
    -rw-r--r--. 1 root root 8345 128 02:34 recommendations_pb2.py
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    自动生成的 2 个文件,包含了与 gRPC API 通讯所需要的 Python 类型和函数。注意这 2 个文件都是不可被编辑:

    1. gRPC 消息传输数据类型定义文件:recommendations_pb2_grpc.py
    2. gRPC C/S 基本通讯框架定义文件:recommendations_pb2.py

    指令行模拟 gRPC Client

    recommendations_pb2.py 文件的可读性很差,通常我们不需要关心这个文件的内容,根据 .proto 文件就能理解 gRPC Client 端所需要的操作内容。

    通过从 recommendations_pb2 Module 导入 “请求消息传输数据类型:RecommendationRequest”,就可以构建 Client 需要的 gRPC Request 了。

    >>> from recommendations_pb2 import BookCategory, RecommendationRequest
    >>> request = RecommendationRequest(
    ...     user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3
    ... )
    >>>
    >>> request.category
    1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    值得注意的是,虽然基于 .proto 文件生成的代码里面包含了 API 输入数据的校验,不会接受错误的输入数据类型。使用 Protocol buffers,使得 gRPC 就像是调用本地函数一样简明,但底层实际上是一个网络调用。

    但是 proto3 版本中,.proto message 中定义的所有字段是可选的,如果其中某些字段未提供实参,则会填充默认值,例如:数值类型则默认为 0,字符类型默认为 None。如下所示:

    >>> request = RecommendationRequest(
    ...     user_id=1, category=BookCategory.SCIENCE_FICTION)
    >>>
    >>> request.max_results
    0
    
    • 1
    • 2
    • 3
    • 4
    • 5

    也就是说,开发者依旧需要自己来校验输入函数的实参列表是否正确的设置了所有数值。

    当 Client 基于 recommendations_pb2 Module 构建好了 gRPC Request Msg 之后,就可以再基于 recommendations_pb2_grpc Module 来发送这个 Request 了。

    gRPC 会为 .proto 文件中定义的每个 service 定义一个 Stub(存根),例如:RecommendationsStub。这些 Stub 类实现了 Recommend,用于 Send gRPC Request 和 Receive gRPC Response,并且 Request 和 Response 的数据结构在 .proto 文件的 service 定义中也明确了。

    >>> import grpc
    >>> from recommendations_pb2_grpc import RecommendationsStub
    >>> from recommendations_pb2 import BookCategory, RecommendationRequest
    
    >>> channel = grpc.insecure_channel("localhost:50051")
    >>> client = RecommendationsStub(channel)
    >>> request = RecommendationRequest(
    ...     user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3
    ... )
    >>> client.Recommend(request)
    Traceback (most recent call last):
    ...
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    	status = StatusCode.UNAVAILABLE
    	details = "failed to connect to all addresses"
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    上述例子可知,RecommendationsStub 接收一个 gRPC Channel 实例作为实参,并且这个 Channel 会帮的一个 Endpoint(IP:Port)。上述错误的意思是,没有找到一个运行在 localhost:50051(50051 是 gRPC 的标准端口)之上的 gRPC Server。

    实现 gRPC Server

    Server 端的实现,首先需要一个 gRPC Server 对象,由 gRPC 库来提供。然后还需要将任意个 gRPC Services 对象注册到 gRPC Server 中。示例中的 RecommendationsServicer 根据 .proto 文件中的 service Recommendations 自动生成,在 recommendations_pb2_grpc 模块中被定义,代表一个具体的 API Service。

    • py3_grpc_demo/recommendations/recommendations.py
    from concurrent import futures
    import random
    
    import grpc
    # grpc_interceptor 是 Python 实现的 gRPC 拦截器库。
    from grpc_interceptor import ServerInterceptor
    from grpc_interceptor import ExceptionToStatusInterceptor
    
    from recommendations_pb2 import (
        BookCategory,
        BookRecommendation,
        RecommendationResponse,
    )
    import recommendations_pb2_grpc
    
    # 构造测试数据,在实际的微服务项目中,这些数据应该被存放在数据库中。
    books_by_category = {
        BookCategory.MYSTERY: [
            BookRecommendation(id=1, title="The Maltese Falcon"),
            BookRecommendation(id=2, title="Murder on the Orient Express"),
            BookRecommendation(id=3, title="The Hound of the Baskervilles"),
        ],
        BookCategory.SCIENCE_FICTION: [
            BookRecommendation(
                id=4, title="The Hitchhiker's Guide to the Galaxy"
            ),
            BookRecommendation(id=5, title="Ender's Game"),
            BookRecommendation(id=6, title="The Dune Chronicles"),
        ],
        BookCategory.SELF_HELP: [
            BookRecommendation(
                id=7, title="The 7 Habits of Highly Effective People"
            ),
            BookRecommendation(
                id=8, title="How to Win Friends and Influence People"
            ),
            BookRecommendation(id=9, title="Man's Search for Meaning"),
        ],
    }
    
    
    # 定义微服务接口访问拦截器,当存在之前未捕获的异常时,会调用 log_error 方法。
    class ErrorLogger(ServerInterceptor):
    
        def intercept(self, method, request, context, method_name):
            try:
                return method(request, context)
            except Exception as e:
                self.log_error(e)
                raise
    
        def log_error(self, err):
            # do something...
            pass
    
    
    # 微服务实现
    class RecommendationService(recommendations_pb2_grpc.RecommendationsServicer):
    
        # 重载父类的 Recommend 方法。
        def Recommend(self, request, context):
            if request.category not in books_by_category:
                context.abort(grpc.StatusCode.NOT_FOUND, "Category not found")
    
            books_for_category = books_by_category[request.category]
            num_results = min(request.max_results, len(books_for_category))
            books_to_recommend = random.sample(
                books_for_category, num_results
            )
    
            return RecommendationResponse(recommendations=books_to_recommend)
    
    
    def serve():
        # 将拦截器添加到 Server,使得所有微服务的请求和返回都会经过拦截器,可以以此来统计有多少请求是错误的。
        # 通过调用 context.abort(),拦截器将为你捕获抛出的异常,无须自行实现。
        interceptors = [ExceptionToStatusInterceptor(), ErrorLogger()]
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=2),
                             interceptors=interceptors)
        recommendations_pb2_grpc.add_RecommendationsServicer_to_server(
            RecommendationService(), server
        )
        server.add_insecure_port("[::]:50051")
        server.start()
        server.wait_for_termination()
    
    
    if __name__ == "__main__":
        serve()
    
      19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    基于拦截器(Interceptors)的微服务可观测性

    使用 gRPC 来构建微服务程序时,就会要求具备可观测性。你需要监控的项目包括:

    1. 每个微服务收到的请求。
    2. 错误请求,以及错误内容。
    3. 请求延时。
    4. 异常堆栈。
    5. 等。

    在上述代码中,我们使用了 grpc-interceptor 来实现拦截器。而在某些场景下,我们可以使用 Service Mesh 来实现拦截器的功能,所有微服务的请求和返回都会经过 Service Mesh 的代理,从代理层面可以自动处理日志或者记录错误次数。为了获取更精确地错误输出,你的微服务应该要设置准确的状态码。

    运行测试

    • 启动 gRPC Server:
    $ cd py3_grpc_demo/recommendations
    $ python recommendations.py
    
    • 1
    • 2
    • Client 发出请求:
    >>> import grpc
    >>> from recommendations_pb2 import BookCategory, RecommendationRequest
    >>> from recommendations_pb2_grpc import RecommendationsStub
    
    >>> channel = grpc.insecure_channel("localhost:50051")
    >>> client = RecommendationsStub(channel)
    
    >>> request = RecommendationRequest(
    ...    user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3)
    >>> client.Recommend(request)
    recommendations {
      id: 6
      title: "The Dune Chronicles"
    }
    recommendations {
      id: 5
      title: "Ender\'s Game"
    }
    recommendations {
      id: 4
      title: "The Hitchhiker\'s Guide to the Galaxy"
    }
    
      19
    • 20
    • 21
    • 22

    3、实现 Marketplace Web 程序

    安装依赖

    • py3_grpc_demo/marketplace/requirements.txt
    flask ~= 1.1
    grpcio-tools ~= 1.30
    Jinja2 ~= 2.11
    pytest ~= 5.4
    
    • 1
    • 2
    • 3
    • 4
    • 安装
    $ pip3 install -r requirements.txt
    
    • 1

    基于 .proto 文件生成 gRPC 代码

    cd py3_grpc_demo/marketplace
    python -m grpc_tools.protoc -I ../protobufs --python_out=. --grpc_python_out=. ../protobufs/recommendations.proto
    
    • 1
    • 2

    实现 Marketplace Web Server 和 gRPC Client

    $ vi py3_grpc_demo/marketplace/marketplace.py
    
    import os
    
    from flask import Flask, render_template
    import grpc
    
    from recommendations_pb2 import BookCategory, RecommendationRequest
    from recommendations_pb2_grpc import RecommendationsStub
    
    
    app = Flask(__name__)
    
    # 通过环境变量来获取 Server IP 地址。
    recommendations_host = os.getenv("RECOMMENDATIONS_HOST", "localhost")
    recommendations_channel = grpc.insecure_channel(
        f"{recommendations_host}:50051"
    )
    recommendations_client = RecommendationsStub(recommendations_channel)
    
    
    @app.route("/")
    def render_homepage():
        recommendations_request = RecommendationRequest(
            user_id=1, category=BookCategory.MYSTERY, max_results=3
        )
        recommendations_response = \
            recommendations_client.Recommend(recommendations_request)
        return render_template(
            "homepage.html",
            recommendations=recommendations_response.recommendations,
        )
    
      19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    vi py3_grpc_demo/marketplace/templates/homepage.html
    
    <!-- homepage.html -->
    <!doctype html>
    <html lang="en">
    <head>
        <title>Online Books For You</title>
    </head>
    <body>
        <h1>Mystery books you may like</h1>
        <ul>
        {% for book in recommendations %}
            <li>{{ book.title }}</li>
        {% endfor %}
        </ul>
    </body>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    运行测试

    • 启动:Marketplace REST API Server
    FLASK_APP=marketplace.py flask run -h 0.0.0.0
    
    • 1
    • 访问:http://{server_ip}:5000
      在这里插入图片描述

    4、容器化部署

    Dockerfile

    • py3_grpc_demo/recommendations/Dockerfile
    FROM python
    
    RUN mkdir /service
    COPY protobufs/ /service/protobufs/
    COPY recommendations/ /service/recommendations/
    WORKDIR /service/recommendations
    RUN python -m pip install --upgrade pip
    RUN python -m pip install -r requirements.txt
    RUN python -m grpc_tools.protoc -I ../protobufs --python_out=. \
               --grpc_python_out=. ../protobufs/recommendations.proto
    
    EXPOSE 50051
    ENTRYPOINT [ "python", "recommendations.py" ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • py3_grpc_demo/marketplace/Dockerfile
    FROM python
    
    RUN mkdir /service
    COPY protobufs/ /service/protobufs/
    COPY marketplace/ /service/marketplace/
    WORKDIR /service/marketplace
    RUN python -m pip install --upgrade pip
    RUN python -m pip install -r requirements.txt
    RUN python -m grpc_tools.protoc -I ../protobufs --python_out=. \
               --grpc_python_out=. ../protobufs/recommendations.proto
    
    EXPOSE 5000
    ENV FLASK_APP=marketplace.py
    ENTRYPOINT [ "flask", "run", "--host=0.0.0.0"]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • py3_grpc_demo/docker-compose.yml
    version: "3.3"
    
    services:
    
        marketplace:
            build:
                context: .
                dockerfile: marketplace/Dockerfile
            environment:
                RECOMMENDATIONS_HOST: recommendations
            image: marketplace
            networks:
                - microservices
            ports:
                - 5000:5000
    
        recommendations:
            build:
                context: .
                dockerfile: recommendations/Dockerfile
            image: recommendations
            networks:
                - microservices
    
    networks:
        microservices:
    
      19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    运行测试

    docker-compose up
    
    • 1

    参考文档

    https://sunqi.site/posts/python-microservices-grpc-6/

    相关技术文章

    点击QQ咨询
    开通会员
    返回顶部
    ×
    微信扫码支付
    微信扫码支付
    确定支付下载
    请使用微信描二维码支付
    ×

    提示信息

    ×

    选择支付方式

    • 微信支付
    • 支付宝付款
    确定支付下载