Halo

A magic place for coding

0%

操作系统实验4--同步互斥问题

介绍

  同步互斥问题是操作系统学科中一个很重要的问题,要讲明白同步互斥问题,我先来介绍竞争状态。所谓竞争状态,是指多个进程对于同一块数据进行访问,其访问顺序会影响执行结果。对于这一类数据,我们需要对访问它的进程作限制,即保持该数据访问的互斥性。但是,如果我们只是读取该数据,访问顺序将不会影响执行结果;如果是,这就是同步互斥问题的主要内容。

  解决同步互斥问题有三个要求:

  • 互斥性:即当一个进程访问临界资源的时候,互斥的进程不能访问该数据。
  • 有空让进:当临界资源没有被占有的时候,在等待的进程可以访问到该资源,这里涉及到进程的调度算法。
  • 有限等待:一个进程请求资源的等待时间是有限的。等待方式有忙等待让权等待。忙等待指该进程占有CPU等待访问该资源,优点是可以降低系统的多道程序的度,缺点是占有CPU。让权等待指请求资源的进程让出CPU,直至该资源可以被占有。优点是可以让出CPU用作其他地方,缺点是需要进行上下文切换,可能引起系统抖动现象。

  这个实验包括两个很经典的问题:

  • (1)生产者-消费者问题
  • (2)读者-写者问题

生产者-消费者问题

问题要求

  设计一个程序来解决有限缓冲问题,其中的生产者与消费者进程如图6.10 与图6.11 所示。
  在6.6.1 小节中,使用了三个信号量: empty(以记录有多少空位)、full(以记录有多少满位)以及mutex(二进制信号量或互斥信号量,以保护对缓冲插入与删除的操作)。对于本项目,empty与full将采用标准计数信号量,而mutex将采用二进制信号量。生产者与消费者作为独立线程,在empty、full、mutex 的同步前提下,对缓冲进行插入与删除。

输入格式

  测试数据文件包括n行测试数据,分别描述创建的n个线程是生产者还是消费者,以及生产者或消费者存放或取产品的开始时间和持续时间。每行测试数据包括四个字段,各个字段间用空格分隔。第一字段为一个正整数,表示线程序号。第二字段表示相应线程角色,P表示生产者,C表示消费者。第三字段为一个正数,表示存放或取出操作的开始时间:线程创建后,延迟相应时间(单位为秒)后发出对共享资源的使用申请。第四字段为一个正数,表示操作的持续时间。第五字段为一个正数(仅生产者有),表示生产的产品号。当线程申请成功后,开始对共享资源的操作,该操作持续相应时间后结束,并释放共享资源。

输入

1 C 3 5
2 P 4 5 1
3 C 5 2
4 C 6 5
5 P 7 3 2
6 P 8 4 3

  注意:在创建数据文件时,由于涉及到文件格式问题,最好在记事本中手工逐个键入数据,而不要拷贝粘贴数据,否则,本示例程序运行时可能会出现不可预知的错误。

代码实现

buffer.h文件如下:

1
2
typedef int buffer_item;
#define BUFFER_SIZE 5

pc.cpp文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#include "buffer.h"
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>

// empty: for producer
// full: for consumer
// mutex: buffer
sem_t empty, full, mutex;

/* the buffer */
buffer_item buffer[BUFFER_SIZE];

int nextp = 0;
int nextc = 0;

/* data struct */
struct cmd
{
int pid;
char type;
int startTime;
int lastTime;
int num;
};

// insert operation
int insert_item(buffer_item item) {
buffer[nextp] = item;
nextp = (nextp + 1) % BUFFER_SIZE;
return 0;
}

// remove operation
int remove_item(buffer_item* item) {
*item = buffer[nextc];
buffer[nextc] = 0;
nextc = (nextc + 1) % BUFFER_SIZE;
return 0;
}

/* producer */
void *producer(void *param) {
struct cmd* item = (struct cmd*)param;

while (1) {
sleep(item->startTime);
sem_wait(&empty); // There is empty place
sem_wait(&mutex);

// critical section
// add nextp to buffer

insert_item(item->num);
printf("Producer No.%d produces product No.%d\n", item->pid, item->num);

sleep(item->lastTime);
sem_post(&mutex);
sem_post(&full); // One more available commodity

pthread_exit(0);
}
}

/* consumer */
void *consumer(void* param) {
struct cmd* item = (struct cmd*)param;

while (1) {
sleep(item->startTime);
sem_wait(&full); // There is commodity.
sem_wait(&mutex);

// critical section
// remove an item from buffer to nextc

remove_item(&item->num);
printf("Consumer No.%d consumes product No.%d\n", item->pid, item->num);

sleep(item->lastTime);
sem_post(&mutex);
sem_post(&empty); // consume one commodity

pthread_exit(0);
}
}

int main(int argc, char const *argv[]) {
/* code */
int threadNum = atoi(argv[1]);
struct cmd commands[threadNum];
pthread_t pid[threadNum];

/* initialize the semaphores */
sem_init(&empty, 0, BUFFER_SIZE);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);

int nextp = 0;
int nextc = 0;

for (int i = 0; i < BUFFER_SIZE; i++) {
buffer[i] = 0;
}

// input
char text[256];
int j = 0;
char ch;
// Read from file
FILE *fpRead=fopen("test.txt","r");
if(fpRead==NULL)
{
return 0;
}

while ((ch=fgetc(fpRead))!=EOF) {
if (ch != ' ' && ch != '\n' && ch != '\r')
text[j++] = ch;
}

j = 0;
for (int i = 0; i < threadNum; i++) {
commands[i].pid = text[j++] - '0';
commands[i].type = text[j++];
commands[i].startTime = text[j++] - '0';
commands[i].lastTime = text[j++] - '0';
if (commands[i].type == 'P') {
commands[i].num = text[j++] - '0';
}
}

// generate threads
for (int i = 0; i < threadNum; i++) {
// genrate consumers
if (commands[i].type == 'C') {
pthread_create(&pid[i], NULL, consumer, &commands[i]);
printf("Create Consumer\n");
}
// generate producers
else {
pthread_create(&pid[i], NULL, producer, &commands[i]);
printf("Create Producer\n");
}
}

// execute threads
for (int i = 0; i < threadNum; i++) {
pthread_join(pid[i], NULL);
}

// destroys semaphores
sem_destroy(&mutex);
sem_destroy(&full);
sem_destroy(&empty);
return 0;
}

  编译执行上述代码,结果如下:
Producer-Consumer
  运行两次有不同的顺序,可以从生产的商品顺序以及消费的商品顺序看出同步互斥的现象。

读者-写者问题

问题要求

  在Linux环境下,创建一个进程,此进程包含n个线程。用这n个线程来表示n个读者或写者。每个线程按相应测试数据文件(后面有介绍)的要求进行读写操作。用信号量机制分别实现读者优先和写者优先的读者-写者问题。

  读者-写者问题的读写操作限制(仅读者优先或写者优先):

  • 1)写-写互斥,即不能有两个写者同时进行写操作。
  • 2)读-写互斥,即不能同时有一个线程在读,而另一个线程在写。
  • 3)读-读允许,即可以有一个或多个读者在读。

  读者优先的附加限制:如果一个读者申请进行读操作时已有另一个读者正在进行读操作,则该读者可直接开始读操作。
  写者优先的附加限制:如果一个读者申请进行读操作时已有另一写者在等待访问共享资源,则该读者必须等到没有写者处于等待状态后才能开始读操作。
  运行结果显示要求:要求在每个线程创建、发出读写操作申请、开始读写操作和结束读写操作时分别显示一行提示信息,以确定所有处理都遵守相应的读写操作限制。

输入格式

  测试数据文件包括n行测试数据,分别描述创建的n个线程是读者还是写者,以及读写操作的开始时间和持续时间。每行测试数据包括四个字段,各个字段间用空格分隔。第一字段为一个正整数,表示线程序号。第二字段表示相应线程角色,R表示读者,W表示写者。第三字段为一个正数,表示读写操作的开始时间:线程创建后,延迟相应时间(单位为秒)后发出对共享资源的读写申请。第四字段为一个正数,表示读写操作的持续时间。当线程读写申请成功后,开始对共享资源的读写操作,该操作持续相应时间后结束,并释放共享资源

输入

1 R 3 5
2 W 4 5
3 R 5 2
4 R 6 5
5 W 7 3

读者优先

  读者优先指的是除非有写者在写文件,否则读者不需要等待。所以可以用一个整型变量read_count记录当前的读者数目,用于确定是否需要释放正在等待的写者线程(当read_count=0时,表明所有的读者读完,需要释放写者等待队列中的一个写者)。每一个读者开始读文件时,必须修改read_count变量。因此需要一个互斥对象mutex来实现对全局变量read_count修改时的互斥。
  另外,为了实现写-写互斥,需要增加一个临界区对象write。当写者发出写请求时,必须申请临界区对象的所有权。通过这种方法,也可以实现读-写互斥,当read_count=1时(即第一个读者到来时),读者线程也必须申请临界区对象的所有权。
  当读者拥有临界区的所有权时,写者阻塞在临界区对象write上。当写者拥有临界区的所有权时,第一个读者判断完“read_count==1”后阻塞在write上,其余的读者由于等待对read_count的判断,阻塞在mutex上。

ReaderWriter.cpp文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

int source; // shared data
int read_count = 0; // total readers

// mutex: read_count
// wrt: to make writers waiting if readers are reading
sem_t mutex, wrt;

struct cmd {
int pid;
char type;
int startTime;
int lastTime;
};

// write operations
void write_data() {
int old_source = source;
source++;
printf("Writing: from %d to %d.\n", old_source, source);
}

// read operations
void read_data() {
printf("Reading: source value is %d\n", source);
}

/* reader process */
void *reader(void *param) {
struct cmd* item = (struct cmd*)param;
while (1) {
sleep(item->startTime);
printf("Reader%d request to read.\n", item->pid);
sem_wait(&mutex);

// critical section
read_count++;
if (read_count == 1) {
sem_wait(&wrt); // This prioritize the reader!
}

sem_post(&mutex);
printf("Reader%d begins to read.\n", item->pid);

// reading
read_data();

sleep(item->lastTime);
printf("Reader%d finishes reading.\n", item->pid);
sem_wait(&mutex);

// critical section
read_count--;
// If there is no reader, let writer in
if (read_count == 0) {
sem_post(&wrt);
}

sem_post(&mutex);

pthread_exit(0);
}
}

/* writer process */
void *writer(void *param) {
struct cmd* item = (struct cmd*)param;
while (1) {
sleep(item->startTime);
printf("Writer%d request to write.\n", item->pid);
sem_wait(&wrt);
printf("Writer%d begins to write.\n", item->pid);

// writing
write_data();

sleep(item->lastTime);
printf("Writer%d finishes writing.\n", item->pid);
sem_post(&wrt);
pthread_exit(0);
}

}

int main(int argc, char const *argv[])
{
/* code */
int threadNum = atoi(argv[1]);
struct cmd commands[threadNum];
pthread_t pid[threadNum];

/* initialize the semaphores */
sem_init(&wrt, 0, 1);
sem_init(&mutex, 0, 1);

// input
char text[256];
int j = 0;
char ch;

// Read from file
FILE *fpRead=fopen("test2.txt","r");
if(fpRead==NULL)
{
return 0;
}

while ((ch=fgetc(fpRead))!=EOF) {
if (ch != ' ' && ch != '\n' && ch != '\r')
text[j++] = ch;
}

j = 0;
for (int i = 0; i < threadNum; i++) {
commands[i].pid = text[j++] - '0';
commands[i].type = text[j++];
commands[i].startTime = text[j++] - '0';
commands[i].lastTime = text[j++] - '0';
}

// generate threads
for (int i = 0; i < threadNum; i++) {
// genrate readers
if (commands[i].type == 'R') {
pthread_create(&pid[i], NULL, reader, &commands[i]);
printf("Create Reader\n");
}
// generate writers
else {
pthread_create(&pid[i], NULL, writer, &commands[i]);
printf("Create Writer\n");
}
}
// execute threads
for (int i = 0; i < threadNum; i++) {
pthread_join(pid[i], NULL);
}

// destroys semaphores
sem_destroy(&mutex);
sem_destroy(&wrt);
return 0;
}

写者优先

  写者优先与读者优先类似。不同之处在于一旦一个写者到来,它应该尽快对文件进行写操作,如果有一个写者在等待,则新到来的读者不允许进行读操作。为此应当添加一个整型变量write_count,用于记录正在等待的写者的数目,当write_count=0时,才可以释放等待的读者线程队列。
  为了对全局变量write_count实现互斥,必须增加一个互斥对象mutex2。
  为了实现写者优先,应当添加一个临界区对象read,当有写者在写文件或等待时,读者必须阻塞在read上。同样,有读者读时,写者必须等待。于是,必须有一个互斥对象RW_mutex来实现这个互斥。
有写者在写时,写者必须等待。
  读者线程要对全局变量read_count实现操作上的互斥,必须有一个互斥对象命名为mutex1。

ReaderWriter2.cpp文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

int source; // shared data

int write_count; // total writers
int read_count; // total readers

// mutex1: read_count
// mutex2: write_count
// RW_mutex: source
// rd: to make readers waiting if writers are writing
sem_t mutex1, mutex2, rd, RW_mutex;

struct cmd {
int pid;
char type;
int startTime;
int lastTime;
};

// write operation
void write_data() {
int old_source = source;
source++;
printf("Writing: from %d to %d.\n", old_source, source);
}

// read operation
void read_data() {
printf("Reading: source value is %d\n", source);
}

/* reader process */
void *reader(void *param) {
struct cmd* item = (struct cmd*)param;
while (1) {
sleep(item->startTime);
printf("Reader %d request to read.\n", item->pid);

sem_wait(&rd); // This is important to prioritize writer!
sem_wait(&mutex1);

// critical section
read_count++;
if (read_count == 1) {
sem_wait(&RW_mutex);
}

sem_post(&mutex1);
sem_post(&rd);

// read
printf("Reader %d begins to read.\n", item->pid);
read_data();

sleep(item->lastTime);
printf("Reader %d finishes reading.\n", item->pid);

sem_wait(&mutex1);

// critical section
read_count--;
// If there is no reader, let writer in
if (read_count == 0) {
sem_post(&RW_mutex);
}

sem_post(&mutex1);
pthread_exit(0);
}
}

/* writer process */
void *writer(void *param) {
struct cmd* item = (struct cmd*)param;
while (1) {
sleep(item->startTime);
printf("Writer %d request to write.\n", item->pid);
sem_wait(&mutex2);

// critical section
write_count++;
if (write_count == 1) {
sem_wait(&rd); // Writer has the priority!
}
sem_post(&mutex2);

// If there is no other readers or writer accessing the data
sem_wait(&RW_mutex);

// critical section
printf("Writer %d begins to write\n", item->pid);
write_data();

sleep(item->lastTime);
printf("Writer %d finishes writing\n", item->pid);
sem_post(&RW_mutex);

sem_wait(&mutex2);

// critical section
write_count--;
// If there is no writers, let reader in
if (write_count == 0) {
sem_post(&rd);
}
sem_post(&mutex2);
pthread_exit(0);
}
}


int main(int argc, char const *argv[]) {
/* code */
int threadNum = atoi(argv[1]);
struct cmd commands[threadNum];
pthread_t pid[threadNum];

/* initialize the semaphores */
sem_init(&mutex1, 0, 1);
sem_init(&mutex2, 0, 1);
sem_init(&RW_mutex, 0, 1);
sem_init(&rd, 0, 1);

// input
char text[256];
int j = 0;
char ch;

// Read from file
FILE *fpRead=fopen("test2.txt","r");
if(fpRead==NULL)
{
printf("File not exist!\n");
return 0;
}

while ((ch=fgetc(fpRead))!=EOF) {
if (ch != ' ' && ch != '\n' && ch != '\r')
text[j++] = ch;
}

j = 0;
for (int i = 0; i < threadNum; i++) {
commands[i].pid = text[j++] - '0';
commands[i].type = text[j++];
commands[i].startTime = text[j++] - '0';
commands[i].lastTime = text[j++] - '0';
}

// generate threads
for (int i = 0; i < threadNum; i++) {
// genrate readers
if (commands[i].type == 'R') {
pthread_create(&pid[i], NULL, reader, &commands[i]);
printf("Create Reader\n");
}
// generate writers
else {
pthread_create(&pid[i], NULL, writer, &commands[i]);
printf("Create Writer\n");
}
}
// execute threads
for (int i = 0; i < threadNum; i++) {
pthread_join(pid[i], NULL);
}

// destroys semaphores
sem_destroy(&mutex1);
sem_destroy(&mutex2);
sem_destroy(&rd);
sem_destroy(&RW_mutex);
return 0;
}

结果分析

读者优先结果:
读者优先
写者优先结果:
写者优先
  可以看出,读者优先的情况下,当有读者在读的时候,后续的读者同样可以对数据进行读操作,仅当没有读者的时候,写者才进入临界区进行写操作。
  写者优先的情况下,当有写者在等待时,新进来的读者不允许进行读操作。

小结

  同步互斥问题主要依赖于信号量来解决,我们需要合理地使用信号量,同步与互斥的信号量的顺序摆放也会决定死锁是否会出现。这两个问题只是很简单的同步互斥问题,但它们涵盖了这个问题的精髓。日后有机会再和大家分享有关的内容,谢谢!

Welcome to my other publishing channels