server stream
proto:
syntax = "proto3";
package streaming;
service ServerStreamingService {
rpc SendNumbers(NumberRequest) returns (stream NumberResponse) {}
}
message NumberRequest {
int32 number_of_responses = 1;
}
message NumberResponse {
int32 result = 1;
}
client端:
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "streaming.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReader;
using streaming::NumberRequest;
using streaming::NumberResponse;
using streaming::ServerStreamingService;
class ServerStreamingClient {
public:
ServerStreamingClient(std::shared_ptr<Channel> channel)
: stub_(ServerStreamingService::NewStub(channel)) {}
void GetNumbers() {
NumberRequest request;
request.set_number_of_responses(5); // Request 5 responses from the server
ClientContext context;
std::unique_ptr<ClientReader<NumberResponse>> reader(stub_->SendNumbers(&context, request));
NumberResponse response;
while (reader->Read(&response)) {
std::cout << "Received result: " << response.result() << std::endl;
}
Status status = reader->Finish();
if (!status.ok()) {
std::cout << "RPC failed with error: " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<ServerStreamingService::Stub> stub_;
};
int main() {
ServerStreamingClient client(grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()));
client.GetNumbers();
return 0;
}
在这个示例中,客户端创建了一个 ServerStreamingClient
类,并在 GetNumbers
方法中发送了一个 NumberRequest
消息,要求服务器端返回 5 个 NumberResponse
消息。
客户端使用 ClientReader
接收服务器端返回的流式消息,并在循环中打印出每个结果。最后,客户端调用 Finish
方法来完成整个 RPC 调用。
这个示例展示了如何在 gRPC C++ 中实现服务器端流式 RPC,客户端可以根据需要修改请求的参数和处理返回结果的逻辑。
server端:
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "streaming.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerReaderWriter;
using streaming::NumberRequest;
using streaming::NumberResponse;
using streaming::ServerStreamingService;
class ServerStreamingServiceImpl final : public ServerStreamingService::Service {
public:
Status SendNumbers(ServerContext* context, const NumberRequest* request, ServerWriter<NumberResponse>* writer) override {
int num_responses = request->number_of_responses();
for (int i = 1; i <= num_responses; ++i) {
NumberResponse response;
response.set_result(i * 10); // Just an example calculation
writer->Write(response);
}
return Status::OK;
}
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
ServerStreamingServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main() {
RunServer();
return 0;
}
client stream
client端
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "streaming.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using streaming::NumberRequest;
using streaming::NumberResponse;
using streaming::StreamingService;
class StreamingClient {
public:
StreamingClient(std::shared_ptr<Channel> channel) : stub_(StreamingService::NewStub(channel)) {}
int SendNumbers(const std::vector<int>& numbers) {
NumberResponse response;
NumberRequest request;
std::unique_ptr<ClientContext> context = std::make_unique<ClientContext>();
std::shared_ptr<grpc::ClientWriter<NumberRequest>> writer(stub_->SendNumbers(context.get(), &response));
for (const auto& num : numbers) {
request.set_number(num);
writer->Write(request);
}
writer->WritesDone();
Status status = writer->Finish();
if (status.ok()) {
return response.sum();
} else {
std::cout << "RPC failed: " << status.error_code() << ": " << status.error_message() << std::endl;
return -1;
}
}
private:
std::unique_ptr<StreamingService::Stub> stub_;
};
int main() {
std::shared_ptr<Channel> channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials());
StreamingClient client(channel);
std::vector<int> numbers = {1, 2, 3, 4, 5};
int sum = client.SendNumbers(numbers);
std::cout << "Sum of numbers: " << sum << std::endl;
return 0;
}
server端:
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "streaming.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using streaming::NumberRequest;
using streaming::NumberResponse;
using streaming::StreamingService;
class StreamingServiceImpl final : public StreamingService::Service {
public:
explicit StreamingServiceImpl() {}
Status SendNumbers(ServerContext* context, grpc::ServerReader<NumberRequest>* reader, NumberResponse* response) override {
int sum = 0;
NumberRequest request;
while (reader->Read(&request)) {
sum += request.number();
}
response->set_sum(sum);
return Status::OK;
}
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
StreamingServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main() {
RunServer();
return 0;
}
双向stream
server端:
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "streaming.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerReaderWriter;
using streaming::NumberRequest;
using streaming::NumberResponse;
using streaming::BidirectionalStreamingService;
class BidirectionalStreamingServiceImpl final : public BidirectionalStreamingService::Service {
public:
Status SendAndReceive(ServerContext* context, ServerReaderWriter<NumberResponse, NumberRequest>* stream) override {
NumberRequest request;
NumberResponse response;
int sum = 0;
while (stream->Read(&request)) {
sum += request.number();
response.set_result(sum);
stream->Write(response);
}
return Status::OK;
}
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
BidirectionalStreamingServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main() {
RunServer();
return 0;
}
client端:
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "streaming.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReaderWriter;
using streaming::NumberRequest;
using streaming::NumberResponse;
using streaming::BidirectionalStreamingService;
class BidirectionalStreamingClient {
public:
BidirectionalStreamingClient(std::shared_ptr<Channel> channel)
: stub_(BidirectionalStreamingService::NewStub(channel)) {}
void SendAndReceiveNumbers() {
ClientContext context;
std::shared_ptr<ClientReaderWriter<NumberRequest, NumberResponse>> stream(stub_->SendAndReceive(&context));
for (int i = 1; i <= 5; ++i) {
NumberRequest request;
request.set_number(i);
stream->Write(request);
NumberResponse response;
stream->Read(&response);
std::cout << "Received result: " << response.result() << std::endl;
}
stream->WritesDone();
Status status = stream->Finish();
if (!status.ok()) {
std::cout << "RPC failed with error: " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<BidirectionalStreamingService::Stub> stub_;
};
int main() {
BidirectionalStreamingClient client(grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()));
client.SendAndReceiveNumbers();
return 0;
}