【Python】Python Flask 和 gRPC 简单项目

Python Flask 和 gRPC 示例项目

本文将介绍如何在 Python 中使用 Flask 和 gRPC 创建一个简单的示例应用程序,并使用 requests 库进行测试。

环境设置

首先,确保您已经安装了 Python。然后,创建一个虚拟环境以管理您的依赖项。

python -m venv myenv
source myenv/bin/activate  # Windows 使用 `myenv\Scripts\activate`

安装必要的包:

pip install Flask grpcio grpcio-tools requests

定义 gRPC 服务

创建一个 .proto 文件来定义 gRPC 服务。保存文件名为 service.proto

syntax = "proto3";

package demo;

service DemoService {
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

从 Proto 文件生成 Python 代码

使用 grpc_tools 生成 Python 代码:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto

这将生成两个文件:service_pb2.pyservice_pb2_grpc.py

实现 gRPC 服务器

创建一个名为 grpc_server.py 的文件:

from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc

class DemoService(service_pb2_grpc.DemoServiceServicer):
    def SayHello(self, request, context):
        return service_pb2.HelloResponse(message=f'Hello, {request.name}!')

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

实现 Flask 服务器

创建一个名为 flask_server.py 的文件:

from flask import Flask, request, jsonify
import grpc
import service_pb2
import service_pb2_grpc

app = Flask(__name__)

def get_grpc_client():
    channel = grpc.insecure_channel('localhost:50051')
    stub = service_pb2_grpc.DemoServiceStub(channel)
    return stub

@app.route('/hello', methods=['POST'])
def say_hello():
    data = request.json
    name = data.get('name')
    stub = get_grpc_client()
    response = stub.SayHello(service_pb2.HelloRequest(name=name))
    return jsonify({'message': response.message})

if __name__ == '__main__':
    app.run(port=5000, debug=True)

运行服务器

打开两个终端窗口或标签。

  1. 在第一个终端中启动 gRPC 服务器:

    python grpc_server.py
    
  2. 在第二个终端中启动 Flask 服务器:

    python flask_server.py
    

使用 requests 进行测试

创建一个名为 test_flask_server.py 的文件,并添加以下代码:

import requests

def test_say_hello():
    url = 'http://localhost:5000/hello'
    data = {'name': 'World'}
    response = requests.post(url, json=data)
    
    if response.status_code == 200:
        print('Success!')
        print('Response JSON:', response.json())
    else:
        print('Failed!')
        print('Status Code:', response.status_code)
        print('Response Text:', response.text)

if __name__ == '__main__':
    test_say_hello()

运行测试

  1. 确保 grpc_server.pyflask_server.py 正在运行。

  2. 在另一个终端中运行 test_flask_server.py

    python test_flask_server.py
    

您应该看到如下输出:

Success!
Response JSON: {'message': 'Hello, World!'}

运行截图

在这里插入图片描述

这个测试脚本发送一个包含 name"World" 的 JSON 对象的 POST 请求到 http://localhost:5000/hello,然后打印出服务器返回的响应。如果服务器正常工作,您会看到成功消息和返回的 JSON 数据。

相关推荐

  1. 【Go系列】RPCgrpc

    2024-07-12 10:22:03       23 阅读
  2. Docker笔记:简单部署 nodejs 项目 golang 项目

    2024-07-12 10:22:03       53 阅读
  3. GRPC服务端客户端DEMO

    2024-07-12 10:22:03       39 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-12 10:22:03       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-12 10:22:03       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-12 10:22:03       58 阅读
  4. Python语言-面向对象

    2024-07-12 10:22:03       69 阅读

热门阅读

  1. ubuntu文件夹加密

    2024-07-12 10:22:03       23 阅读
  2. OpenCV在构建时确实没有启用CUDA支持

    2024-07-12 10:22:03       20 阅读
  3. 编程题-函数模板

    2024-07-12 10:22:03       22 阅读
  4. Opencv中的直方图均衡

    2024-07-12 10:22:03       20 阅读
  5. cannot connect to X server

    2024-07-12 10:22:03       22 阅读
  6. Perl 语言进阶学习

    2024-07-12 10:22:03       28 阅读
  7. 超参数的艺术:Mojo模型与动态超参数调整

    2024-07-12 10:22:03       27 阅读
  8. 浅拷贝和深拷贝浅析

    2024-07-12 10:22:03       24 阅读