gRPC Server-Side Streaming

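This example is largely the same as helloworld. The differences are that the proto definition now declares a streamed return value, the client receives a different return type, and the server sends multiple responses instead of one.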

Modify the proto

service HelloService{
- rpc SayHello(HelloMessage) returns (HelloResponse){
+ rpc SayHello(HelloMessage) returns (stream HelloResponse){
  }
}

Implement the server

  • Service implementation
@Slf4j
class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {

    @Override
    public void sayHello(HelloMessage request, StreamObserver<HelloResponse> responseObserver) {
        log.info("Received client request: {}", request.getMessage());
+       AtomicInteger counter = new AtomicInteger();
+       while (counter.get() < 100) {
            // Build the response
            HelloResponse response = HelloResponse.newBuilder()
                                                  .setMessage(counter.get() + ": Hello " + request.getMessage())
                                                  .build();

            // Send the response; onNext may be called many times on a stream
            responseObserver.onNext(response);
+           counter.getAndIncrement();
        }

        // Signal the end of the stream once all responses have been sent
        responseObserver.onCompleted();
    }
}
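
The contract the server follows above (any number of onNext calls, then exactly one onCompleted) can be tried out without a running gRPC server. The following is a minimal sketch under stated assumptions: the local StreamObserver interface is a hypothetical stand-in for io.grpc.stub.StreamObserver, plain String replaces HelloResponse, and the names ServerStreamSketch and RecordingObserver are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ServerStreamSketch {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver, so the sketch runs without gRPC.
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Same shape as sayHello above: 100 onNext calls followed by a single onCompleted.
    static void sayHello(String name, StreamObserver<String> responseObserver) {
        for (int i = 0; i < 100; i++) {
            responseObserver.onNext(i + ": Hello " + name);
        }
        responseObserver.onCompleted();
    }

    // Observer that records what it receives and rejects messages after completion.
    static final class RecordingObserver implements StreamObserver<String> {
        final List<String> messages = new ArrayList<>();
        boolean completed = false;
        Throwable error;

        @Override
        public void onNext(String value) {
            if (completed) {
                throw new IllegalStateException("onNext called after onCompleted");
            }
            messages.add(value);
        }

        @Override
        public void onError(Throwable t) {
            error = t;
        }

        @Override
        public void onCompleted() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        RecordingObserver observer = new RecordingObserver();
        sayHello("Blocking Stub", observer);
        System.out.println("received " + observer.messages.size() + " messages");
        System.out.println("first: " + observer.messages.get(0));
        System.out.println("completed: " + observer.completed);
    }
}
```

A recording observer like this can also be handy in unit tests, since the service method only depends on the observer it is handed.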

Implement the client

Use the BlockingStub to make the request. The call is synchronous, and the return type changes from HelloResponse to Iterator<HelloResponse>.

@Slf4j
public class HelloWorldClient {

    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9090)
                                                      .usePlaintext()
                                                      .build();
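        // The AsyncStub works similarly: pass a StreamObserver<HelloResponse> as the second argument.
        // The FutureStub only supports unary calls, so it cannot be used for this server-streaming method.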
        HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

        HelloMessage message = HelloMessage.newBuilder()
                                           .setMessage("Blocking Stub")
                                           .build();

-       HelloResponse helloResponse = blockingStub.sayHello(message);
        // Send the message and receive the stream of responses
+       Iterator<HelloResponse> helloResponses = blockingStub.sayHello(message);
+       while (helloResponses.hasNext()) {
            log.info(helloResponses.next().getMessage());
+       }
        // Shut the channel down first; awaitTermination alone only waits and never closes it
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }
}
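
To see why a synchronous call can hand back an Iterator while messages are still arriving, it may help to model the bridge between callbacks and iteration. The sketch below is a simplified model and not gRPC's actual implementation: it assumes a producer thread pushing messages into a BlockingQueue, with hasNext() blocking until the next message or the end-of-stream marker shows up. BlockingIteratorSketch, QueueIterator, and the local StreamObserver interface are hypothetical names, and String stands in for HelloResponse.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingIteratorSketch {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver.
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Receives callbacks on one thread and exposes them as a blocking Iterator on another.
    static final class QueueIterator<T> implements Iterator<T>, StreamObserver<T> {
        private static final Object END = new Object();

        // Wrapper so an error can travel through the same queue as regular messages.
        private static final class Failure {
            final Throwable cause;
            Failure(Throwable cause) { this.cause = cause; }
        }

        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        private Object pending; // next element already taken from the queue, if any

        @Override public void onNext(T value) { queue.add(value); }
        @Override public void onError(Throwable t) { queue.add(new Failure(t)); }
        @Override public void onCompleted() { queue.add(END); }

        @Override
        public boolean hasNext() {
            if (pending == null) {
                try {
                    pending = queue.take(); // blocks until a message or the end marker arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a message", e);
                }
            }
            if (pending instanceof Failure) {
                throw new IllegalStateException(((Failure) pending).cause);
            }
            return pending != END; // END stays pending, so later calls keep returning false
        }

        @Override
        @SuppressWarnings("unchecked")
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T value = (T) pending;
            pending = null;
            return value;
        }
    }

    // Plays the role of blockingStub.sayHello(message): returns immediately with an Iterator.
    static Iterator<String> sayHello(String name) {
        QueueIterator<String> responses = new QueueIterator<>();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                responses.onNext(i + ": Hello " + name);
            }
            responses.onCompleted();
        });
        producer.start();
        return responses;
    }

    public static void main(String[] args) {
        Iterator<String> helloResponses = sayHello("Blocking Stub");
        int count = 0;
        while (helloResponses.hasNext()) {
            helloResponses.next();
            count++;
        }
        System.out.println("received " + count + " messages");
    }
}
```

The client loop in main has the same shape as the while loop in HelloWorldClient above: it ends only when the producer signals completion, which is why the server must call onCompleted.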