rust 并发
Caution
老笔记 ,待整理…
1 实现模型
rust 标准库就提供1:1 模型的实现, 第三方crate 有m:n模型的实现
2 使用线程
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}线程没有完全执行完,就退出了
等待线程完成
use std::thread;
use std::time::Duration;
fn main() {
//thread::spawn 的返回类型是 JoinHandle,它有值的所有权
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
// 等待其他线程执行完成
handle.join().unwrap();
}3 move 闭包
报错
use std::thread;
fn main() {
let v = vec![1, 2, 3];
// 闭包使用外部数据,是一种借用
// rust无法确定 子线程什么时候结束, 而线程使用的借用数据可能被其他线程修改(比如主线程),这样就有问题了
let handle = thread::spawn(|| {
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
}使用move
4 channel 使用消息传递
类似go的channel, 线程之间通过发送消息来通信
mpsc : multiple producer, single consumer
use std::sync::mpsc;
use std::thread;
fn main() {
//tx 发送端, rx 接收端
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
// send() 返回 Result<T, E>
tx.send(val).unwrap();
});
// recv 会堵塞, 直到channel 有数据发送过来
// 成功返回 Result<T,E> 发送端关闭,会返回错误
let received = rx.recv().unwrap();
println!("Got: {}", received);
}使用for in来接收数据
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}多个生产者
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}5 共享状态的并发
Shared-State Concurrency
多所有权, 多个线程同时访问一块内存
5.1 Mutex
use std::sync::Mutex;
fn main() {
// 返回Mutex<T> 也是智能指针
let m = Mutex::new(5);
{
// 通过.lock() 获取锁, 如果获取不到,就会阻塞当前线程
// 返回值是 MutexGuard 类型,它实现了 Deref trait 和Drop trait
let mut num = m.lock().unwrap();
*num = 6;
// 报错, 锁还没释放, 下面的代码不能执行
// let mut num2 = m.lock().unwrap();
}
// 这里 m 已经释放锁(drop 操作), 下面的代码可以执行
let mut num2 = m.lock().unwrap();
println!("m = {:?}", m);
}5.2 Rc? 多个线程send
报错了, rc 是不能在线程间安全发送,因为没有实现 Send trait
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}5.3 Arc
Arc= atomic rc
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}6 Send Sync trait
- Send
- 实现Send trait的类型, 就可以线程间转移所有权
- 几乎所有类型都实现了Send, 但是Rc 没有实现,它只能用于单线程
- 由Send 成员组成的类型 也是Send
- Sync
- 实现Sync trait的类型,可以安全的被多线程引用
- T 是 Sync, 那么 &T 就是 Send
- 基础类型都实现了 Sync, 完全由Sync组成的类型 也是Sync
- Rc,RefCell, Cell不是 Sync, Mutex是Sync
手动实现Send和Sync 是很难做到安全的