在 选择 RPC 通信协议 一节提到,Streaming 是 Dubbo3 新提供的一种 RPC 数据传输模式,适用于以下场景:
Streaming 分为以下三种:
以下示例演示 triple streaming 流式通信的基本使用方法,涵盖了客户端流、服务端流、双向流等三种模式,示例使用 Protocol Buffers 的服务开发模式,对于 Java 接口模式的开发者可以在本文最后查看相应说明。可在此查看 本示例完整代码。
首先,可通过以下命令下载示例源码:
进入示例源码目录:
编译项目,由 IDL 生成代码,这会调用 dubbo 提供的 protoc 插件生成对应的服务定义代码:
运行以下命令,启动 server:
运行以下命令,启动 client:
与 使用 Protobuf(IDL) 开发 triple 协议服务 一节中提到的一样,这个示例使用 protobuf 定义服务,因此示例需要的依赖、配置等基本是一致的,请参考那一节了解完整详情。接下来,我们将重点讲解流式通信部分的内容。
在上面的 proto 文件中,我们定义了两个方法:
biStream(stream GreeterRequest) returns (stream GreeterReply)
双向流serverStream(GreeterRequest) returns (stream GreeterReply)
服务端流接下来,我们需要从 .proto 服务定义生成 Dubbo 客户端和服务器接口。protoc dubbo 插件可以帮助我们生成需要的代码,在使用 Gradle 或 Maven 时,protoc 构建插件可以生成必要的代码作为构建的一部分。具体 maven 配置及代码生成步骤我们在 上一节 中有具体的描述。
target/generated-sources/protobuf/java/org/apache/dubbo/samples/tri/streaming/ 目录中可以发现如下生成代码,其中我们将重点讲解 DubboGreeterTriple.java
:
首先,让我们看一下如何定义服务实现并启动提供者:
定义类 GreeterImpl
实现 DubboGreeterTriple.GreeterImplBase
。
GreeterImpl
实现了所有 rpc 定义中的方法。接下里,我们看一下 server-side streaming 的具体定义。
不同于普通的方法定义,serverStream
方法有两个参数,第一个参数 request
是入参,第二个参数 responseObserver
为响应值,其参数类型是 StreamObserver<GreeterReply>
。在方法实现中,我们不停的调用 responseObserver.onNext(...)
将结果发送回消费方,并在最后调用 onCompleted()
表示流式响应结束。
双向流方法 biStream
的参数和返回值都是 StreamObserver<...>
类型。但需要注意的是,它与我们传统方法定义中参数是请求值、返回值是响应的理解是反过来的,在这里,参数 StreamObserver<GreeterReply> responseObserver
是响应,我们通过 responseObserver 不停的写回响应。
请注意这里请求流
与响应流
是独立的,我们在写回响应流数据的过程中,随时可能有请求流到达,对于每个流而言,值都是有序的。
启动 Dubbo 服务的过程与普通应用完全一致:
和普通的 Dubbo 服务调用一样,我们首先需要声明 rpc 服务引用:
接下来,我们就可以利用 greeter
像调用本地方法一样发起调用了。
调用 serverStream()
传入能够处理流式响应的 SampleStreamObserver
对象,调用发起后即快速返回,之后流式响应会不停的发送到 SampleStreamObserver
。
以下是 SampleStreamObserver
类的具体定义,包含其收到响应后的具体处理逻辑。
调用 greeter.biStream()
方法会立即返回一个 requestStreamObserver
,同时,需要为方法传入一个能处理响应的 observer 对象 new SampleStreamObserver()
。
接下来,我们就可以用才刚才返回值中得到的 requestStreamObserver
持续发送请求 requestStreamObserver.onNext(request)
;此时,如果有响应返回,则会由 SampleStreamObserver
接收处理,其定义请参考上文。
对于不使用 Protobuf 的用户而言,你可以直接在接口中定义 streaming 格式的方法,这样你就能使用流式通信了。
其中,org.apache.dubbo.common.stream.StreamObserver
是 Dubbo 框架提供的流式通信参数类型,请务必按照以上示例所示的方式定义
Stream 方法的方法入参和返回值是严格约定的,为防止写错而导致问题,Dubbo3 框架侧做了对参数的检查, 如果出错则会抛出异常。 对于
双向流(BIDIRECTIONAL_STREAM)
, 需要注意参数中的StreamObserver
是响应流,返回参数中的StreamObserver
为请求流。