Уильям Стивенс - UNIX: взаимодействие процессов
Рис. 10.13. Копирование файла двумя потоками
Предполагается, что для обработки данных в буфере требуется две единицы времени. Важно отметить, что разделение чтения и записи между двумя потоками ничуть не ускорило выполнение операции копирования в целом. Мы не выиграли в скорости, мы просто распределили выполнение задачи между двумя потоками (или процессами).
В этих диаграммах мы игнорируем множество тонкостей. Например, большая часть ядер Unix выявляет операцию последовательного считывания файла и осуществляет асинхронное упреждающее чтение следующего блока данных еще до поступления запроса. Это может ускорить работу процесса, считывающего данные. Мы также игнорируем влияние других процессов на наши считывающий и записывающий потоки, а также влияние алгоритмов разделения времени, реализованных в ядре.
Следующим шагом будет использование двух потоков (или процессов) и двух буферов. Это называется классическим решением с двойной буферизацией; схема его изображена на рис. 10.14.
Рис. 10.14. Копирование файла двумя потоками с двумя буферами
На нашем рисунке считывающий поток помещает данные в первый буфер, а записывающий берет их из второго. После этого потоки меняются местами.
На рис. 10.15 изображена временная диаграмма процесса с двойной буферизацией. Считывающий поток помещает данные в буфер № 1, а затем уведомляет записывающий о том, что буфер готов к обработке. Затем считывающий процесс помещает данные в буфер № 2, а записывающий берет их из буфера № 1.
В любом случае, мы ограничены скоростью выполнения самой медленной операции — операции записи. После выполнения первых двух операций считывания серверу приходится ждать две дополнительные единицы времени, составляющие разницу в скорости выполнения операций чтения и записи. Тем не менее для нашего гипотетического примера полное время работы будет сокращено почти вдвое.
Обратите внимание, что операции записи выполняются так быстро, как только возможно. Они разделены промежутками времени всего лишь в 2 единицы, тогда как в предыдущих примерах между ними проходило 9 единиц времени (рис. 10.10 и 10.13). Это может оказаться выгодным при работе с некоторыми устройствами типа накопителей на магнитной ленте, которые функционируют быстрее, если данные записываются с максимально возможной скоростью (это называется потоковым режимом — streaming mode).
Рис. 10.15. Процесс с двойной буферизацией
Интересно, что задача с двойной буферизацией представляет собой лишь частный случай общей задачи производителей и потребителей.
Изменим нашу программу так, чтобы использовать несколько буферов. Начнем с решения из листинга 10.11, в котором использовались размещаемые в памяти семафоры. Мы получим даже не двойную буферизацию, а работу с произвольным числом буферов (задается NBUFF). В листинге 10.18 даны глобальные переменные и функция main.
Листинг 10.18. Глобальные переменные и функция main//pxsem/mycat2.c
1 #include "unpipc.h"
2 #define NBUFF 8
3 struct { /* общие данные */
4 struct {
5 char data[BUFFSIZE]; /* буфер */
6 ssize_t n; /* объем буфера */
7 } buff[NBUFF]; /* количество буферов */
8 sem_t mutex, nempty, nstored; /* семафоры, а не указатели */
9 } shared;
10 int fd; /* входной файл, копируемый в стандартный поток вывода */
11 void *produce(void *), *consume(void *);
12 int
13 main(int argc, char **argv)
14 {
15 pthread_t tid_produce, tid_consume;
16 if (argc != 2)
17 err_quit("usage: mycat2 <pathname>");
18 fd = Open(argv[1], O_RDONLY);
19 /* инициализация трех семафоров */
20 Sem_init(&shared.mutex, 0, 1);
21 Sem_init(&shared.nempty, 0, NBUFF);
22 Sem_init(&shared.nstored, 0, 0);
23 /* один производитель, один потребитель */
24 Set_concurrency(2);
25 Pthread_create(&tid_produce, NULL, produce, NULL); /* reader thread */
26 Pthread_create(&tid_consume, NULL, consume, NULL); /* writer thread */
27 Pthread_join(tid_produce, NULL);
28 Pthread_join(tid_consume, NULL);
29 Sem_destroy(&shared.mutex);
30 Sem_destroy(&shared.nempty);
31 Sem_destroy(&shared.nstored);
32 exit(0);
33 }
Объявление нескольких буферов2-9 Структура shared содержит массив структур buff, которые состоят из буфера и его счетчика. Мы создаем NBUFF таких буферов.
Открытие входного файла18 Аргумент командной строки интерпретируется как имя файла, который копируется в стандартный поток вывода.
В листинге 10.19 приведен текст функций produce и consume.
Листинг 10.19. Функции produce и consume//pxsem/mycat2.c
34 void *
35 produce(void *arg)
36 {
37 int i;
38 for (i = 0;;) {
39 Sem_wait(&shared.nempty); /* Ожидание освобождения места в буфере */
40 Sem_wait(&shared.mutex);
41 /* критическая область */
42 Sem_post(&shared.mutex);
43 shared.buff[i].n = Read(fd, shared.buff[i].data, BUFFSIZE);
44 if (shared.buff[i].n == 0) {
45 Sem_post(&shared.nstored); /* еще один объект */
46 return(NULL);
47 }
48 if (++i >= NBUFF)
49 i = 0; /* кольцевой буфер */
50 Sem_post(&shared.nstored); /* еще один объект */
51 }
52 }
53 void *
54 consume(void *arg)
55 {
56 int i;
57 for (i = 0;;) {
58 Sem_wait(&shared.nstored); /* ожидание появления объекта для обработки */
59 Sem_wait(&shared.mutex);
60 /* критическая область */
61 Sem_post(&shared.mutex);
62 if (shared.buff[i].n == 0)
63 return(NULL);
64 Write(STDOUT_FILENO, shared.buff[i].data, shared.buff[i].n);
65 if (++i >= NBUFF)
66 i=0; /* кольцевой буфер */
67 Sem_post(&shared.nempty); /* освободилось место для объекта */
68 }
69 }
Пустая критическая область40-42 Критическая область, защищаемая семафором mutex, в данном примере пуста. Если бы буферы данных представляли собой связный список, здесь мы могли бы удалять буфер из списка, не конфликтуя при этом с производителем. Но в нашем примере, где мы просто переходим к следующему буферу с единственным потоком-производителем, защищать нам просто нечего. Тем не менее мы оставляем операции установки и снятия блокировки, подчеркивая, что они могут потребоваться в новых версиях кода.
Считывание данных и увеличение семафора nstored43-49 Каждый раз, когда производитель получает пустой буфер, он вызывает функцию read. При возвращении из read увеличивается семафор nstored, уведомляя потребителя о том, что буфер готов. При возвращении функцией read значения 0 (конец файла) семафор увеличивается, а производитель завершает работу.
Поток-потребитель57-68 Поток-потребитель записывает содержимое буферов в стандартный поток вывода. Буфер, содержащий нулевой объем данных, обозначает конец файла. Как и в потоке-производителе, критическая область, защищенная семафором mutex, пуста.
ПРИМЕЧАНИЕ
В разделе 22.3 книги [24] мы разработали пример с несколькими буферами. В этом примере производителем был обработчик сигнала SIGIO, а потребитель представлял собой основной цикл обработки (функцию dg_echo). Разделяемой переменной был счетчик nqueue. Потребитель блокировал сигнал SIGIO на время проверки или изменения счетчика.
10.12. Использование семафоров несколькими процессами
Правила совместного использования размещаемых в памяти семафоров несколькими процессами просты: сам семафор (переменная типа semt, адрес которой является первым аргументом sem_init) должен находиться в памяти, разделяемой всеми процессами, которые хотят его использовать, а второй аргумент функции sem_init должен быть равен 1.
ПРИМЕЧАНИЕ
Эти правила аналогичны требованиям к разделению взаимного исключения, условной переменной или блокировки чтения-записи между процессами: средство синхронизации (переменная типа pthread_mutex_t, pthread_cond_t или pthread_rwlock_t) должно находиться в разделяемой памяти и инициализироваться с атрибутом PTHREAD_PROCESS SHARED.
Что касается именованных семафоров, процессы всегда могут обратиться к одному и тому же семафору, указав одинаковое имя при вызове sem_open. Хотя указатели, возвращаемые sem_open отдельным процессам, могут быть различны, все функции, работающие с семафорами, будут обращаться к одному и тому же именованному семафору.
Что произойдет, если мы вызовем функцию sem_open, возвращающую указатель на тип sem_t, а затем вызовем fork? В описании функции fork в стандарте Posix.1 говорится, что «все открытые родительским процессом семафоры будут открыты и в дочернем процессе». Это означает, что нижеследующий код верен: