Уильям Стивенс - UNIX: взаимодействие процессов
//pxsem/prodcons2.c
1 #include "unpipc.h"
2 #define NBUFF 10
3 int nitems; /* только для чтения производителем и потребителем */
4 struct { /* общие данные производителя и потребителя */
5 int buff[NBUFF];
6 sem_t mutex, nempty, nstored; /* семафоры, а не указатели */
7 } shared;
8 void *produce(void *), *consume(void *);
9 int
10 main(int argc, char **argv)
11 {
12 pthread_t tid_produce, tid_consume;
13 if (argc != 2)
14 err_quit("usage: prodcons2 <#items>");
15 nitems = atoi(argv[1]);
16 /* инициализация трех семафоров */
17 Sem_init(&shared.mutex, 0, 1);
18 Sem_init(&shared.nempty, 0, NBUFF);
19 Sem_init(&shared.nstored, 0, 0);
20 Set_concurrency(2);
21 Pthread_create(&tid_produce, NULL, produce, NULL);
22 Pthread_create(&tid_consume, NULL, consume, NULL);
23 Pthread_join(tid_produce, NULL);
24 Pthread_join(tid_consume, NULL):
25 Sem_destroy(&shared.mutex);
26 Sem_destroy(&shared.nempty):
27 Sem_destroy(&shared.nstored);
28 exit(0);
29 }
30 void *
31 produce(void *arg)
32 {
33 int i;
34 for (i = 0; i < nitems; i++) {
35 Sem_wait(&shared.nempty); /* ожидание одного свободного поля */
36 Sem_wait(&shared.mutex);
37 shared.buff[i % NBUFF] = i; /* помещение i в циклический буфер */
38 Sem_post(&shared.mutex);
39 Sem_post(&shared.nstored); /* поместили еще один элемент */
40 }
41 return(NULL);
42 }
43 void *
44 consume(void *arg)
45 {
46 int i;
47 for (i = 0; i < nitems; i++) {
48 Sem_wait(&shared.nstored); /* ожидаем появления хотя бы одного готового для обработки элемента */
49 Sem_wait(&shared.mutex);
50 if (shared.buff[i % NBUFF] != i)
51 printf("buff[*d] = *dn", i, shared.buff[i % NBUFF]);
52 Sem_post(&shared.mutex);
53 Sem_post(&shared.nempty); /* еще одно пустое поле */
54 }
55 return(NULL);
56 }
Выделение семафоров6 Мы объявляем три семафора типа sem_t, и теперь это сами семафоры, а не указатели на них.
Вызов sem_init16-27 Мы вызываем sem_init вместо sem_open* а затем sem_destroy вместо sem_unlink. Вызывать sem_destroy на самом деле не требуется, поскольку программа все равно завершается.
Остальные изменения обеспечивают передачу указателей на три семафора при вызовах sem_wait и sem_post.
10.9. Несколько производителей, один потребитель
Решение в разделе 10.6 относится к классической задаче с одним производителем и одним потребителем. Новая, интересная модификация программы позволит нескольким производителям работать с одним потребителем. Начнем с решения из листинга 10.11, в котором использовались размещаемые в памяти семафоры. В листинге 10.12 приведены объявления глобальных переменных и функция main.
Листинг 10.12. Функция main задачи с несколькими производителями//pxsem/prodcons3.c
1 #include "unpipc.h"
2 #define NBUFF 10
3 #define MAXNTHREADS 100
4 int nitems, nproducers; /* только для чтения производителем и потребителем */
5 struct { /* общие данные */
6 int buff[NBUFF];
7 int nput;
8 int nputval;
9 sem_t mutex, nempty, nstored; /* семафоры, а не указатели */
10 } shared;
11 void *produce(void *), *consume(void *);
12 int
13 main(int argc, char **argv)
14 {
15 int i, count[MAXNTHREADS];
16 pthread_t tid_produce[MAXNTHREADS], tid_consume;
17 if (argc != 3)
18 err_quit("usage: prodcons3 <#items> <#producers>");
19 nitems = atoi(argv[1]);
20 nproducers = min(atoi(argv[2]), MAXNTHREADS);
21 /* инициализация трех семафоров */
22 Sem_init(&shared.mutex, 0, 1);
23 Sem_init(&shared.nempty, 0, NBUFF);
24 Sem_init(&shared.nstored, 0, 0);
25 /* создание всех производителей и одного потребителя */
26 Set_concurrency(nproducers + 1);
27 for (i = 0; i < nproducers; i++) {
28 count[i] = 0;
29 Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
30 }
31 Pthread_create(&tid_consume, NULL, consume, NULL);
32 /* ожидание завершения всех производителей и потребителя */
33 for (i = 0; i < nproducers; i++) {
34 Pthread_join(tid_produce[i], NULL);
35 printf("count[%d] = %dn", i, count[i]);
36 }
37 Pthread_join(tid_consume, NULL);
38 Sem_destroy(&shared.mutex);
39 Sem_destroy(&shared.nempty);
40 Sem_destroy(&shared.nstored);
41 exit(0);
42 }
Глобальные переменные4 Глобальная переменная nitems хранит число элементов, которые должны быть совместно произведены. Переменная nproducers хранит число потоков-производителей. Оба эти значения устанавливаются с помощью аргументов командной строки.
Общая структура5-10 В структуру shared добавляются два новых элемента: nput, обозначающий индекс следующего элемента, куда должен быть помещен объект (по модулю BUFF), и nputval —следующее значение, которое будет помещено в буфер. Эти две переменные взяты из нашего решения в листингах 7.1 и 7.2. Они нужны для синхронизации нескольких потоков-производителей.
Новые аргументы командной строки17-20 Два новых аргумента командной строки указывают полное количество элементов, которые должны быть помещены в буфер, и количество потоков-производителей.
Запуск всех потоков21-41 Инициализируем семафоры и запускаем потоки-производители и поток-потребитель. Затем ожидается завершение работы потоков. Эта часть кода практически идентична листингу 7.1.
В листинге 10.13 приведен текст функции produce, которая выполняется каждым потоком-производителем.
Листинг 10.13. Функция, выполняемая всеми потоками-производителями//pxsem/prodcons3.c
43 void *
44 produce(void *arg)
45 {
46 for (;;) {
47 Sem_wait(&shared.nempty); /* ожидание освобождения поля */
48 Sem_wait(&shared.mutex);
49 if (shared.nput >= nitems) {
50 Sem_post(&shared.nempty);
51 Sem_post(&shared.mutex);
52 return(NULL); /* готово */
53 }
54 shared.buff[shared.nput % NBUFF] = shared.nputval;
55 shared.nput++;
56 shared.nputval++;
57 Sem_post(&shared.mutex);
58 Sem_post(&shared.nstored); /* еще один элемент */
59 *((int *) arg) += 1;
60 }
61 }
Взаимное исключение между потоками-производителями49-53 Отличие от листинга 10.8 в том, что цикл завершается, когда nitems объектов будет помещено в буфер всеми потоками. Обратите внимание, что потоки-производители могут получить семафор nempty в любой момент, но только один производитель может иметь семафор mutex. Это защищает переменные nput и nval от одновременного изменения несколькими производителями.
Завершение производителей50-51 Нам нужно аккуратно обработать завершение потоков-производителей. После того как последний объект помещен в буфер, каждый поток выполняет
Sem_wait(&shared.nempty); /* ожидание пустого поля */
в начале цикла, что уменьшает значение семафора nempty. Но прежде, чем поток будет завершен, он должен увеличить значение этого семафора, потому что он не помещает объект в буфер в последнем проходе цикла. Завершающий работу поток должен также освободить семафор mutex, чтобы другие производители смогли продолжить функционирование. Если мы не увеличим семафор nempty по завершении процесса и если производителей будет больше, чем мест в буфере, лишние потоки будут заблокированы навсегда, ожидая освобождения семафора nempty, и никогда не завершат свою работу.
Функция consume в листинге 10.14 проверяет правильность всех записей в буфере, выводя сообщение при обнаружении ошибки.
Листинг 10.14. Функция, выполняемая потоком-потребителем//pxsem/prodcons3.с
62 void *
63 consume(void *arg)
64 {
65 int i;
66 for (i = 0; i < nitems; i++) {
67 Sem_wait(&shared.nstored); /* ожидание помещения по крайней мере одного элемента в буфер */
68 Sem_wait(&shared.mutex);
69 if (shared.buff[i % NBUFF] != i)
70 printf("error: buff[%d] = %dn", i, shared.buff[i % NBUFF]);
71 Sem_post(&shared.mutex);
72 Sem_post(&shared.nempty); /* еще одно пустое поле */
73 }
74 return(NULL);
75 }
Условие завершения единственного потока-потребителя звучит просто: он считает все потребленные объекты и останавливается по достижении nitems.
10.10. Несколько производителей, несколько потребителей
Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.