Rust 实战练习 - 11. Rust异步的基石 tokio

前言

Tokio是一个异步运行时。同时支持embedded devices.

  • 对异步代码的多线程运行时
  • 对标准库的异步实现 (这个可以省很多事情)
  • 生态系统丰富,非常多的工具库实现

Tokio不是万能的,部分场景不建议,可以考虑使用其他的:

  • 多CPU计算密集型,并行计算。Tokio主要解决多个任务IO等待问题,对于并行计算没有太大优势。
  • 大量文件读取。Tokio没有提供异步文件API. 使用它与普通线程读取文件没有区别。
  • 单个Web请求,或者阻塞型请求。因为Tokio优势在多请求下的异步处理,简单场景有没有优势。

关键功能

use tokio::net; // 提供 TCP/UDP 等的异步实现
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; // read/write 异步实现, 异步io copy等
use tokio::fs::File; // 异步操作文件的实现
use tokio::process; // 异步的多进程实现
use tokio::time; // 时间的异步实现
use tokio::sync::Mutex;  // 支持异步的mutex, 替代 use std::sync::Mutex;

// 异步channel模型
// mpsc 多生产,单消费
// oneshot 单生产,单消费
// broadcast 多对多
// watch 单生产,多消费
use tokio::sync::{mpsc, oneshot, broadcast, watch}; 
tokio::select!{}  // 对多个channel同时进行loop的宏

// 异步main函数与宏
#[tokio::main]
async fn main(){}

tokio::spawn(async move { ... }).await; // tokio::task::spawn 绿色线程task.

tokio::main宏等价

// method 1
tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })


// method 2
let rt = Builder::new_current_thread()
    .enable_all()
    .build()
    .unwrap();

rt.block_on(async move {
    while let Some(task) = recv.recv().await {
        tokio::spawn(handle_task(task));
    }

    // Once all senders have gone out of scope,
    // the `.recv()` call returns None and it will
    // exit from the while loop and shut down the
    // thread.
});

实现一个自定义Future

use std::{env, future::Future, task::Poll, thread, time::{Duration, Instant}};
use tokio;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    // poll 实现的是一个状态机
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        if Instant::now() >= self.when {
            println!("time is over!");
            Poll::Ready("done")
        }else{
            println!("time not ready...");
            // 启动一个唤醒task
            // 如果没有这个waker,future会一直在pending状态,恢复不回来
            let waker = cx.waker().clone();
            let when = self.when;
            thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    thread::sleep(when-now);
                }
                waker.wake();
            });
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main(){
    let f = Delay{when: Instant::now()+Duration::from_millis(100)};
    println!("{:?}", f.await);
}

TcpServer+Client的tokio版本

之前我们借助async-std已经实现了基础的tcp server和client. 这里我们使用tokio.

[dependencies]
tokio = {version = "*", features = ["full"]}

code

use std::{env, thread, time::Duration};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, time::sleep};

#[tokio::main]
async fn main() {
    let args = env::args().into_iter().collect::<Vec<_>>();
    if args.len()>1 {
        match args[1].as_str() {
            "-s" => tcp_server().await,
            "-c" => tcp_client().await,
            _ => println!("unknown cmd {}", args[1]),
        }
    }else{
        println!("Usage:\r\n\t-s Open Tcp Server.\r\n\t-c Open Tcp client to connect the server.")
    }
}

async fn tcp_server() {
    let s = TcpListener::bind("0.0.0.0:8000").await.unwrap();
    println!("Listen on addr: {}\r\n=============", s.local_addr().unwrap().to_string());
    loop{
        let (c, _ ) = s.accept().await.unwrap();
        // Tokio 任务是一个异步绿色线程。非常轻量级。
        // 使用tokio内部调度程序分配,并不一定在新的线程
        tokio::spawn(async move {
            handler_tcp(c).await;
        });
    }
}

async fn handler_tcp(mut c: TcpStream) {
    let mut buf = [0u8;1024];
    let info = format!("[{:?}] => client in: {}", thread::current().id(), c.peer_addr().unwrap().to_string());
    let n = c.read(&mut buf).await.unwrap();
    println!("Read Len: {} \r\n{}", n, String::from_utf8_lossy(&buf[..n]));
    
    // 模拟长时间耗时操作
    sleep(Duration::from_secs(8)).await;

    _ = c.write(format!("HTTP/1.1 200 OK\r\n\r\n{}\r\n", info).as_bytes()).await;
    println!("Peer Inf: {}\r\n========================", info);
    
}

async fn tcp_client() {
    let mut c = TcpStream::connect("127.0.0.1:8000").await.unwrap();
    c.set_nodelay(true).unwrap();
    _ = c.write("GET / HTTP/1.1\r\nAccept: */*\r\n\r\n".as_bytes()).await;
    let mut strbuf = String::new();
    _ = c.read_to_string(&mut strbuf).await;
    println!("resp: {}", strbuf);
}

一定要注意,所有的异步操作要使用await进行异步等待,否则这个调用并没有真正执行,达不到想要的效果。

多个task之间的数据共享

使用常规的arc等异步共享功能。

相关推荐

  1. Rust 实战练习 - 11. Rust异步基石 tokio

    2024-04-27 23:46:01       30 阅读
  2. Tokio强大Rust异步框架

    2024-04-27 23:46:01       36 阅读
  3. rust-tokio发布考古

    2024-04-27 23:46:01       29 阅读
  4. rust语言tokio库spawn, blocking_spawn等使用

    2024-04-27 23:46:01       59 阅读
  5. Rust基础拾遗--并发和异步编程

    2024-04-27 23:46:01       47 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-27 23:46:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-27 23:46:01       101 阅读
  3. 在Django里面运行非项目文件

    2024-04-27 23:46:01       82 阅读
  4. Python语言-面向对象

    2024-04-27 23:46:01       91 阅读

热门阅读

  1. http请求与响应,结合springboot

    2024-04-27 23:46:01       33 阅读
  2. 使用buildozer 打包 apk时遇到的问题

    2024-04-27 23:46:01       24 阅读
  3. c++类基础知识

    2024-04-27 23:46:01       36 阅读
  4. vue3前端调用后端接口实现批量删除

    2024-04-27 23:46:01       38 阅读
  5. Websocket

    2024-04-27 23:46:01       31 阅读
  6. 【前端技术】CSS基础入门篇

    2024-04-27 23:46:01       31 阅读
  7. 机器人系统能用MQTT5.0代替ROS2吗?

    2024-04-27 23:46:01       28 阅读
  8. LeetCode 1146. 快照数组【哈希表+二分查找】中等

    2024-04-27 23:46:01       37 阅读
  9. 大文件分片上传前端手写

    2024-04-27 23:46:01       32 阅读