Dubbo分层设计之Exchange层

前言

Dubbo 框架采用分层设计,自上而下共分为十层。Exchange 层位于倒数第三层,它在 协议层 的下方、数据传输层的上方。
第一次看源码的时候,大家应该都会有一个疑问:都已经有 Transport 层了,为啥还要定义 Exchange 层?
Dubbo 这么做自然有它的原因,今天我们一起解开这个疑惑。

理解Exchange

Exchange 层也叫 数据交换层,它和数据传输层有什么区别呢?
Transport 层是对 Netty、Mina 的统一封装,用来做网络数据传输的。一次 RPC 调用在 Dubbo 看来,本质上也就是一次请求报文和响应报文的传输过程。这么一看,好像完全没必要再单独抽象出 Exchange 层嘛。但是我们忽略了一个事情,那就是 Transport 层并没有实现 请求-应答 消息交换模式。

一般来说,我们发起一次 RPC 调用以后,业务线程会阻塞,期望拿到一个服务端发来的结果,再继续往下走。
Transport 层只有一个 tcp 长连接,tcp 本身是没有 Request、Response 概念的。它只具备消息收发的能力,至于收发的消息是 Request 还是 Response 它是不知道的,消息的语义需要靠上层来定义,一般是在协议头用一个专门的比特位来标记。以 HTTP 协议为例,它是七层协议,在传输层看来,报文是不分 Request、Response 的,这完全靠 HTTP 服务器自行实现。

正因如此,Dubbo 要直接基于 tcp 来实现 RPC 调用,就得自己实现 Request-Response 模型。

设计实现

首先我们要清楚 Dubbo 的调用流程,才好去理解这些接口的作用。
客户端发送 Request 和收到 Response 的流程是这样的:
image.png
服务端处理请求的流程是这样的:
image.png

Exchanger

Dubbo Exchange 层的核心 SPI 接口是org.apache.dubbo.remoting.exchange.Exchanger ,同样也分别提供了bindconnect 方法供服务端和客户端使用。

@SPI(HeaderExchanger.NAME)
public interface Exchanger {
   

    @Adaptive({
   Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    @Adaptive({
   Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

它和 Transporter 主要区别是:Transporter Channel 处理器是 ChannelHandler 接口,只具备消息收发的能力。Exchanger Channel 处理器是 ExchangeHandler,它在前者的基础上增加了reply 能力,也就是对于一个 Request,服务端可以回复一个 Response,这就是 请求-应答 模型。

public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
   

    CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException;
}

至于 ExchangeServer 和 ExchangeClient,只是在 Transport 层的 RemotingServer、Client 上做了一些封装。

HeaderExchanger

Exchanger 官方只提供了一种实现:HeaderExchanger,因为是在 Transport 上层,所以是基于 Transporter 二次封装。主要是创建了 HeaderExchangeClient 和 HeaderExchangeServer,核心是 HeaderExchangeHandler 实现。

public class HeaderExchanger implements Exchanger {
   

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
   
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
   
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

HeaderExchangeServer

HeaderExchangeServer 是对 RemotingServer 的二次封装,主要是把传输层的 Channel 封装成了交换层的 ExchangeChannel。

@Override
public Collection<ExchangeChannel> getExchangeChannels() {
   
    Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
    Collection<Channel> channels = server.getChannels();
    if (CollectionUtils.isNotEmpty(channels)) {
   
        for (Channel channel : channels) {
   
            exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
        }
    }
    return exchangeChannels;
}

HeaderExchangeClient

HeaderExchangeClient 是对传输层 Client 的二次封装,主要是把 Client 封装成了 HeaderExchangeChannel,实现了 Request-Response 语义。

public HeaderExchangeClient(Client client, boolean startTimer) {
   
    Assert.notNull(client, "Client can't be null");
    this.client = client;
    this.channel = new HeaderExchangeChannel(client);

    if (startTimer) {
   
        URL url = client.getUrl();
        startReconnectTask(url);
        startHeartBeatTask(url);
    }
}

HeaderExchangeHandler

HeaderExchangeHandler 封装了协议层传过来的 ExchangeHandler,重写了receivedsent 方法,实现了对 Request、Response 对象的处理。

消息发送时,它会把 Channel 封装成 HeaderExchangeChannel 再交给后续 handler 处理。

@Override
public void sent(Channel channel, Object message) throws RemotingException {
   
    Throwable exception = null;
    try {
   
        // 封装Channel
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        handler.sent(exchangeChannel, message);
    } catch (Throwable t) {
   
        exception = t;
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
    if (message instanceof Request) {
   
        Request request = (Request) message;
        // 记录发送时间
        DefaultFuture.sent(channel, request);
    }
    ......
}

收到消息时,会针对 Request、Response 分别做处理。如果收到的是 Request,会调用业务 handler 执行业务逻辑,再返回结果。

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
   
    Response res = new Response(req.getId(), req.getVersion());
    Object msg = req.getData();
    try {
   
        // 调用业务handler、执行业务逻辑
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
   
            try {
   
                if (t == null) {
   
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
   
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                // 发送结果
                channel.send(res);
            } catch (RemotingException e) {
   
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            }
        });
    } catch (Throwable e) {
   
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

如果收到的是 Response,说明是服务端对客户端请求的响应结果,则会给 DefaultFuture 设置 Result,唤醒业务线程。

static void handleResponse(Channel channel, Response response) throws RemotingException {
   
    if (response != null && !response.isHeartbeat()) {
   
        DefaultFuture.received(channel, response);
    }
}
public static void received(Channel channel, Response response, boolean timeout) {
   
    try {
   
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
   
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
   
                t.cancel();
            }
            // 设置结果,业务线程被唤醒
            future.doReceived(response);
        }
    } finally {
   
        CHANNELS.remove(response.getId());
    }
}

HeaderExchangeChannel

最后是 HeaderExchangeChannel,它是 交换层 ExchangeChannel 的实现。ExchangeChannel 是对传输层 Channel 的增强。Channel 只定义了send() 数据发送的能力,ExchangeChannel 增加了request() 支持发送 Request,拿到 Response。

public interface ExchangeChannel extends Channel {
   
   
    CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;

    CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;

    ExchangeHandler getExchangeHandler();

    @Override
    void close(int timeout);
}

HeaderExchangeChannel 主要是对 Channel 的一个二次封装,它会把实例化自身并放到 Channel 属性里

static HeaderExchangeChannel getOrAddChannel(Channel ch) {
   
    if (ch == null) {
   
        return null;
    }
    HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);
    if (ret == null) {
   
        ret = new HeaderExchangeChannel(ch);
        if (ch.isConnected()) {
   
            ch.setAttribute(CHANNEL_KEY, ret);
        }
    }
    return ret;
}

自定义Exchange

Dubbo Exchanger 也可以基于 SPI 一键替换,我们实现一个自定义的 Exchanger,加深理解。
首先,我们新建一个模块dubbo-extension-exchange-custom,并引入依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-remoting-api</artifactId>
        <version>${dubbo.version}</version>
    </dependency>
</dependencies>

新建 dubbo.extension.remoting.exchange.CustomExchanger,重写 Exchanger 接口,返回我们自定义的 Server、Client 实现。

public class CustomExchanger implements Exchanger {
   

    public static final String NAME = "custom";

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
   
        return new CustomeExchangeServer(Transporters.bind(url, new DecodeHandler(new CustomExchangeHandler(handler))));
    }

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
   
        return new CustomExchangeClient(Transporters.connect(url, new DecodeHandler(new CustomExchangeHandler(handler))));
    }
}

CustomeExchangeServer 本身只是对 RemotingServer 的一个封装,核心是把 Channel 封装成 ExchangeChannel

public class CustomeExchangeServer extends RemotingServerDelegate {
   

    public CustomeExchangeServer(RemotingServer server) {
   
        super(server);
    }

    @Override
    protected ExchangeChannel toExchangeChannel(Channel channel) {
   
        return CustomExchangeChannel.getOrAddChannel(channel);
    }
}

CustomExchangeClient 也只是为了把 Client 封装成 ExchangeChannel,让 传输层的 Channel 拥有request能力

public class CustomExchangeClient extends ClientDelegate {
   

    private final ExchangeChannel exchangeChannel;

    public CustomExchangeClient(Client client) {
   
        super(client);
        this.exchangeChannel = new CustomExchangeChannel(client);
    }

    @Override
    public CompletableFuture<Object> request(Object request) throws RemotingException {
   
        return exchangeChannel.request(request);
    }
    ......省略几个request()
}

CustomExchangeHandler 是对业务 ExchangeHandler 的封装,增加对 Request、Response 对象的处理

public class CustomExchangeHandler implements ChannelHandlerDelegate {
   

    private final ExchangeHandler handler;

    public CustomExchangeHandler(ExchangeHandler handler) {
   
        this.handler = handler;
    }

    @Override
    public ChannelHandler getHandler() {
   
        return handler;
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
   
        handler.connected(toExchangeChannel(channel));
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
   
        handler.disconnected(toExchangeChannel(channel));
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
   
        handler.sent(toExchangeChannel(channel), message);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
   
        System.err.println("CustomExchangeHandler received:" + message);
        ExchangeChannel exchangeChannel = toExchangeChannel(channel);
        if (message instanceof Request) {
   
            handleRequest(exchangeChannel, (Request) message);
        } else if (message instanceof Response) {
   
            handleResponse(exchangeChannel, (Response) message);
        } else {
   
            handler.received(exchangeChannel, message);
        }
    }

    private void handleResponse(ExchangeChannel exchangeChannel, Response response) {
   
        DefaultFuture.received(exchangeChannel, response);
    }

    private void handleRequest(ExchangeChannel exchangeChannel, Request req) {
   
        try {
   
            Response res = new Response(req.getId(), req.getVersion());
            CompletableFuture<Object> future = handler.reply(exchangeChannel, req.getData());
            future.whenComplete((r, e) -> {
   
                if (e == null) {
   
                    res.setStatus((byte) 20);
                    res.setResult(r);
                } else {
   
                    res.setStatus((byte) 70);
                    res.setErrorMessage(e.getMessage());
                }
                try {
   
                    exchangeChannel.send(res);
                } catch (Exception exception) {
   
                    exception.printStackTrace();
                }
            });
        } catch (Exception e) {
   
            e.printStackTrace();
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
   
        handler.caught(toExchangeChannel(channel), exception);
    }

    private ExchangeChannel toExchangeChannel(Channel channel) {
   
        return CustomExchangeChannel.getOrAddChannel(channel);
    }
}

最后是 CustomExchangeChannel,它是对 Channel 的封装,增加了request的能力,发送请求后支持返回一个 CompletableFuture,并在收到响应后设置结果。

public class CustomExchangeChannel extends ExchangeChannelDelegate {
   
    private static final String CHANNEL_KEY = CustomExchangeChannel.class.getName() + ".CHANNEL";

    public CustomExchangeChannel(Channel channel) {
   
        super(channel);
    }

    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
   
        if (isClosed()) {
   
            throw new RemotingException(this.getLocalAddress(), (InetSocketAddress) null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        System.err.println("CustomExchangeChannel request:" + request);
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(this, req, timeout, executor);
        send(req);
        return future;
    }

    public static CustomExchangeChannel getOrAddChannel(Channel channel) {
   
        CustomExchangeChannel exchangeChannel = (CustomExchangeChannel) channel.getAttribute(CHANNEL_KEY);
        if (exchangeChannel == null) {
   
            channel.setAttribute(CHANNEL_KEY, exchangeChannel = new CustomExchangeChannel(channel));
        }
        return exchangeChannel;
    }
}

尾巴

Dubbo Exchange 层在 Transport 层之上实现了 Request-Response 模型。传输层只有一个 tcp 连接,只具备单纯的消息收发能力,对于消息收发的格式和语义是不关心的。tcp 没有 Request-Response 的概念,Dubbo 基于 tcp 长连接实现 RPC 调用,就必须自己实现一套 Request-Response 消息交换模型,Exchange 层就是对这套请求应答模型的抽象。

相关推荐

  1. Dubbo分层设计Serialize

    2024-01-19 01:10:01       42 阅读
  2. oracle分区表和分区exchange

    2024-01-19 01:10:01       41 阅读
  3. oracle分区表和非分区exchange

    2024-01-19 01:10:01       39 阅读
  4. <span style='color:red;'>dubbo</span>

    dubbo

    2024-01-19 01:10:01      54 阅读
  5. <span style='color:red;'>Dubbo</span>

    Dubbo

    2024-01-19 01:10:01      57 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-19 01:10:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-19 01:10:01       101 阅读
  3. 在Django里面运行非项目文件

    2024-01-19 01:10:01       82 阅读
  4. Python语言-面向对象

    2024-01-19 01:10:01       91 阅读

热门阅读

  1. MySQL 8.0中引入的选项和变量(四)

    2024-01-19 01:10:01       53 阅读
  2. 积木游戏

    2024-01-19 01:10:01       56 阅读
  3. 【git】git更新远程分支到本地

    2024-01-19 01:10:01       60 阅读
  4. Vue前端规范【一】

    2024-01-19 01:10:01       50 阅读
  5. MYSQL 1

    MYSQL 1

    2024-01-19 01:10:01      59 阅读
  6. Django——django与环境搭建

    2024-01-19 01:10:01       52 阅读
  7. CDH6.3.2,不互通的cdh平台互导hive数据

    2024-01-19 01:10:01       50 阅读
  8. 【PyTorch简介】4.Building the model layers 生成模型层

    2024-01-19 01:10:01       44 阅读
  9. 学习记录1.10

    2024-01-19 01:10:01       55 阅读
  10. SQL笔记 -- 索引失效情况

    2024-01-19 01:10:01       56 阅读
  11. go语言的部分的

    2024-01-19 01:10:01       52 阅读
  12. HBase学习三:集群部署

    2024-01-19 01:10:01       56 阅读