Уильям Стивенс - UNIX: взаимодействие процессов
74 return(NULL);
75 }
Условие завершения единственного потока-потребителя звучит просто: он считает все потребленные объекты и останавливается по достижении nitems.
10.10. Несколько производителей, несколько потребителей
Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.
1. Пpoгрaммa преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddr (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) — по одному на каждый поток-потребитель.
ПРИМЕЧАНИЕ
Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков.
2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дeйтaгрaммa обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.
В листинге 10.15 приведены глобальные переменные программы.
Листинг 10.15. Глобальные переменные//pxsem/prodcons4.с
1 #include "unpipc.h"
2 #define NBUFF 10
3 #define MAXNTHREADS 100
4 int nitems, nproducers, nconsumers; /* только для чтения */
5 struct { /* общие данные производителей и потребителей */
6 int buff[NBUFF];
7 int nput; /* номер объекта: 0, 1. 2, … */
8 int nputval; /* сохраняемое в buff[] значение */
9 int nget; /* номер объекта: 0, 1, 2, … */
10 int ngetval; /* получаемое из buff[] значение */
11 sem_t mutex, nempty, nstored; /* семафоры, а не указатели */
12 } shared;
13 void *produce(void *), *consume(void *);
Глобальные переменные и общая структура4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget — номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval — соответствующее значение.
Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.
19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid_consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.
24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения.
Листинг 10.16. Функция main для версии с несколькими производителями и потребителями//pxsem/prodcons4.с
14 int
15 main(int argc, char **argv)
16 {
17 int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
18 pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
19 if (argc != 4)
20 err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
21 nitems = atoi(argv[1]);
22 nproducers = min(atoi(argv[2]), MAXNTHREADS);
23 nconsumers = min(atoi(argv[3]), MAXNTHREADS);
24 /* инициализация трех семафоров */
25 Sem_init(&shared.mutex, 0, 1);
26 Sem_init(&shared.nempty, 0, NBUFF);
27 Sem_init(&shared.nstored, 0, 0);
28 /* создание производителей и потребителей */
29 Set_concurrency(nproducers + nconsumers);
30 for (i = 0; i < nproducers; i++) {
31 prodcount[i] = 0;
32 Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
33 }
34 for (i = 0; i < nconsumers; i++) {
35 conscount[i] = 0;
36 Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
37 }
38 /* ожидание завершения всех производителей и потребителей */
39 for (i = 0; i < nproducers: i++) {
40 Pthread_join(tid_produce[i], NULL);
41 printf("producer count[%d] = %dn", i, prodcount[i]);
42 }
43 for (i = 0; i < nconsumers; i++) {
44 Pthread_join(tid_consume[i], NULL);
45 printf("consumer count[%d] = %dn", i, conscount[i]);
46 }
47 Sem_destroy(&shared.mutex);
48 Sem_destroy(&shared.nempty);
49 Sem_destroy(&shared.nstored);
50 exit(0);
51 }
Функция produce содержит одну новую строку по сравнению с листингом 10.13. В части кода, относящейся к завершению потока-производителя, появляется строка, отмеченная знаком +:
if (shared.nput >= nitems) {
+ Sem_post(&shared.nstored); /* даем возможность потребителям завершить работу */
Sem_post(&shared.nempty);
Sem_post(&shared.mutex);
return(NULL); /* готово */
}
Снова нам нужно быть аккуратными при обработке завершения процессов-производителей и потребителей. После обработки всех объектов в буфере все потребители блокируются в вызове
Sem_wait(&shared.nstored); /* Ожидание помещения объекта в буфер */
Производителям приходится увеличивать семафор nstored для разблокирования потрeбитeлeй, чтобы они узнали, что работа завершена. Функция consume приведена в листинге 10.17.
Листинг 10.17. Функция, выполняемая всеми потоками-потребителями//pxsem/prodcons4.c
72 void *
73 consume(void *arg)
74 {
75 int i;
76 for (;;) {
77 Sem_wait(&shared.nstored); /* ожидание помещения объекта в буфер */
78 Sem_wait(&shared.mutex);
79 if (shared.nget >= nitems) {
80 Sem_post(&shared.nstored);
81 Sem_post(&shared.mutex);
82 return(NULL); /* готово */
83 }
84 i = shared.nget % NBUFF;
85 if (shared.buff[i] != shared.ngetval)
86 printf("error: buff[%d] = %dn", i, shared.buff[i]);
87 shared.nget++;
88 shared.ngetval++;
89 Sem_post(&shared.mutex);
90 Sem_post(&shared.nempty); /* освобождается место для элемента */
91 *((int *) arg) += 1;
92 }
93 }
Завершение потоков-потребителей79-83 Функция consume сравнивает nget и nitems, чтобы узнать, когда следует остановиться (аналогично функции produce). Обработав последний объект в буфере, потоки-потребители блокируются, ожидая изменения семафора nstored. Когда завершается очередной поток-потребитель, он увеличивает семафор nstored, давая возможность завершить работу другому потоку-потребителю.
10.11. Несколько буферов
Во многих программах, обрабатывающих какие-либо данные, можно встретить цикл вида
while ((n = read(fdin, buff, BUFFSIZE)) > 0) {
/* обработка данных */
write(fdout, buff, n);
}
Например, программы, обрабатывающие текстовые файлы, считывают строку из входного файла, выполняют с ней некоторые действия, а затем записывают строку в выходной файл. Для текстовых файлов вызовы read и write часто заменяются на функции стандартной библиотеки ввода-вывода fgets и fputs.
На рис. 10.11 изображена иллюстрация к такой схеме. Здесь функция reader считывает данные из входного файла, а функция writer записывает данные в выходной файл. Используется один буфер.
Рис. 10.10. Процесс считывает данные в буфер, а потом записывает его содержимое в другой файл
Рис. 10.11. Один процесс, считывающий данные в буфер и записывающий их в файл
На рис. 10.10 приведена временная диаграмма работы такой программы. Числа слева проставлены в условных единицах времени. Предполагается, что операция чтения занимает 5 единиц, записи — 7, а обработка данных между считыванием и записью требует 2 единицы времени.
Можно изменить это приложение, разделив процесс на отдельные потоки, как показано на рис. 10.12. Здесь используется два потока (а не процесса), поскольку глобальный буфер автоматически разделяется между ними. Мы могли бы разделить приложение и на два процесса, но это потребовало бы использования разделяемой памяти, с которой мы еще не знакомы.
Рис. 10.12. Разделение копирования файла между двумя потоками
Разделение операций между потоками (или процессами) требует использования какой-либо формы уведомления между ними. Считывающий поток должен уведомлять записывающий о готовности буфера к операции записи, а записывающий должен уведомлять считывающий о том, что буфер пуст и его можно заполнять снова. На рис. 10.13 изображена временная диаграмма для новой схемы.