前言
Tokio是一个异步运行时。同时支持embedded devices.
- 对异步代码的多线程运行时
- 对标准库的异步实现 (这个可以省很多事情)
- 生态系统丰富,非常多的工具库实现
Tokio不是万能的,部分场景不建议,可以考虑使用其他的:
- 多CPU计算密集型,并行计算。Tokio主要解决多个任务IO等待问题,对于并行计算没有太大优势。
- 大量文件读取。Tokio没有提供异步文件API. 使用它与普通线程读取文件没有区别。
- 单个Web请求,或者阻塞型请求。因为Tokio优势在多请求下的异步处理,简单场景有没有优势。
关键功能
use tokio::net; // 提供 TCP/UDP 等的异步实现
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; // read/write 异步实现, 异步io copy等
use tokio::fs::File; // 异步操作文件的实现
use tokio::process; // 异步的多进程实现
use tokio::time; // 时间的异步实现
use tokio::sync::Mutex; // 支持异步的mutex, 替代 use std::sync::Mutex;
// 异步channel模型
// mpsc 多生产,单消费
// oneshot 单生产,单消费
// broadcast 多对多
// watch 单生产,多消费
use tokio::sync::{mpsc, oneshot, broadcast, watch};
tokio::select!{} // 对多个channel同时进行loop的宏
// 异步main函数与宏
#[tokio::main]
async fn main(){}
tokio::spawn(async move { ... }).await; // tokio::task::spawn 绿色线程task.
tokio::main宏等价
// method 1
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
// method 2
let rt = Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
while let Some(task) = recv.recv().await {
tokio::spawn(handle_task(task));
}
// Once all senders have gone out of scope,
// the `.recv()` call returns None and it will
// exit from the while loop and shut down the
// thread.
});
实现一个自定义Future
use std::{env, future::Future, task::Poll, thread, time::{Duration, Instant}};
use tokio;
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
// poll 实现的是一个状态机
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
if Instant::now() >= self.when {
println!("time is over!");
Poll::Ready("done")
}else{
println!("time not ready...");
// 启动一个唤醒task
// 如果没有这个waker,future会一直在pending状态,恢复不回来
let waker = cx.waker().clone();
let when = self.when;
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when-now);
}
waker.wake();
});
Poll::Pending
}
}
}
#[tokio::main]
async fn main(){
let f = Delay{when: Instant::now()+Duration::from_millis(100)};
println!("{:?}", f.await);
}
TcpServer+Client的tokio版本
之前我们借助async-std
已经实现了基础的tcp server和client. 这里我们使用tokio.
[dependencies]
tokio = {version = "*", features = ["full"]}
code
use std::{env, thread, time::Duration};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, time::sleep};
#[tokio::main]
async fn main() {
let args = env::args().into_iter().collect::<Vec<_>>();
if args.len()>1 {
match args[1].as_str() {
"-s" => tcp_server().await,
"-c" => tcp_client().await,
_ => println!("unknown cmd {}", args[1]),
}
}else{
println!("Usage:\r\n\t-s Open Tcp Server.\r\n\t-c Open Tcp client to connect the server.")
}
}
async fn tcp_server() {
let s = TcpListener::bind("0.0.0.0:8000").await.unwrap();
println!("Listen on addr: {}\r\n=============", s.local_addr().unwrap().to_string());
loop{
let (c, _ ) = s.accept().await.unwrap();
// Tokio 任务是一个异步绿色线程。非常轻量级。
// 使用tokio内部调度程序分配,并不一定在新的线程
tokio::spawn(async move {
handler_tcp(c).await;
});
}
}
async fn handler_tcp(mut c: TcpStream) {
let mut buf = [0u8;1024];
let info = format!("[{:?}] => client in: {}", thread::current().id(), c.peer_addr().unwrap().to_string());
let n = c.read(&mut buf).await.unwrap();
println!("Read Len: {} \r\n{}", n, String::from_utf8_lossy(&buf[..n]));
// 模拟长时间耗时操作
sleep(Duration::from_secs(8)).await;
_ = c.write(format!("HTTP/1.1 200 OK\r\n\r\n{}\r\n", info).as_bytes()).await;
println!("Peer Inf: {}\r\n========================", info);
}
async fn tcp_client() {
let mut c = TcpStream::connect("127.0.0.1:8000").await.unwrap();
c.set_nodelay(true).unwrap();
_ = c.write("GET / HTTP/1.1\r\nAccept: */*\r\n\r\n".as_bytes()).await;
let mut strbuf = String::new();
_ = c.read_to_string(&mut strbuf).await;
println!("resp: {}", strbuf);
}
一定要注意,所有的异步操作要使用await
进行异步等待,否则这个调用并没有真正执行,达不到想要的效果。
多个task之间的数据共享
使用常规的arc等异步共享功能。