Rust —— 并发

安全高效地处理并发编程是 Rust 的主要目标之一。并发编程(concurrent programming)是指程序的不同部分独立执行,而并行编程(parallel programming)是指程序的不同部分同时执行。随着计算机利用多处理器的优势越来越多,并发和并行编程变得日益重要。

Rust 所有权和类型系统是一套强大的工具,能够同时帮助管理内存安全和并发问题,通过利用所有权和类型检查,许多并发错误在 Rust 中是编译时错误,而不是运行时错误。因此可以在编写代码时就修复问题,而不是在部署到生产环境后才发现,无畏并发让你能够编写没有细微 bug 的代码,并且易于重构而不会引入新的 bug。

本节内容

  • 如何创建线程来同时运行多段代码
  • 消息传递并发:通过通道在线程间发送消息
  • 共享状态并发:多个线程访问同一数据
  • SyncSend trait:将 Rust 的并发保证扩展到用户定义的类型

1. 使用线程同时运行代码

在大多数操作系统中,程序的代码在一个进程(process)中运行,操作系统同时管理多个进程。在程序内部,你也可以有多个同时运行的独立部分,运行这些部分的功能称为线程(thread)。

将程序中的计算拆分到多个线程可以提升性能,但也增加了复杂性。因为线程同时运行,无法保证不同线程中的代码执行顺序,这可能导致:

  • 竞态条件(race conditions):线程以不一致的顺序访问数据或资源
  • 死锁(deadlocks):两个线程互相等待对方,阻止它们继续运行
  • 只在特定情况下发生的 bug,难以可靠地复现和修复

1.1 使用 spawn 创建新线程

使用 thread::spawn 函数创建新线程,传递一个闭包包含要在新线程中运行的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use std::thread;
use std::time::Duration;

fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

for i in 1..5 {
println!("hi number {} from main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}

注意:当主线程结束时,所有派生线程都会被关闭,无论它们是否执行完毕。

1.2 使用 join 等待线程完成

thread::spawn 返回一个 JoinHandle,调用它的 join 方法可以等待线程完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::thread;
use std::time::Duration;

fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

for i in 1..5 {
println!("hi number {} from main thread!", i);
thread::sleep(Duration::from_millis(1));
}

handle.join().unwrap(); // 等待派生线程完成
}

调用 join 会阻塞当前线程,直到 handle 代表的线程终止。

1.3 在线程中使用 move 闭包

move 闭包常常与 thread::spawn 一起使用,它允许你在一个线程中使用另一个线程的数据。

1
2
3
4
5
6
7
8
9
10
11
use std::thread;

fn main() {
let v = vec![1, 2, 3];

let handle = thread::spawn(move || {
println!("这是 vector: {:?}", v);
});

handle.join().unwrap();
}

通过在闭包前使用 move 关键字,我们强制闭包获取它使用的值的所有权,而不是让 Rust 推断它应该借用值。

如果不使用 move,Rust 会尝试借用 v,但编译器无法确定派生线程会运行多久,所以无法确定 v 的引用是否始终有效。

2. 使用消息传递在线程间传输数据

一个日益流行的确保安全并发的方法是消息传递(message passing),线程或 actor 通过互相发送包含数据的消息来通信。Go 语言文档中的口号是:

不要通过共享内存来通信;而是通过通信来共享内存。

Rust 实现消息传递并发的主要工具是通道(channel)。

2.1 通道的概念

通道有两部分组成:

  • 发送者(transmitter)
  • 接收者(receiver)

当发送者或接收者任一被丢弃时,我们称通道被关闭(closed)了。

2.2 创建通道

使用 mpsc::channel 函数创建通道。mpsc 代表 multiple producer, single consumer(多个生产者,单个消费者):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});

let received = rx.recv().unwrap();
println!("收到:{}", received);
}
  • mpsc::channel 返回一个元组:第一个元素是发送端,第二个元素是接收端
  • 通常使用 txrx 作为变量名(transmitter 和 receiver 的缩写)
  • 使用 movetx 移动到闭包中,派生线程就拥有了发送端

2.3 发送和接收消息

发送端的 send 方法:

  • 接受要发送的值
  • 返回 Result<T, E> 类型
  • 如果接收端已经被丢弃,发送操作会返回错误

接收端有两个方法:

recv 方法

  • 阻塞主线程执行直到有值被发送
  • 返回 Result<T, E>
  • 当发送端关闭时返回错误

try_recv 方法

  • 不会阻塞,立即返回
  • 返回 Result<T, E>:如果有消息返回 Ok,否则返回 Err
  • 适合在循环中调用,处理消息的同时做其他工作

2.4 通道与所有权转移

发送值时会转移所有权,这防止了在发送后意外再次使用该值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
// println!("val is {}", val); // 错误!val 的所有权已转移
});

let received = rx.recv().unwrap();
println!("收到:{}", received);
}

2.5 发送多个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("收到:{}", received);
}
}

在这个例子中:

  • 派生线程发送多个消息,每次发送后暂停 1 秒
  • 主线程中将 rx 当作迭代器使用,不再显式调用 recv
  • 当通道关闭时,迭代结束

2.6 通过克隆发送者创建多个生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("收到:{}", received);
}
}

输出可能类似于:

1
2
3
4
5
6
7
8
收到:hi
收到:more
收到:from
收到:messages
收到:the
收到:for
收到:thread
收到:you

具体顺序取决于系统的调度。

3. 共享状态并发

消息传递是一种很好的处理并发的方式,但并不是唯一的。另一种方式是多个线程访问相同的共享数据。

回忆一下 Go 语言的口号:不要通过共享内存来通信。那么,通过共享内存来通信是什么样的呢?

某种程度上,任何编程语言中的通道都类似于单所有权,因为一旦将值传送到通道中,就不应该再使用这个值了。共享内存并发类似于多所有权:多个线程可以同时访问相同的内存位置。

3.1 互斥器(Mutex)

互斥器(mutex)是 mutual exclusion 的缩写,意为在任意时刻,mutex 只允许一个线程访问某些数据。

要访问互斥器中的数据,线程必须首先发出信号表明它想要获取互斥器的(lock)。锁是一个数据结构,它记录谁有数据的独占访问权。因此,mutex 通过锁系统保护(guarding)其持有的数据。

使用 mutex 的两个规则:

  1. 在使用数据之前必须尝试获取锁
  2. 使用完 mutex 守护的数据后,必须解锁数据,以便其他线程可以获取锁

3.2 Mutex<T> 的基本使用

1
2
3
4
5
6
7
8
9
10
11
12
use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);

{
let mut num = m.lock().unwrap();
*num = 6;
}

println!("m = {:?}", m);
}
  • 使用 Mutex::new 创建 Mutex<T>
  • 使用 lock 方法获取锁。这个调用会阻塞当前线程,直到获取锁为止
  • lock 返回 LockResult,它是一个智能指针 MutexGuard
  • MutexGuard 实现了 Deref 指向内部数据,实现了 Drop 在离开作用域时自动释放锁

3.3 在多线程中共享 Mutex<T>

错误的尝试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::sync::Mutex;
use std::thread;

fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];

for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("结果:{}", *counter.lock().unwrap());
}

这段代码无法编译,因为 counter 的所有权在第一次循环时就被移动到了第一个线程中。

3.4 多线程多所有权

在第 15 章中,我们使用智能指针 Rc<T> 来创建引用计数值,以便拥有多个所有者。让我们尝试用 Rc<T> 包装 Mutex<T>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("结果:{}", *counter.lock().unwrap());
}

编译时会得到错误:

1
`Rc<Mutex<i32>>` cannot be sent between threads safely

问题在于 Rc<T> 不是线程安全的。当 Rc<T> 管理引用计数时,它在每次调用 clone 时增加计数,并在每次克隆被丢弃时减少计数。但它没有使用任何并发原语来确保改变计数的操作不会被另一个线程打断。

3.5 使用 Arc<T> 实现原子引用计数

Arc<T> 是一个类似 Rc<T> 的类型,但可以安全地用于并发场景。字母 ‘a’ 代表原子性(atomic),意为它是一个原子引用计数(atomically reference counted)类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("结果:{}", *counter.lock().unwrap());
}

输出:

1
结果:10

这段代码成功编译并运行!

3.6 Rc<T> vs Arc<T>

为什么所有原始类型不默认都是原子性的?为什么标准库类型不默认使用 Arc<T>

原因是:线程安全带有性能损失,只有在真正需要时才应该付出这个代价。如果只是在单线程中操作值,代码可以运行得更快。

3.7 RefCell<T> / Rc<T>Mutex<T> / Arc<T> 的相似性

你可能注意到 counter 是不可变的,但我们可以获取其内部值的可变引用。这意味着 Mutex<T> 提供了内部可变性,就像 Cell 系列类型一样。

正如我们使用 RefCell<T> 来允许改变 Rc<T> 中的内容,我们使用 Mutex<T> 来改变 Arc<T> 中的内容。

另一个值得注意的细节是:使用 Mutex<T> 有造成死锁(deadlock)的风险。当一个操作需要锁定两个资源,而两个线程各持有一个锁,并试图获取另一个锁时,它们会永远等待对方。

4. 使用 SendSync trait 扩展并发

有趣的是,Rust 语言本身的并发特性很少。本章讨论的几乎所有并发特性都是标准库的一部分,而不是语言本身。你的并发选项不限于语言或标准库,你可以编写自己的并发功能或使用其他人编写的。

然而,有两个并发概念内嵌于语言中:std::marker trait SyncSend

4.1 Send trait:允许在线程间转移所有权

Send 标记 trait 表明实现了 Send 的类型值的所有权可以在线程间传送。

几乎所有 Rust 类型都是 Send 的,但有一些例外,包括 Rc<T>:这不能是 Send 的,因为如果克隆了 Rc<T> 的值并尝试将克隆的所有权转移到另一个线程,两个线程可能同时更新引用计数。

因此,Rc<T> 被实现为用于单线程场景。

任何完全由 Send 类型组成的类型也会自动标记为 Send。几乎所有基本类型都是 Send 的,除了裸指针。

4.2 Sync trait:允许多线程访问

Sync 标记 trait 表明一个实现了 Sync 的类型可以安全地从多个线程中引用。

换句话说,对于任意类型 T,如果 &TT 的不可变引用)是 Send 的,那么 T 就是 Sync 的,这意味着引用可以安全地发送到另一个线程。

类似于 Send,基本类型都是 Sync 的,完全由 Sync 类型组成的类型也是 Sync 的。

Rc<T> 也不是 Sync 的,原因与它不是 Send 的相同。RefCell<T> 类型和 Cell<T> 系列类型不是 Sync 的。RefCell<T> 在运行时进行的借用检查不是线程安全的。

智能指针 Mutex<T>Sync 的,可以用来在多线程中共享访问。

4.3 手动实现 SendSync 是不安全的

因为由 SendSync trait 组成的类型会自动成为 SendSync,所以我们不必手动实现这些 trait。作为标记 trait,它们甚至没有任何需要实现的方法。

手动实现这些 trait 涉及实现不安全的 Rust 代码。我们将在第 19 章讨论使用不安全 Rust 代码。现在重要的是,构建由非 SendSync 部分组成的新并发类型需要仔细思考以维持安全保证。

5. 并发模式对比

5.1 消息传递 vs 共享状态

消息传递(使用通道):

  • ✅ 更容易推理,减少共享状态
  • ✅ 避免了锁和死锁问题
  • ✅ 符合”通过通信来共享内存”的理念
  • ❌ 可能有一些性能开销

共享状态(使用 Mutex):

  • ✅ 在某些场景下性能更好
  • ✅ 对某些问题更自然(如共享配置)
  • ❌ 需要小心处理锁,避免死锁
  • ❌ 更容易引入竞态条件

Rust 允许你根据具体情况选择最合适的并发方式。

5.2 线程安全类型对照表

类型 线程安全 用途
Rc<T> 单线程引用计数
Arc<T> 多线程引用计数
RefCell<T> 单线程内部可变性
Mutex<T> 多线程内部可变性
Cell<T> ❌ (但 Send) 单线程内部可变性(Copy 类型)

总结

Rust 提供了强大的并发编程工具:

  • 线程:使用 thread::spawn 创建,用 join 等待完成
  • 消息传递:使用 mpsc::channel 在线程间安全传递数据
  • 共享状态:使用 Mutex<T>Arc<T> 实现线程安全的共享
  • SendSync trait:确保类型可以安全地在线程间使用

Rust 的类型系统和所有权规则使得编写并发代码时,许多并发错误都能在编译期捕获,而不是在运行时才暴露。这就是 Rust 的无畏并发:你可以放心地编写并发代码,因为编译器会帮你检查大部分问题。

一旦代码能够编译通过,你就可以确信它能在多线程环境中正确运行,而不会出现其他语言中常见的难以追踪的 bug。

掌握 Rust 的并发特性,能让你充分利用现代多核处理器的能力,编写高性能、安全可靠的并发程序!

Hooray!并发小节完成!!!