前言
Dubbo 框架采用分层设计,自上而下共分为十层。Exchange 层位于倒数第三层,它在 协议层 的下方、数据传输层的上方。
第一次看源码的时候,大家应该都会有一个疑问:都已经有 Transport 层了,为啥还要定义 Exchange 层?
Dubbo 这么做自然有它的原因,今天我们一起解开这个疑惑。
理解Exchange
Exchange 层也叫 数据交换层,它和数据传输层有什么区别呢?
Transport 层是对 Netty、Mina 的统一封装,用来做网络数据传输的。一次 RPC 调用在 Dubbo 看来,本质上也就是一次请求报文和响应报文的传输过程。这么一看,好像完全没必要再单独抽象出 Exchange 层嘛。但是我们忽略了一个事情,那就是 Transport 层并没有实现 请求-应答 消息交换模式。
一般来说,我们发起一次 RPC 调用以后,业务线程会阻塞,期望拿到一个服务端发来的结果,再继续往下走。
Transport 层只有一个 tcp 长连接,tcp 本身是没有 Request、Response 概念的。它只具备消息收发的能力,至于收发的消息是 Request 还是 Response 它是不知道的,消息的语义需要靠上层来定义,一般是在协议头用一个专门的比特位来标记。以 HTTP 协议为例,它是七层协议,在传输层看来,报文是不分 Request、Response 的,这完全靠 HTTP 服务器自行实现。
正因如此,Dubbo 要直接基于 tcp 来实现 RPC 调用,就得自己实现 Request-Response 模型。
设计实现
首先我们要清楚 Dubbo 的调用流程,才好去理解这些接口的作用。
客户端发送 Request 和收到 Response 的流程是这样的:
服务端处理请求的流程是这样的:
Exchanger
Dubbo Exchange 层的核心 SPI 接口是org.apache.dubbo.remoting.exchange.Exchanger
,同样也分别提供了bind
和connect
方法供服务端和客户端使用。
@SPI(HeaderExchanger.NAME)
public interface Exchanger {
@Adaptive({
Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({
Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
它和 Transporter 主要区别是:Transporter Channel 处理器是 ChannelHandler 接口,只具备消息收发的能力。Exchanger Channel 处理器是 ExchangeHandler,它在前者的基础上增加了reply
能力,也就是对于一个 Request,服务端可以回复一个 Response,这就是 请求-应答 模型。
public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException;
}
至于 ExchangeServer 和 ExchangeClient,只是在 Transport 层的 RemotingServer、Client 上做了一些封装。
HeaderExchanger
Exchanger 官方只提供了一种实现:HeaderExchanger,因为是在 Transport 上层,所以是基于 Transporter 二次封装。主要是创建了 HeaderExchangeClient 和 HeaderExchangeServer,核心是 HeaderExchangeHandler 实现。
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
HeaderExchangeServer
HeaderExchangeServer 是对 RemotingServer 的二次封装,主要是把传输层的 Channel 封装成了交换层的 ExchangeChannel。
@Override
public Collection<ExchangeChannel> getExchangeChannels() {
Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
Collection<Channel> channels = server.getChannels();
if (CollectionUtils.isNotEmpty(channels)) {
for (Channel channel : channels) {
exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
}
}
return exchangeChannels;
}
HeaderExchangeClient
HeaderExchangeClient 是对传输层 Client 的二次封装,主要是把 Client 封装成了 HeaderExchangeChannel,实现了 Request-Response 语义。
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);
if (startTimer) {
URL url = client.getUrl();
startReconnectTask(url);
startHeartBeatTask(url);
}
}
HeaderExchangeHandler
HeaderExchangeHandler 封装了协议层传过来的 ExchangeHandler,重写了received
和sent
方法,实现了对 Request、Response 对象的处理。
消息发送时,它会把 Channel 封装成 HeaderExchangeChannel 再交给后续 handler 处理。
@Override
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
// 封装Channel
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
handler.sent(exchangeChannel, message);
} catch (Throwable t) {
exception = t;
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
if (message instanceof Request) {
Request request = (Request) message;
// 记录发送时间
DefaultFuture.sent(channel, request);
}
......
}
收到消息时,会针对 Request、Response 分别做处理。如果收到的是 Request,会调用业务 handler 执行业务逻辑,再返回结果。
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
Object msg = req.getData();
try {
// 调用业务handler、执行业务逻辑
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 发送结果
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
如果收到的是 Response,说明是服务端对客户端请求的响应结果,则会给 DefaultFuture 设置 Result,唤醒业务线程。
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
t.cancel();
}
// 设置结果,业务线程被唤醒
future.doReceived(response);
}
} finally {
CHANNELS.remove(response.getId());
}
}
HeaderExchangeChannel
最后是 HeaderExchangeChannel,它是 交换层 ExchangeChannel 的实现。ExchangeChannel 是对传输层 Channel 的增强。Channel 只定义了send()
数据发送的能力,ExchangeChannel 增加了request()
支持发送 Request,拿到 Response。
public interface ExchangeChannel extends Channel {
CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;
CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;
ExchangeHandler getExchangeHandler();
@Override
void close(int timeout);
}
HeaderExchangeChannel 主要是对 Channel 的一个二次封装,它会把实例化自身并放到 Channel 属性里
static HeaderExchangeChannel getOrAddChannel(Channel ch) {
if (ch == null) {
return null;
}
HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);
if (ret == null) {
ret = new HeaderExchangeChannel(ch);
if (ch.isConnected()) {
ch.setAttribute(CHANNEL_KEY, ret);
}
}
return ret;
}
自定义Exchange
Dubbo Exchanger 也可以基于 SPI 一键替换,我们实现一个自定义的 Exchanger,加深理解。
首先,我们新建一个模块dubbo-extension-exchange-custom
,并引入依赖:
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-remoting-api</artifactId>
<version>${dubbo.version}</version>
</dependency>
</dependencies>
新建 dubbo.extension.remoting.exchange.CustomExchanger,重写 Exchanger 接口,返回我们自定义的 Server、Client 实现。
public class CustomExchanger implements Exchanger {
public static final String NAME = "custom";
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new CustomeExchangeServer(Transporters.bind(url, new DecodeHandler(new CustomExchangeHandler(handler))));
}
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new CustomExchangeClient(Transporters.connect(url, new DecodeHandler(new CustomExchangeHandler(handler))));
}
}
CustomeExchangeServer 本身只是对 RemotingServer 的一个封装,核心是把 Channel 封装成 ExchangeChannel
public class CustomeExchangeServer extends RemotingServerDelegate {
public CustomeExchangeServer(RemotingServer server) {
super(server);
}
@Override
protected ExchangeChannel toExchangeChannel(Channel channel) {
return CustomExchangeChannel.getOrAddChannel(channel);
}
}
CustomExchangeClient 也只是为了把 Client 封装成 ExchangeChannel,让 传输层的 Channel 拥有request
能力
public class CustomExchangeClient extends ClientDelegate {
private final ExchangeChannel exchangeChannel;
public CustomExchangeClient(Client client) {
super(client);
this.exchangeChannel = new CustomExchangeChannel(client);
}
@Override
public CompletableFuture<Object> request(Object request) throws RemotingException {
return exchangeChannel.request(request);
}
......省略几个request()
}
CustomExchangeHandler 是对业务 ExchangeHandler 的封装,增加对 Request、Response 对象的处理
public class CustomExchangeHandler implements ChannelHandlerDelegate {
private final ExchangeHandler handler;
public CustomExchangeHandler(ExchangeHandler handler) {
this.handler = handler;
}
@Override
public ChannelHandler getHandler() {
return handler;
}
@Override
public void connected(Channel channel) throws RemotingException {
handler.connected(toExchangeChannel(channel));
}
@Override
public void disconnected(Channel channel) throws RemotingException {
handler.disconnected(toExchangeChannel(channel));
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
handler.sent(toExchangeChannel(channel), message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
System.err.println("CustomExchangeHandler received:" + message);
ExchangeChannel exchangeChannel = toExchangeChannel(channel);
if (message instanceof Request) {
handleRequest(exchangeChannel, (Request) message);
} else if (message instanceof Response) {
handleResponse(exchangeChannel, (Response) message);
} else {
handler.received(exchangeChannel, message);
}
}
private void handleResponse(ExchangeChannel exchangeChannel, Response response) {
DefaultFuture.received(exchangeChannel, response);
}
private void handleRequest(ExchangeChannel exchangeChannel, Request req) {
try {
Response res = new Response(req.getId(), req.getVersion());
CompletableFuture<Object> future = handler.reply(exchangeChannel, req.getData());
future.whenComplete((r, e) -> {
if (e == null) {
res.setStatus((byte) 20);
res.setResult(r);
} else {
res.setStatus((byte) 70);
res.setErrorMessage(e.getMessage());
}
try {
exchangeChannel.send(res);
} catch (Exception exception) {
exception.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(toExchangeChannel(channel), exception);
}
private ExchangeChannel toExchangeChannel(Channel channel) {
return CustomExchangeChannel.getOrAddChannel(channel);
}
}
最后是 CustomExchangeChannel,它是对 Channel 的封装,增加了request
的能力,发送请求后支持返回一个 CompletableFuture,并在收到响应后设置结果。
public class CustomExchangeChannel extends ExchangeChannelDelegate {
private static final String CHANNEL_KEY = CustomExchangeChannel.class.getName() + ".CHANNEL";
public CustomExchangeChannel(Channel channel) {
super(channel);
}
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (isClosed()) {
throw new RemotingException(this.getLocalAddress(), (InetSocketAddress) null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
System.err.println("CustomExchangeChannel request:" + request);
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(this, req, timeout, executor);
send(req);
return future;
}
public static CustomExchangeChannel getOrAddChannel(Channel channel) {
CustomExchangeChannel exchangeChannel = (CustomExchangeChannel) channel.getAttribute(CHANNEL_KEY);
if (exchangeChannel == null) {
channel.setAttribute(CHANNEL_KEY, exchangeChannel = new CustomExchangeChannel(channel));
}
return exchangeChannel;
}
}
尾巴
Dubbo Exchange 层在 Transport 层之上实现了 Request-Response 模型。传输层只有一个 tcp 连接,只具备单纯的消息收发能力,对于消息收发的格式和语义是不关心的。tcp 没有 Request-Response 的概念,Dubbo 基于 tcp 长连接实现 RPC 调用,就必须自己实现一套 Request-Response 消息交换模型,Exchange 层就是对这套请求应答模型的抽象。