1. 线程的概念

【操作系统】2.进程和线程 - imXuan - 博客园 (cnblogs.com)

  • 线程:light weight process(LWP)轻量级的进程,在 Linux 中本质上仍然是一个进程
  • 进程:有独立的地址空间,独立PCB,可以当作只有一个线程的进程。进程是计算机资源分配的最小单位
  • 线程:有独立的PCB,共享物理地址空间,是最小的执行单位。cpu时间片划分以PCB为依据,是调度的基本单位
  • LWP号:cpu划分时间片的依据。指令 “ ps -Lf pid “ 查看

1.1 线程共享的资源

    1. 文件描述符表
    1. 每种信号的处理方式(多个线程会争抢一个信号)
    1. 当前工作目录
    1. 用户ID和组ID
  • 内存地址空间 (.text/.data/.bss/heap/共享库) -> 堆区全局共享变量是共享的(进程是读时共享写时复制,实际上就是非共享)

1.2 线程非共享资源

    1. 线程id
    1. 处理器现场和栈指针(内核栈
    1. 独立的栈空间(用户空间栈
    1. errno变量(不是设置全局errno,直接返回errno)
    1. 信号屏蔽字(虽然共享信号,多个信号会争抢一个信号,但是可以使用信号屏蔽字)
    1. 调度优先级

1.3 线程优缺点

  • 优点:
    • 提高程序并发性
    • 开销小
    • 数据通信、共享数据方便
  • 缺点:
    • 库函数,不稳定
    • 调试、编写困难、gdb不支持
    • 对信号支持不好
    • 优点相对突出,缺点均不是硬伤。Linux下由于实现方法导致进程、线程差别不是很大。

2. 线程常用操作

  • 创建线程:pthread_create
  • 线程获取:pthread_self
  • 线程退出:
    • 线程内部:return void* (0);
    • 线程内部:pthread_exit(void *(0));
    • 线程外部:pthread_canel(返回值是 -1,需要一个取消点)
  • 线程回收
    • 手动回收:pthread_join
    • 自动回收:pthread_detach

2.1 创建线程 pthread_create

  功能:创建一个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <pthread.h>

int pthread_create(pthread_t *thread,
const pthread_attr_t *attr,
void *(*start_routine)(void *),
void *arg );
/* 参数:
thread:传出参数。线程标识符地址(一个无符号数)。
attr:线程属性结构体地址,通常设置为 NULL。
start_routine:线程函数的入口地址。
arg:传给线程函数的参数。
返回值:
成功:0
失败:非 0
*/

2.2.1 线程中处理出错

1
2
3
4
#include <string.h>
char *strerror(int errnum);

fprintf(stderr, "xxx error: %s\n", strerror(错误号));

2.2 获取线程ID pthread_self

  功能:获取线程号(与ps -Lf 查看的 id 不同)

1
2
3
4
5
6
#include <pthread.h>

pthread_t pthread_self(void);
/* 参数:无
返回值 调用线程的线程 ID 。
*/

2.3 线程退出 pthread_exit

  功能:退出调用线程。一个进程中的多个线程是共享该进程的数据段,因此,通常线程退出后所占用的资源并不会释放。

  • return:返回到调用者
  • exit:退出进程
  • pthread_exit:退出线程
1
2
3
4
5
#include <pthread.h>

void pthread_exit(void *retval);
/* 参数:retval:存储线程退出状态的指针。
返回值:无 */

2.4 线程回收 pthread_join

  功能:等待线程结束(此函数会阻塞),并回收线程资源,类似进程的 wait() 函数。如果线程已经结束,那么该函数会立即返回。

1
2
3
4
5
6
7
8
9
10
#include <pthread.h>

int pthread_join(pthread_t thread, void **retval);
/* 参数:
thread:被回收的线程号。
retval:用来存储线程退出状态的指针的地址
(pthread_exit 退出返回值是 void*, 这里是一个指针, 指向 void*指针, 所以是void**)
返回值:
成功:0
失败:非 0 */

2.5 线程分离 pthread_detach

  功能:使调用线程与当前进程分离,分离后不代表此线程不依赖与当前进程,线程分离的目的是将线程资源的回收工作交由系统自动来完成,也就是说当被分离的线程结束之后,系统会自动回收它的PCB资源。所以,此函数不会阻塞。

1
2
3
4
5
6
7
#include <pthread.h>

int pthread_detach(pthread_t thread);
/* 参数:thread:线程号。
返回值:
成功:0
失败:非0 */

2.6 线程取消 pthread_cancel

  功能:杀死(取消)线程

  • 被 pthread_cancel() 杀死的线程,使用 pthread_join() 再进行回收,会得到返回值 -1
  • 使用 pthread_cancel() 杀死线程必须有一个保存点才能生效,否则无法杀死线程。应该在被 cancel 函数调用的线程函数里自己添加一个取消点 pthread_testcancel(); 实际上就是进入系统内核,给他一个杀死线程的机会
1
2
3
4
5
6
7
#include <pthread.h>

int pthread_cancel(pthread_t thread);
/* 参数:thread : 目标线程ID。
返回值:
成功:0
失败:出错编号 */

3. 线程同步

3.1 互斥锁 pthread_mutex_t

  互斥锁是一种简单的加锁的方法来控制对共享资源的访问,互斥锁只有两种状态,即加锁( lock )和解锁( unlock )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <pthread.h>

// 创建互斥锁
pthread_mutex_t mutex;

// 静态初始化 互斥锁
mutex = PTHREAD_MUTEX_INITIALIZER;

// 动态初始化 互斥锁, attr:设置互斥量的属性, NULL表示默认
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

// 销毁指定的一个互斥锁。互斥锁在使用完毕后,必须要对互斥锁进行销毁,以释放资源
int pthread_mutex_destroy(pthread_mutex_t *mutex);

// 对互斥锁上锁,若互斥锁已经上锁,则调用者阻塞,直到互斥锁解锁后再上锁
int pthread_mutex_lock(pthread_mutex_t *mutex);

// 尝试对互斥锁上锁,若已经上锁跳过执行后面的代码
int pthread_mutex_trylock(pthread_mutex_t *mutex);

// 对指定的互斥锁解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);

3.2 读写锁 pthread_rwlock_t

  当有一个线程已经持有互斥锁时,互斥锁将所有试图进入临界区的线程都阻塞住。但是当前持有互斥锁的线程只是要读访问共享资源,而同时有其它几个线程也想读取这个共享资源,但是由于互斥锁的排它性,所有其它线程都无法获取锁,也就无法读访问共享资源了,但是实际上多个线程同时读访问共享资源并不会导致问题。

  在对数据的读写操作中,更多的是读操作,写操作较少,例如对数据库数据的读写应用。为了满足当前能够允许多个读出,但只允许一个写入的需求,线程提供了读写锁来实现。

读写锁的特点:

  • 如果有其它线程读数据,则允许其它线程执行读操作,但不允许写操作
  • 如果有其它线程写数据,则其它线程都不允许读、写操作

读写锁分为读锁和写锁,规则如下:

  • 如果某线程申请了读锁,其它线程可以再申请读锁,但不能申请写锁
  • 如果某线程申请了写锁,其它线程不能申请读锁,也不能申请写锁

举例子:线程1 给读写锁加了读锁,此时 线程2 请求读锁、线程3 请求写锁;则 线程2 的读锁会被阻塞,等 线程1 读锁释放后,线程3 进行写,之后 线程2 再读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <pthread.h>

// 初始化一个读写锁(restrict 修饰指针变量, 被变量修饰的内存操作只能由本指针操作)
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

// 销毁一个读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

// 阻塞上读锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
// 非堵塞上读锁
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

// 阻塞上写锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
// 非阻塞上写锁
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

// 全解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

3.3 条件变量 pthread_cond_t

  与互斥锁不同,条件变量是用来等待而不是用来上锁的,条件变量本身不是锁,条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。生产者消费者模型中比较常用

  条件变量的两个动作:

  • 条件不满, 阻塞线程
  • 当条件满足, 通知阻塞的线程开始工作

  条件变量的类型: pthread_cond_t

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <pthread.h>

// 初始化一个条件变量
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

// 销毁一个条件变量
int pthread_cond_destroy(pthread_cond_t *cond);

// 等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
/*
* 功能:
* 1.阻塞等待一个条件变量
* 2.解锁已经加锁成功的互斥量 (1.2为原子操作)
* .....等待.....
* 3.当条件满足,函数返回时,重新加锁互斥量
*/

// 等待条件满足, 超时退出
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec*restrict abstime);

// 唤醒阻塞在条件变量上的线程
int pthread_cond_signal(pthread_cond_t *cond);

// 唤醒所有阻塞在条件变量上的线程
int pthread_cond_broadcast(pthread_cond_t *cond);

  timespec结构体(abs_time 表示绝对时间,从1970年1月1日 00:00:00计算)

1
2
3
4
5
6
7
8
9
struct timespec {
time_t tv_sec; /* seconds */ // 秒
long tv_nsec; /* nanosecondes*/ // 纳秒
}

time_t cur = time(NULL); //获取当前时间。
struct timespec t; //定义timespec 结构体变量t
t.tv_sec = cur + 1; // 定时1秒
pthread_cond_timedwait(&cond, &t);

  一个示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

void err_thread(int ret, char* str)
{
if(ret!=0){
fprintf(stderr, "%s:%s\n", str, strerror(ret));
pthread_exit(NULL);
}
}

// 创建公共区
struct msg{
int num;
struct msg *next;
};
struct msg *head = NULL;
// 创建互斥量,初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 创建条件变量,初始化
pthread_cond_t has_data = PTHREAD_COND_INITIALIZER;

void* producer(void* arg)
{
int i = 0;
while(1){
struct msg *p = malloc(sizeof(struct msg));
// 生产数据
p->num = ++i;
p->next = NULL;
printf("product: %d\n", p->num);

// 将数据保存到公共区
pthread_mutex_lock(&mutex);
p->next = head;
head = p;
pthread_mutex_unlock(&mutex);

// 通知消费者
pthread_cond_signal(&has_data);

sleep(rand() % 3);
}
return NULL;
}

void* consumer(void* arg)
{
while(1){
struct msg *mp;
// 加锁互斥量
pthread_mutex_lock(&mutex);
//
while (head==NULL) // 注意 while 循环才可以解决多消费者抢锁的问题
{ // 因为头节点为空,解锁互斥量让其他人访问,并在这里阻塞等待
pthread_cond_wait(&has_data, &mutex);
} // 结束阻塞后自动重新加锁,进行下面的消费操作

// 如果头节点不为空则消费一个产品
mp = head;
head = mp->next;

// 操作公共区结束, 立即解锁
pthread_mutex_unlock(&mutex);
printf("consumer:%d\n", mp->num);
free(mp);

sleep(rand() % 3);
}

return NULL;
}

int main()
{
int ret;
pthread_t pid1, pid2, cid1, cid2, cid3;

srand(time(NULL));

ret = pthread_create(&pid1, NULL, producer, NULL);
if(ret!=0)
err_thread(ret, "pthread_create producer:");
ret = pthread_create(&pid2, NULL, producer, NULL);
if(ret!=0)
err_thread(ret, "pthread_create producer:");

ret = pthread_create(&cid1, NULL, consumer, NULL);
if(ret!=0)
err_thread(ret, "pthread_create consumer:");
ret = pthread_create(&cid2, NULL, consumer, NULL);
if(ret!=0)
err_thread(ret, "pthread_create consumer:");
ret = pthread_create(&cid3, NULL, consumer, NULL);
if(ret!=0)
err_thread(ret, "pthread_create consumer:");

pthread_join(pid1, NULL);
pthread_join(pid2, NULL);
pthread_join(cid1, NULL);
pthread_join(cid2, NULL);
pthread_join(cid3, NULL);

return 0;
}

3.4 信号量 semaphore

  信号量广泛用于进程或线程间的同步和互斥,信号量本质上是一个非负的整数计数器,它被用来控制对公共资源的访问。编程时可根据操作信号量值的结果判断是否对公共资源具有访问的权限,当信号量值大于 0 时,则可以访问,否则将阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <semaphore.h>

// 创建一个信号量并初始化它的值。一个无名信号量在被使用前必须先初始化。
// pshared 0:线程同步; 1: 进程同步
// value: 信号量的初值
// 成功返回0, 失败返回-1, 设置errno
int sem_init(sem_t *sem, int pshared, unsigned int value);

// 删除 sem 标识的信号量。
int sem_destroy(sem_t *sem);

// 将信号量的值减 1。操作前,先检查信号量(sem)的值是否为 0,若信号量为 0,此函数会阻塞,直到信号量大于 0 时才进行减 1 操作。
int sem_wait(sem_t *sem);

// 以非阻塞的方式来对信号量进行减 1 操作。
// 若操作前,信号量的值等于 0,则对信号量的操作失败,函数立即返回。
int sem_trywait(sem_t *sem);

// 限时尝试将信号量的值减 1
// abs_timeout:绝对时间
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

// 将信号量的值加 1 并发出信号唤醒等待线程(sem_wait())。
int sem_post(sem_t *sem);

// 获取 sem 标识的信号量的值,保存在 sval 中。
int sem_getvalue(sem_t *sem, int *sval);

  一个示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <semaphore.h>
#include <pthread.h>
#define NUM 5

int queue[NUM];
sem_t blank_number, product_number;

// 消费者
void* consumer(void *arg)
{
int i = 0;
while(1)
{
sem_wait(&product_number); // 产品数量-- (0则阻塞)
printf("Consume:%d\n", queue[i]);
queue[i] = 0;
sem_post(&blank_number); // 空格数量++

i = (i+1) % NUM;
sleep(rand() % 6);
}
}
// 生产者
void* producer(void *arg)
{
int i = 0;
int number = 0;
while(1)
{
sem_wait(&blank_number); // 空闲数量-- (0则阻塞)
queue[i] = ++number;
printf("Produce:%d\n", number);
sem_post(&product_number); // 产品数量++

i = (i+1) % NUM;
sleep(rand() % 2);
}
}
int main()
{
pthread_t pid, cid;
sem_init(&blank_number, 0, NUM); // 初始化空闲区域
sem_init(&product_number, 0, 0); // 初始化产品数量

pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);

pthread_join(pid, NULL);
pthread_join(cid, NULL);

sem_destroy(&blank_number);
sem_destroy(&product_number);

return 0;
}