rust 并发

Created

2024-05-07 17:14:21

Updated

2024-10-30 15:22:47

Caution

老笔记 ,待整理…

1 实现模型

rust 标准库就提供1:1 模型的实现, 第三方crate 有m:n模型的实现

2 使用线程

use std::thread;
use std::time::Duration;

fn main() {

    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

线程没有完全执行完,就退出了

等待线程完成

use std::thread;
use std::time::Duration;

fn main() {
    //thread::spawn 的返回类型是 JoinHandle,它有值的所有权
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
    // 等待其他线程执行完成
    handle.join().unwrap();
}

3 move 闭包

报错

use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    // 闭包使用外部数据,是一种借用
    // rust无法确定 子线程什么时候结束, 而线程使用的借用数据可能被其他线程修改(比如主线程),这样就有问题了
    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

使用move

use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    // v的所有权被move到闭包中
    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });
    // println!("{:?}", v);  // 提示v 已经被move了, 无法借用
    handle.join().unwrap();
}

4 channel 使用消息传递

类似go的channel, 线程之间通过发送消息来通信

mpsc : multiple producer, single consumer

use std::sync::mpsc;
use std::thread;

fn main() {
    //tx 发送端, rx 接收端
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        // send() 返回    Result<T, E>
        tx.send(val).unwrap();
    });
    // recv 会堵塞, 直到channel 有数据发送过来
    // 成功返回 Result<T,E> 发送端关闭,会返回错误
    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

使用for in来接收数据

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

多个生产者

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

5 共享状态的并发

Shared-State Concurrency

多所有权, 多个线程同时访问一块内存

5.1 Mutex

use std::sync::Mutex;

fn main() {
    // 返回Mutex<T>  也是智能指针
    let m = Mutex::new(5);

    {
        // 通过.lock() 获取锁, 如果获取不到,就会阻塞当前线程
        // 返回值是 MutexGuard 类型,它实现了 Deref trait 和Drop trait
        let mut num = m.lock().unwrap();
        *num = 6;
        // 报错, 锁还没释放, 下面的代码不能执行
        // let mut num2 = m.lock().unwrap();
    }
    // 这里 m 已经释放锁(drop 操作), 下面的代码可以执行
    let mut num2 = m.lock().unwrap();

    println!("m = {:?}", m);
}

5.2 Rc? 多个线程send

报错了, rc 是不能在线程间安全发送,因为没有实现 Send trait

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

5.3 Arc

Arc= atomic rc

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

6 Send Sync trait

  • Send
    • 实现Send trait的类型, 就可以线程间转移所有权
    • 几乎所有类型都实现了Send, 但是Rc 没有实现,它只能用于单线程
    • 由Send 成员组成的类型 也是Send
  • Sync
    • 实现Sync trait的类型,可以安全的被多线程引用
    • T 是 Sync, 那么 &T 就是 Send
    • 基础类型都实现了 Sync, 完全由Sync组成的类型 也是Sync
    • Rc,RefCell, Cell不是 Sync, Mutex是Sync

手动实现Send和Sync 是很难做到安全的

Back to top