Building a Near-Production Distributed Application from Scratch with gRPC (4) - 03 - gRPC in Depth: Streaming Services

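This is the second post on using native gRPC, and it covers streaming services. Streaming calls are well suited to high-traffic or long-tail service scenarios. Without further ado, the code examples follow.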

1. Proto definition

syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
service OrderManagement {
    rpc addOrder(Order) returns (google.protobuf.StringValue);
    rpc getOrder(google.protobuf.StringValue) returns (Order);
    rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
    rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
    rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
}

message Order {
    string id = 1;
    repeated string items = 2;
    string description = 3;
    float price = 4;
    string destination = 5;
}

message CombinedShipment {
    string id = 1;
    string status = 2;
    repeated Order ordersList = 3;
}

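2. Unary RPC

This is what the previous chapter implemented: one request followed by one response, similar to HTTP's request/response mechanism.

Client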

OrderManagementGrpc.OrderManagementBlockingStub stub = OrderManagementGrpc.newBlockingStub(channel);
OrderManagementGrpc.OrderManagementStub asyncStub = OrderManagementGrpc.newStub(channel);
OrderManagementOuterClass.Order order = OrderManagementOuterClass.Order
        .newBuilder()
        .setId("101")
        .addItems("iPhone XS").addItems("Mac Book Pro")
        .setDestination("San Jose, CA")
        .setPrice(2300)
        .build();
StringValue result = stub.addOrder(order);

Server

public void addOrder(OrderManagementOuterClass.Order request, StreamObserver<StringValue> responseObserver) {
        logger.info("Order Added - ID: " + request.getId() + ", Destination : " + request.getDestination());
        orderMap.put(request.getId(), request);
        StringValue id = StringValue.newBuilder().setValue("100500").build();
        responseObserver.onNext(id);
        responseObserver.onCompleted();
        // ToDo  Handle errors
        // responseObserver.onError();
 }
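
The TODO in the server listing leaves error handling open. The sketch below illustrates the usual callback contract: a call ends with either `onNext` plus `onCompleted`, or with a single `onError`, never both. It is a minimal sketch under assumptions, not the service's real implementation: `Observer` is a local stand-in for `io.grpc.stub.StreamObserver`, the order is reduced to an ID and a destination, the empty-ID validation rule is hypothetical, and the sketch returns the request ID. In grpc-java the error passed to `onError` would normally be a `StatusRuntimeException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UnaryErrorSketch {
    // Hypothetical stand-in for io.grpc.stub.StreamObserver so the sketch runs without gRPC.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Order ID -> destination; a simplification of the orderMap used by the service.
    static final Map<String, String> ORDER_MAP = new HashMap<>();

    static void addOrder(String id, String destination, Observer<String> responseObserver) {
        if (id == null || id.isEmpty()) {
            // With grpc-java this would be a StatusRuntimeException, for example
            // Status.INVALID_ARGUMENT.withDescription("...").asRuntimeException().
            responseObserver.onError(new IllegalArgumentException("Order ID must not be empty"));
            return; // onError is terminal: no onNext or onCompleted afterwards
        }
        ORDER_MAP.put(id, destination);
        responseObserver.onNext(id);
        responseObserver.onCompleted();
    }

    // Runs one call and records the callbacks the "client" would observe.
    static List<String> call(String id, String destination) {
        List<String> events = new ArrayList<>();
        addOrder(id, destination, new Observer<String>() {
            @Override public void onNext(String value) { events.add("next:" + value); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(call("101", "San Jose, CA")); // [next:101, completed]
        System.out.println(call("", "San Jose, CA"));    // [error:Order ID must not be empty]
    }
}
```

Returning immediately after `onError` is the important design choice here: once the error is reported, the call is over and no further callbacks should be issued.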

3. Server streaming

Server

@Override
    public void searchOrders(StringValue request, StreamObserver<OrderManagementOuterClass.Order> responseObserver) {

        for (Map.Entry<String, OrderManagementOuterClass.Order> orderEntry : orderMap.entrySet()) {
            OrderManagementOuterClass.Order order = orderEntry.getValue();
            int itemsCount = order.getItemsCount();
            for (int index = 0; index < itemsCount; index++) {
                String item = order.getItems(index);
                if (item.contains(request.getValue())) {
                    logger.info("Item found " + item);
                    responseObserver.onNext(order); // write to the stream; the server keeps sending as matches are found
                    break;
                }
            }
        }
        responseObserver.onCompleted(); // send the end-of-stream marker
    }

Client

// Search Orders
StringValue searchStr = StringValue.newBuilder().setValue("Google").build();
Iterator<OrderManagementOuterClass.Order> matchingOrdersItr;
matchingOrdersItr = stub.searchOrders(searchStr);
while (matchingOrdersItr.hasNext()) {
    OrderManagementOuterClass.Order matchingOrder = matchingOrdersItr.next();
    logger.info("Search Order Response -> Matching Order - " + matchingOrder.getId());
    logger.info(" Order : " + matchingOrder.getId() + "\n "
            + matchingOrder.toString());
}
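
The server-streaming contract used above (zero or more `onNext` calls, then exactly one `onCompleted`) can be tried out without a network. The following is a minimal sketch under assumptions: `Observer` is a local stand-in for `io.grpc.stub.StreamObserver`, and each order is reduced to an ID plus its item list instead of the generated `Order` message. The sample data reuses the order IDs and items that appear elsewhere in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ServerStreamingSketch {
    // Hypothetical stand-in for io.grpc.stub.StreamObserver so the sketch runs without gRPC.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Order ID -> items; a simplification of the Order message.
    static final Map<String, List<String>> ORDERS = new LinkedHashMap<>();
    static {
        ORDERS.put("102", List.of("Google Pixel 3A", "Google Pixel Book"));
        ORDERS.put("103", List.of("Apple Watch S4", "Mac Book Pro", "iPad Pro"));
        ORDERS.put("104", List.of("Google Home Mini", "Google Nest Hub", "iPad Mini"));
    }

    // Mirrors searchOrders: emit each matching order once, then signal the end of the stream.
    static void searchOrders(String keyword, Observer<String> responseObserver) {
        for (Map.Entry<String, List<String>> entry : ORDERS.entrySet()) {
            boolean matches = entry.getValue().stream().anyMatch(item -> item.contains(keyword));
            if (matches) {
                responseObserver.onNext(entry.getKey());
            }
        }
        responseObserver.onCompleted();
    }

    // Collects everything the "server" streams and appends "<done>" on completion.
    static List<String> collect(String keyword) {
        List<String> events = new ArrayList<>();
        searchOrders(keyword, new Observer<String>() {
            @Override public void onNext(String value) { events.add(value); }
            @Override public void onError(Throwable t) { events.add("<error>"); }
            @Override public void onCompleted() { events.add("<done>"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(collect("Google")); // [102, 104, <done>]
    }
}
```

A search with no matches still ends with the completion signal, which is what lets a blocking client's `hasNext()` loop terminate.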

4. Client streaming

Server

// Client Streaming
    @Override
    public StreamObserver<OrderManagementOuterClass.Order> updateOrders(StreamObserver<StringValue> responseObserver) {
        return new StreamObserver<OrderManagementOuterClass.Order>() {
            StringBuilder updatedOrderStrBuilder = new StringBuilder().append("Updated Order IDs : ");

            @Override
            public void onNext(OrderManagementOuterClass.Order value) {
                if (value != null) {
                    orderMap.put(value.getId(), value);
                    updatedOrderStrBuilder.append(value.getId()).append(", ");
                    logger.info("Order ID : " + value.getId() + " - Updated");
                }
            }

            @Override
            public void onError(Throwable t) {
                logger.info("Order ID update error " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                logger.info("Update orders - Completed");
                StringValue updatedOrders = StringValue.newBuilder().setValue(updatedOrderStrBuilder.toString()).build();
                responseObserver.onNext(updatedOrders);
                responseObserver.onCompleted();
            }
        };
    }

Client

private static void invokeOrderUpdate(OrderManagementGrpc.OrderManagementStub asyncStub) {

    OrderManagementOuterClass.Order updOrder1 = OrderManagementOuterClass.Order.newBuilder()
            .setId("102")
            .addItems("Google Pixel 3A").addItems("Google Pixel Book")
            .setDestination("Mountain View, CA")
            .setPrice(1100)
            .build();
    OrderManagementOuterClass.Order updOrder2 = OrderManagementOuterClass.Order.newBuilder()
            .setId("103")
            .addItems("Apple Watch S4").addItems("Mac Book Pro").addItems("iPad Pro")
            .setDestination("San Jose, CA")
            .setPrice(2800)
            .build();
    OrderManagementOuterClass.Order updOrder3 = OrderManagementOuterClass.Order.newBuilder()
            .setId("104")
            .addItems("Google Home Mini").addItems("Google Nest Hub").addItems("iPad Mini")
            .setDestination("Mountain View, CA")
            .setPrice(2200)
            .build();

    final CountDownLatch finishLatch = new CountDownLatch(1);

    StreamObserver<StringValue> updateOrderResponseObserver = new StreamObserver<StringValue>() {
        @Override
        public void onNext(StringValue value) {
            logger.info("Update Orders Res : " + value.getValue());
        }

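        @Override
        public void onError(Throwable t) {
            logger.warning("Update orders failed: " + t.getMessage());
            finishLatch.countDown(); // release the waiting caller on failure as well
        }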

        @Override
        public void onCompleted() {
            logger.info("Update orders response  completed!");
            finishLatch.countDown();
        }
    };

    StreamObserver<OrderManagementOuterClass.Order> updateOrderRequestObserver = asyncStub.updateOrders(updateOrderResponseObserver);
    updateOrderRequestObserver.onNext(updOrder1);
    updateOrderRequestObserver.onNext(updOrder2);
    updateOrderRequestObserver.onNext(updOrder3);


    if (finishLatch.getCount() == 0) {
        logger.warning("RPC completed or errored before we finished sending.");
        return;
    }
    updateOrderRequestObserver.onCompleted();

    // Receiving happens asynchronously

    try {
        if (!finishLatch.await(10, TimeUnit.SECONDS)) {
            logger.warning("FAILED : Update orders cannot finish within 10 seconds");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}
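
The asynchronous client blocks on a `CountDownLatch` until the response stream ends. A stream can end in two ways, `onCompleted` or `onError`, so both callbacks should release the latch; otherwise a failed call keeps the caller waiting for the full timeout. The sketch below demonstrates the difference. It is only an illustration under assumptions: `Observer` is a local stand-in for `io.grpc.stub.StreamObserver`, a plain thread simulates the transport, and the `releaseOnError` flag is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {
    // Hypothetical stand-in for io.grpc.stub.StreamObserver so the sketch runs without gRPC.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Returns true if the waiting caller was released before the timeout elapsed.
    static boolean awaitTerminalEvent(boolean fail, boolean releaseOnError, long timeoutMillis) {
        CountDownLatch finishLatch = new CountDownLatch(1);
        Observer<String> responseObserver = new Observer<String>() {
            @Override public void onNext(String value) { /* responses are ignored in this sketch */ }
            @Override public void onError(Throwable t) {
                if (releaseOnError) {
                    finishLatch.countDown();
                }
            }
            @Override public void onCompleted() { finishLatch.countDown(); }
        };

        // Simulate the transport delivering the terminal event on another thread.
        Thread transport = new Thread(() -> {
            if (fail) {
                responseObserver.onError(new RuntimeException("simulated failure"));
            } else {
                responseObserver.onCompleted();
            }
        });
        transport.start();

        try {
            boolean released = finishLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            transport.join();
            return released;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitTerminalEvent(false, false, 2000)); // completed normally
        System.out.println(awaitTerminalEvent(true, true, 2000));   // failed, latch released
        System.out.println(awaitTerminalEvent(true, false, 200));   // failed, caller times out
    }
}
```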

5. Bidirectional streaming

Server

@Override
    public StreamObserver<StringValue> processOrders(StreamObserver<OrderManagementOuterClass.CombinedShipment> responseObserver) {

        return new StreamObserver<StringValue>() {
            int batchMarker = 0;
            @Override
            public void onNext(StringValue value) {
                logger.info("Order Proc : ID - " + value.getValue());
                OrderManagementOuterClass.Order currentOrder = orderMap.get(value.getValue());
                if (currentOrder == null) {
                    logger.info("No order found. ID - " + value.getValue());
                    return;
                }
                // Process the order and increment the batch marker.
                batchMarker++;
                String orderDestination = currentOrder.getDestination();
                OrderManagementOuterClass.CombinedShipment existingShipment = combinedShipmentMap.get(orderDestination);

                if (existingShipment != null) {
                    existingShipment = OrderManagementOuterClass.CombinedShipment.newBuilder(existingShipment).addOrdersList(currentOrder).build();
                    combinedShipmentMap.put(orderDestination, existingShipment);
                } else {
                    OrderManagementOuterClass.CombinedShipment shipment = OrderManagementOuterClass.CombinedShipment.newBuilder()
                            .addOrdersList(currentOrder)
                            .setId("CMB-" + new Random().nextInt(1000) + ":" + currentOrder.getDestination())
                            .setStatus("Processed!")
                            .build();
                    combinedShipmentMap.put(currentOrder.getDestination(), shipment);
                }

                if (batchMarker == BATCH_SIZE) {
                    // Order batch completed. Flush all existing shipments.
                    for (Map.Entry<String, OrderManagementOuterClass.CombinedShipment> entry : combinedShipmentMap.entrySet()) {
                        responseObserver.onNext(entry.getValue());
                    }
                    // Reset batch marker
                    batchMarker = 0;
                    combinedShipmentMap.clear();
                }
            }

            @Override
            public void onError(Throwable t) {
                logger.info("Order processing stream error " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                for (Map.Entry<String, OrderManagementOuterClass.CombinedShipment> entry : combinedShipmentMap.entrySet()) {
                    responseObserver.onNext(entry.getValue());
                }
                responseObserver.onCompleted();
            }

        };
    }

Client

private static void invokeOrderProcess(OrderManagementGrpc.OrderManagementStub asyncStub) {

    final CountDownLatch finishLatch = new CountDownLatch(1);


    StreamObserver<OrderManagementOuterClass.CombinedShipment> orderProcessResponseObserver = new StreamObserver<OrderManagementOuterClass.CombinedShipment>() {
        @Override
        public void onNext(OrderManagementOuterClass.CombinedShipment value) {
            logger.info("Combined Shipment : " + value.getId() + " : " + value.getOrdersListList());
        }

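        @Override
        public void onError(Throwable t) {
            logger.warning("Order processing failed: " + t.getMessage());
            finishLatch.countDown(); // release the waiting caller on failure as well
        }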

        @Override
        public void onCompleted() {
            logger.info("Order Processing completed!");
            finishLatch.countDown();
        }
    };

    StreamObserver<StringValue> orderProcessRequestObserver =  asyncStub.processOrders(orderProcessResponseObserver);

    orderProcessRequestObserver.onNext(StringValue.newBuilder().setValue("102").build());
    orderProcessRequestObserver.onNext(StringValue.newBuilder().setValue("103").build());
    orderProcessRequestObserver.onNext(StringValue.newBuilder().setValue("104").build());
    orderProcessRequestObserver.onNext(StringValue.newBuilder().setValue("101").build());

    if (finishLatch.getCount() == 0) {
        logger.warning("RPC completed or errored before we finished sending.");
        return;
    }
    orderProcessRequestObserver.onCompleted();


    try {
        if (!finishLatch.await(120, TimeUnit.SECONDS)) {
            logger.warning("FAILED : Process orders cannot finish within 120 seconds");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
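
The batching behaviour of the bidirectional server can be hard to follow from the listing alone: orders are grouped by destination, all groups are flushed once `BATCH_SIZE` orders have arrived, and whatever remains is flushed when the client completes its stream. The sketch below reproduces just that logic with plain collections. It rests on assumptions: `BATCH_SIZE` is not shown in this post, so the value 3 is a guess for illustration, and shipments are reduced to "destination=[order IDs]" strings instead of `CombinedShipment` messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchingSketch {
    static final int BATCH_SIZE = 3; // assumed value; the post does not show the real constant

    // Order ID -> destination, matching the sample orders used in this post.
    static final Map<String, String> DESTINATIONS = Map.of(
            "101", "San Jose, CA",
            "102", "Mountain View, CA",
            "103", "San Jose, CA",
            "104", "Mountain View, CA");

    private final Map<String, List<String>> pending = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();
    private int batchMarker = 0;

    // Mirrors the server's onNext: group by destination and flush on a full batch.
    void onNext(String orderId) {
        String destination = DESTINATIONS.get(orderId);
        if (destination == null) {
            return; // unknown order: skipped, as in the listing
        }
        batchMarker++;
        pending.computeIfAbsent(destination, d -> new ArrayList<>()).add(orderId);
        if (batchMarker == BATCH_SIZE) {
            flush();
            batchMarker = 0;
        }
    }

    // Mirrors the server's onCompleted: flush the remainder, then end the response stream.
    void onCompleted() {
        flush();
        emitted.add("<done>");
    }

    private void flush() {
        for (Map.Entry<String, List<String>> entry : pending.entrySet()) {
            emitted.add(entry.getKey() + "=" + entry.getValue());
        }
        pending.clear();
    }

    // Feeds the IDs through the handler and returns everything that was "sent" back.
    static List<String> run(String... orderIds) {
        BatchingSketch handler = new BatchingSketch();
        for (String id : orderIds) {
            handler.onNext(id);
        }
        handler.onCompleted();
        return handler.emitted;
    }

    public static void main(String[] args) {
        run("102", "103", "104", "101").forEach(System.out::println);
    }
}
```

With the same four IDs the client sends, and under the assumed batch size, the first three orders produce two shipments (one per destination) and the fourth is delivered only when the client half-closes the stream.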
