在了解了《同步与互斥的区别》之后,我们来看看几个经典的线程同步的例子。相信通过具体场景可以让我们学会分析和解决这类线程同步的问题,以便以后应用在实际的项目中。
一、生产者-消费者问题问题描述:
一组生产者进程和一组消费者进程共享一个初始为空、大小为 n 的缓冲区,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待;只有缓冲区不空时,消费者才能从中取出消息,否则必须等待。由于缓冲区是临界资源,它只允许一个生产者放入消息,或者一个消费者从中取出消息。
分析:
-
关系分析:生产者和消费者对缓冲区互斥访问是互斥关系,同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,它们也是同步关系。
-
整理思路:这里比较简单,只有生产者和消费者两个进程,且这两个进程存在着互斥关系和同步关系。那么需要解决的是互斥和同步的PV操作的位置。
-
信号量设置:信号量
mutex
作为互斥信号量,用于控制互斥访问缓冲池,初值为1;信号量full
用于记录当前缓冲池中“满”缓冲区数,初值为 0;信号量empty
用于记录当前缓冲池中“空”缓冲区数,初值为n。
代码示例:(semaphore类的封装见下文)
#include
#include // sleep
#include
#include"semaphore.h"
using namespace std;
#define N 5
semaphore mutex("/", 1); // 临界区互斥信号量
semaphore empty("/home", N); // 记录空缓冲区数,初值为N
semaphore full("/home/songlee",0); // 记录满缓冲区数,初值为0
int buffer[N]; // 缓冲区,大小为N
int i=0;
int j=0;
void* producer(void* arg)
{
empty.P(); // empty减1
mutex.P();
buffer[i] = 10 + rand() % 90;
printf("Producer %d write Buffer[%d]: %d\n",arg,i+1,buffer[i]);
i = (i+1) % N;
mutex.V();
full.V(); // full加1
}
void* consumer(void* arg)
{
full.P(); // full减1
mutex.P();
printf(" \033[1;31m");
printf("Consumer %d read Buffer[%d]: %d\n",arg,j+1,buffer[j]);
printf("\033[0m");
j = (j+1) % N;
mutex.V();
empty.V(); // empty加1
}
int main()
{
pthread_t id[10];
// 开10个生产者线程,10个消费者线程
for(int k=0; kV(); // 放回左边筷子
chopstick[1]->V(); // 放回右边筷子
}
void* P2(void* arg) // 第2个哲学家线程
{
mutex.P(); // 在取筷子前获得互斥量
chopstick[1]->P(); // 取左边筷子
chopstick[2]->P(); // 取右边筷子
mutex.V(); // 释放取筷子的信号量
printf("Philosopher 2 eat.\n");
chopstick[1]->V(); // 放回左边筷子
chopstick[2]->V(); // 放回右边筷子
}
void* P3(void* arg) // 第3个哲学家线程
{
mutex.P(); // 在取筷子前获得互斥量
chopstick[2]->P(); // 取左边筷子
chopstick[3]->P(); // 取右边筷子
mutex.V(); // 释放取筷子的信号量
printf("Philosopher 3 eat.\n");
chopstick[2]->V(); // 放回左边筷子
chopstick[3]->V(); // 放回右边筷子
}
void* P4(void* arg) // 第4个哲学家线程
{
mutex.P(); // 在取筷子前获得互斥量
chopstick[3]->P(); // 取左边筷子
chopstick[4]->P(); // 取右边筷子
mutex.V(); // 释放取筷子的信号量
printf("Philosopher 4 eat.\n");
chopstick[3]->V(); // 放回左边筷子
chopstick[4]->V(); // 放回右边筷子
}
void* P5(void* arg) // 第5个哲学家线程
{
mutex.P(); // 在取筷子前获得互斥量
chopstick[4]->P(); // 取左边筷子
chopstick[0]->P(); // 取右边筷子
mutex.V(); // 释放取筷子的信号量
printf("Philosopher 5 eat.\n");
chopstick[4]->V(); // 放回左边筷子
chopstick[0]->V(); // 放回右边筷子
}
int main()
{
semaphore *sem1 = new semaphore("/home", 1);
semaphore *sem2 = new semaphore("/home/songlee", 1);
semaphore *sem3 = new semaphore("/home/songlee/java", 1);
semaphore *sem4 = new semaphore("/home/songlee/ADT", 1);
semaphore *sem5 = new semaphore("/home/songlee/Test", 1);
chopstick.push_back(sem1);
chopstick.push_back(sem2);
chopstick.push_back(sem3);
chopstick.push_back(sem4);
chopstick.push_back(sem5);
pthread_t id;
pthread_create(&id, NULL, P1, NULL);
pthread_create(&id, NULL, P2, NULL);
pthread_create(&id, NULL, P3, NULL);
pthread_create(&id, NULL, P4, NULL);
pthread_create(&id, NULL, P5, NULL);
sleep(1);
delete sem1;
delete sem2;
delete sem3;
delete sem4;
delete sem5;
return 0;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
编译运行的结果如下:
Philosopher 2 eat.
Philosopher 1 eat.
Philosopher 3 eat.
Philosopher 4 eat.
Philosopher 5 eat.
- 1
- 2
- 3
- 4
- 5
注意:创建信号量时的 路径参数 请改成你的系统中存在的路径!!!
附:semaphore类的封装
上面的代码中都使用了这个semaphore
类,实现如下:
- semaphore.h
#pragma once
#include
#include
#include
#include
using namespace std;
// 联合体,用于semctl初始化
union semun {
int val; /*for SETVAL*/
struct semid_ds *buf;
unsigned short *array;
};
class semaphore {
private:
int sem_id;
int init_sem(int);
public:
semaphore(const char*, int); /*构造函数*/
~semaphore(); /*析构函数*/
void P(); /*P操作*/
void V(); /*V操作*/
};
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- semaphore.cpp
#include"semaphore.h"
semaphore::semaphore(const char* path, int value)
{
key_t key;
/*获取key值*/
if((key = ftok(path, 'z')) < 0) {
perror("ftok error");
exit(1);
}
/*创建信号量集,其中只有一个信号量*/
if((sem_id = semget(key, 1, IPC_CREAT|0666)) == -1) {
perror("semget error");
exit(1);
}
init_sem(value);
}
semaphore::~semaphore()
{
union semun tmp;
if(semctl(sem_id, 0, IPC_RMID, tmp) == -1) {
perror("Delete Semaphore Error");
exit(1);
}
}
void semaphore::P()
{
struct sembuf sbuf;
sbuf.sem_num = 0; /*序号*/
sbuf.sem_op = -1; /*P操作*/
sbuf.sem_flg = SEM_UNDO;
if(semop(sem_id, &sbuf, 1) == -1) {
perror("P operation Error");
}
}
void semaphore::V()
{
struct sembuf sbuf;
sbuf.sem_num = 0; /*序号*/
sbuf.sem_op = 1; /*V操作*/
sbuf.sem_flg = SEM_UNDO;
if(semop(sem_id, &sbuf, 1) == -1) {
perror("V operation Error");
}
}
// 初始化信号量
int semaphore::init_sem(int value)
{
union semun tmp;
tmp.val = value;
if(semctl(sem_id, 0, SETVAL, tmp) == -1) {
perror("Init Semaphore Error");
return -1;
}
return 0;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
在这里,要创建不同的信号量,必须传递不同的路径参数(这样获取的 key 值才会不一样)。
注意,本文的关注点并不在于 linux 下如何创建信号量以及如何封装起来才更方便,而是通过几个经典的同步实例,了解在多线程环境下如何解决这类线程同步问题。
个人站点:http://songlee24.github.com