Linux Futex学习笔记

Futex

简介

概述: Futex(Fast Userspace Mutex)是linux的一种特有机制,设计目标是避免传统的线程同步原语(如mutex、条件变量等)在用户空间和内核空间之间频繁的上下文切换。Futex允许在用户空间处理锁定和等待的操作,只有在必要时进入内核,从而减少了不必要的开销。

对比:

  • SpinLock:如果上锁成功,立即进入临界区,开销很小;但是如果上锁失败,CPU空转,浪费资源
  • Mutex:如果上锁失败,内核会切换到其他线程在CPU运行,不用自旋等待锁;但是即使是上锁成功也要进出内核,使用系统调用,会消耗几百个指令
  • Futex:结合上述二者的优点,在用户态尝试上锁,上锁成功既可以不用进入内核态,上锁失败就通过系统调用睡眠,唤醒也要通过系统调用
    • 在用户空间尝试加锁通常通过原子操作来完成,如CAS(Compare-And-Swap)或者其他无锁的原子操作

原理

本质: futex直接通过虚拟地址(是一个用户空间地址,通常是一个32位的锁变量字段,0表示空闲,1表示有线程持有当前锁)来处理锁,而不是像以前一样创建一个需要初始化以及跨进程全局内核句柄

用户空间地址: futex用户空间地址可以通过mmap系统调用分配,而mmap分配的内存有三种:匿名内存、共享内存段、内存映射文件。匿名内存只能用在同一进程的线程之间使用,而共享内存段和内存映射文件可以在多个进程之间使用

主要功能:

  • Futex原子操作:用户空间线程首先执行一些原子操作(如使用atomic函数或cmpxchg指令)来尝试获取锁
  • 短路路径:如果锁在用户空间已经被释放,线程可以直接在用户空间完成同步,无需涉及内核
  • 等待:如果一个线程尝试获取的锁已经被另一个线程占用,它可以调用futex_wait()进入休眠
  • 唤醒:当锁被释放,另一个线程可以调用futex_wake()唤醒等待的线程

涉及的系统调用:

  • futex():允许用户空间线程等待或唤醒其他线程。

    1
    int futex(int *uddr, int op, int val, const struct timespec *timeout, int *uaddr2, int val2);
    • uaddr:用户空间地址,表示锁的位置
    • val:如果是FUTEX_WAIT,表示原子性地检查uaddr中的值是否为val,如果是则让进程睡眠,否则返回错误;如果是FUTEX_WAKE,表示最多唤醒val个等待在uaddr上的进程
    • op:操作类型:
      • FUTEX_WAIT:等待操作,线程会阻塞,直到指定的内存地址值发生变化
      • FUTEX_WAKE:唤醒操作,唤醒一个或多个等待该内存地址的线程
    • timeout:可选的等待时间,如果为空,那么调用会无限期睡眠。timeout默认会根据CLOCK_MONOTONIC时钟来计算,从linux4.5开始,可以再futex_op上指定FUTEX_CLOCK_REALTIME来选择CLOCK_REALTIME时钟。
    • uaddr2和val2在某些情况下用于双重条件等待
  • futex_wait()和futex_wake()是更高级的封装,用于在应用程序中更方便地实现等待和唤醒操作

实际应用

  • pthread_mutex:在linux中,pthread_mutex是基于futex实现的。
  • 内存屏障和自旋锁:在一些高效的同步机制中,futex也常常与自旋锁、内存屏障等技术结合使用

代码示例

代码仓库: 1037827920/Futex-Example-Program

futex本身的使用

Rust示例程序:(需要使用到libc crate)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use libc::{syscall, SYS_futex, SYS_gettid, FUTEX_WAIT, FUTEX_WAKE};
use std::{
panic,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
thread,
time::Duration,
};

const FUTEX_INIT: u32 = 0x0000_0000;
const FUTEX_WAITERS: u32 = 0x8000_0000;
const FUTEX_TID_MASK: u32 = 0x3fff_ffff;

fn main() {
test_futex();
}

fn futex_wait(futex: &AtomicU32, thread: &str, tid: i64) {
loop {
// 如果当前futex没有被其他线程持有
if (futex.load(Ordering::SeqCst) & FUTEX_TID_MASK) == 0 {
futex.swap(tid as u32, Ordering::SeqCst);
// 加锁后直接返回,这样就不用执行系统调用,减少一定开销
println!(
"线程{thread}上锁成功, futex值: {:#x}",
futex.load(Ordering::SeqCst)
);
return;
}

// 线程进入等待状态
futex.fetch_or(FUTEX_WAITERS, Ordering::SeqCst);
println!(
"线程{thread}正在等待futex, futex值: {:#x}",
futex.load(Ordering::SeqCst)
);
let ret = unsafe {
syscall(
SYS_futex,
futex as *const AtomicU32 as *mut u32,
FUTEX_WAIT,
futex.load(Ordering::SeqCst),
0,
0,
0,
)
};
if ret == -1 {
panic!("futex_wait系统调用执行失败");
}
}
}

fn futex_wake(futex: &AtomicU32, thread: &str) {
let ret = unsafe {
syscall(
SYS_futex,
futex as *const AtomicU32 as *mut u32,
FUTEX_WAKE,
1,
0,
0,
0,
)
};
if ret == -1 {
panic!("futex_wake系统调用执行失败");
}
futex.store(FUTEX_INIT, Ordering::SeqCst);
println!("线程{thread}释放锁");
}

/// 测试基本的futex使用
fn test_futex() {
// futex用户空间地址
let futex = Arc::new(AtomicU32::new(0));
let futex_clone1 = futex.clone();
let futex_clone2 = futex.clone();

// 线程1
let thread1 = thread::spawn(move || {
let tid = unsafe { syscall(SYS_gettid) };
// 尝试获取锁
futex_wait(&futex_clone1, "1", tid);
// 执行具体的业务逻辑
thread::sleep(Duration::from_secs(5));
// 释放锁
futex_wake(&futex_clone1, "1");
});

// 线程2
let thread2 = thread::spawn(move || {
let tid = unsafe { syscall(SYS_gettid) };
// 尝试获取锁
futex_wait(&futex_clone2, "2", tid);
// 执行具体的业务逻辑
thread::sleep(Duration::from_secs(5));
// 释放锁
futex_wake(&futex_clone2, "2");
});

thread1.join().unwrap();
thread2.join().unwrap();
}

C示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>

#define FUTEX_INIT 0x00000000
#define FUTEX_WAITERS 0x80000000
#define FUTEX_TID_MASK 0x3fffffff

// 定义一个结构体来封装参数
typedef struct {
atomic_uint* futex;
int thread;
} thread_args;

void* futex_wait(atomic_uint* futex, int thread, long tid) {
while (1) {
// 如果当前futex没有其他线程持有
if ((*futex & FUTEX_TID_MASK) == 0) {
atomic_exchange(futex, (unsigned int)tid);
// 加锁后直接返回
printf("线程%d上锁成功. futex值: 0x%x\n", thread, *futex);
return NULL;
}

// 线程进入等待状态
atomic_fetch_or(futex, FUTEX_WAITERS);
printf("线程%d正在等待futex, futex值: 0x%x\n", thread, *futex);
long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAIT, *futex, 0, 0, 0);
if (ret == -1) {
perror("futex_wait系统调用执行失败\n");
return NULL;
}
}
}

void* futex_wake(atomic_uint* futex, int thread) {
long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAKE, 1, 0, 0, 0);
if (ret == -1) {
perror("futex_wake系统调用执行失败\n");
return NULL;
}
atomic_store(futex, FUTEX_INIT);
printf("线程%d释放锁\n", thread);
return NULL;
}

void* thread_task(void* arg) {
thread_args* args = (thread_args*)arg;
// futex用户空间地址
atomic_uint* futex = args->futex;
// 线程号
int thread = args->thread;
// TID
long tid = syscall(SYS_gettid);

// 尝试获取锁
futex_wait(futex, thread, tid);
// 执行具体的业务逻辑
sleep(5);
// 释放锁
futex_wake(futex, thread);

return NULL;
}

int main() {
// 线程句柄
pthread_t t1, t2;

// futex用户空间地址
atomic_uint futex = 0;

thread_args args1 = { &futex, 1 };
thread_args args2 = { &futex, 2 };

// 创建两个线程同时递增cnt
pthread_create(&t1, NULL, thread_task, (void*)&args1);
pthread_create(&t2, NULL, thread_task, (void*)&args2);

// 等待线程结束
pthread_join(t1, NULL);
pthread_join(t2, NULL);

return 0;
}

运行结果:

pthread_mutex的使用

C示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

// 共享计数器
int shread_cnt = 0;
// 互斥锁
pthread_mutex_t cnt_mutex = PTHREAD_MUTEX_INITIALIZER;

// 线程执行的任务
void* thread_task(void* arg) {
// 线程ID
long tid = (long)arg;

// 循环5次,每次增加计数器
for (int i = 0; i < 5; ++i) {
// 加锁
pthread_mutex_lock(&cnt_mutex);

// 临界区:修改共享资源
int tmp = shread_cnt;
// 模拟一些处理时间
usleep(100);
shread_cnt = tmp + 1;

printf("线程 %ld: 计数器值 = %d\n", tid, shread_cnt);

// 解锁
pthread_mutex_unlock(&cnt_mutex);

// 模拟一些处理时间
usleep(200);
}

return NULL;
}

int main() {
// 定义线程句柄数组
pthread_t threads[3];

// 创建3个线程
for (long i = 0;i < 3; ++i) {
int ret = pthread_create(&threads[i], NULL, thread_task, (void*)i);

if (ret != 0) {
perror("线程创建失败");
return 1;
}
}

// 等待所有线程完成
for (int i = 0; i < 3; ++i) {
pthread_join(threads[i], NULL);
}

// 打印最终计数器值
printf("最终计数器值 = %d\n", shread_cnt);

// 销毁互斥锁
pthread_mutex_destroy(&cnt_mutex);

return 0;
}

原理浅析:

pthread_mutex_lock实际上会执行__pthread_mutex_lock ,然后这个函数实现里面会调用LLL_MUTEX_LOCK宏,这个宏会调用__lll_lock宏,在这个宏里面就会先尝试在用户态进行上锁(也就是通过CAS的原子操作进行上锁),然后上锁失败再调用__lll_lock_wait(或__lock_wait_private),这个函数在追踪下去就会执行futex_wait系统调用。这个流程就跟上面futex本身的使用差不多了。

想要看更详细的讲解可以看这个博客:pthread_mutex_lock实现 - pslydhh

参考


Linux Futex学习笔记
http://example.com/2025/01/25/Linux-Futex学习笔记/
作者
凌云行者
发布于
2025年1月25日
许可协议