Уильям Стивенс - UNIX: взаимодействие процессов
59-62 Потребитель проверяет правильность значений всех элементов массива и выводит сообщение в случае обнаружения ошибки. Как уже говорилось, эта функция запускается в единственном экземпляре и только после того, как все потоки-производители завершат свою работу, так что надобность в синхронизации отсутствует.
При запуске только что описанной программы с пятью процессами-производителями, которые должны вместе создать один миллион элементов данных, мы получим следующий результат:
solaris % prodcons2 1000000 5
count[0] = 167165
count[1] = 249891
count[2] = 194221
count[3] = 191815
count[4] = 196908
Как мы отмечали ранее, если убрать вызов set_concurrency, в системе Solaris 2.6 значение count[0] будет 1000000, а все остальные счетчики будут нулевыми.
Если убрать из этого примера блокировку с помощью взаимного исключения, он перестанет работать, как и предполагается. Потребитель обнаружит множество элементов buff[i], значения которых будут отличны от i. Также мы можем убедиться, что удаление блокировки ничего не изменит, если будет выполняться только один поток.
7.4. Блокировка и ожидание
Продемонстрируем теперь, что взаимные исключения предназначены для блокирования, но не для ожидания. Изменим наш пример из предыдущего раздела таким образом, чтобы потребитель запускался сразу же после запуска всех производителей. Это даст возможность потребителю обрабатывать данные по мере их формирования производителями в отличие от пpoгрaммы в листинге 7.1, в которой потребитель не запускался до тех пор, пока все производители не завершали свою работу. Теперь нам придется синхронизовать потребителя с производителями, чтобы первый обрабатывал только данные, уже сформированные последними.
В листинге 7.3 приведен текст функции main. Начало кода (до объявления функции main) не претерпело никаких изменений по сравнению с листингом 7.1.
Листинг 7.3. Функция main: запуск потребителя сразу после запуска производителей//mutex/prodcons3.c
14 int
15 main(int argc, char **argv)
16 {
17 int i, nthreads, count[MAXNTHREADS];
18 pthread_t tid_produce[MAXNTHREADS], tid_consume;
19 if (argc != 3)
20 err_quit("usage: prodcons3 <#items> <#threads>");
21 nitems = min(atoi(argv[1]), MAXNITEMS);
22 nthreads = min(atoi(argv[2]), MAXNTHREADS);
23 /* создание всех производителей и одного потребителя */
24 Set_concurrency(nthreads + 1);
25 for (i = 0; i < nthreads; i++) {
26 count[i] = 0;
27 Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
28 }
29 Pthread_create(&tid_consume, NULL, consume, NULL);
30 /* ожидание завершения производителей и потребителя */
31 for (i = 0; i < nthreads; i++) {
32 Pthread_join(tid_produce[i], NULL);
33 printf("count[%d] = %dn", i, count[i]);
34 }
35 Pthread_join(tid_consume, NULL);
36 exit(0);
37 }
24 Мы увеличиваем уровень параллельного выполнения на единицу, чтобы учесть поток-потребитель, выполняемый параллельно с производителями.
25-29 Поток-потребитель создается сразу же после создания потоков-производителей.
Функция produce по сравнению с листингом 7.2 не изменяется. В листинге 7.4 приведен текст функции consume, вызывающей новую функцию consume_wait.
Листинг 7.4. Функции consume и consume_wait//mutex/prodcons3.с
54 void
55 consume wait(int i)
56 {
57 for (;;) {
58 Pthread_mutex_lock(&shared.mutex);
59 if (i < shared.nput) {
60 Pthread_mutex_unlock(&shared.mutex);
61 return; /* элемент готов */
62 }
63 Pthread_mutex_unlock(&shared.mutex);
64 }
65 }
66 void *
67 consume(void *arg)
68 {
69 int i;
70 for (i = 0; i < nitems; i++) {
71 consume_wait(i);
72 if (shared.buff[i] != i)
73 printf("buff[%d] = %dn", i, shared.buff[i]);
74 }
75 return(NULL);
76 }
Потребитель должен ждать71 Единственное изменение в функции consume заключается в добавлении вызова consume_wait перед обработкой следующего элемента массива.
Ожидание производителей57-64 Наша функция consume_wait должна ждать, пока производители не создадут i-й элемент. Для проверки этого условия производится блокировка взаимного исключения и значение i сравнивается с индексом производителя nput. Блокировка необходима, поскольку значение nput может быть изменено одним из производителей в момент его проверки.
Главная проблема — что делать, если нужный элемент еще не готов. Все, что нам остается и что мы делаем в листинге 7.4, — это повторять операции в цикле, устанавливая и снимая блокировку и проверяя значение индекса. Это называется опросом (spinning или polling) и является лишней тратой времени процессора.
Мы могли бы приостановить выполнение процесса на некоторое время, но мы не знаем, на какое. Что нам действительно нужно — это использовать какое-то другое средство синхронизации, позволяющее потоку или процессу приостанавливать работу, пока не произойдет какое-либо событие.
7.5. Условные переменные: ожидание и сигнализация
Взаимное исключение используется для блокирования, а условная переменная — для ожидания. Это два различных средства синхронизации, и оба они нужны. Условная переменная представляет собой переменную типа pthread_cond_t. Для работы с такими переменными предназначены две функции:
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *cptr, pthread_m_tex_t *mptr);
int pthread_cond_signal(pthread_cond_t *cptr);
/* Обе функции возвращают 0 в случае успешного завершения, положительное значение Еххх – в случае ошибки */
Слово signal в имени второй функции не имеет никакого отношения к сигналам Unix SIGxxx.
Мы определяем условие, уведомления о выполнении которого будем ожидать.
Взаимное исключение всегда связывается с условной переменной. При вызове pthread_cond_wait для ожидания выполнения какого-либо условия мы указываем адрес условной переменной и адрес связанного с ней взаимного исключения.
Мы проиллюстрируем использование условных переменных, переписав пример из предыдущего раздела. В листинге 7.5 объявляются глобальные переменные.
Переменные производителя и взаимное исключение объединяются в структуру7-13 Две переменные nput и rival ассоциируются с mutex, и мы объединяем их в структуру с именем put. Эта структура используется производителями.
14-20 Другая структура, nready, содержит счетчик, условную переменную и взаимное исключение. Мы инициализируем условную переменную с помощью PTHREAD_ COND_INITIALIZER.
Функция main по сравнению с листингом 7.3 не изменяется.
Листинг 7.5. Глобальные переменные: использование условной переменной//mutex/prodcons6.c
1 #include "unpipc.h"
2 #define MAXNITEMS 1000000
3 #define MAXNTHREADS 100
4 /* глобальные переменные для всех потоков */
5 int nitems; /* только для чтения потребителем и производителем */
6 int buff[MAXNITEMS];
7 struct {
8 pthread_mutex_t mutex;
9 int nput; /* следующий сохраняемый элемент */
10 int nval; /* следующее сохраняемое значение */
11 } put = {
12 PTHREAD_MUTEX_INITIALIZER
13 };
14 struct {
15 pthread_mutex_t mutex:
16 pthread_cond_t cond;
17 int nready; /* количество готовых для потребителя */
18 } nready = {
19 PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER
20 };
Функции produce и consume претерпевают некоторые изменения. Их текст дан в листинге 7.6.
Листинг 7.6. Функции produce и consume//mutex/prodcons6.c
46 void *
47 produce(void *arg)
48 {
49 for (;;) {
50 Pthread_mutex_lock(&put.mutex);
51 if (put.nput >= nitems) {
52 Pthread_mutex_unlock(&put.mutex);
53 return(NULL); /* массив заполнен, готово */
54 }
55 buff[put.nput] = put.nval;
56 put.nput++;
57 put.nval++;
58 Pthread_mutex_unlock(&put.mutex);
59 Pthread_mutex_lock(&nready.mutex):
60 if (nready.nready == 0)
61 Pthread_cond_signal(&nready.cond);
62 nready.nready++;
63 Pthread_mutex_unlock(&nready.mutex);
64 *((int *) arg) += 1;
65 }
66 }
67 void*
68 consume(void *arg)
69 {
70 int i;
71 for (i = 0; i < nitems; i++) {
72 Pthread_mutex_lock(&nready.mutex);
73 while (nready.nready == 0)
74 Pthread_cond_wait(&nready.cond, &nready.mutex);
75 nready.nready--;