Уильям Стивенс - UNIX: взаимодействие процессов
37 if (errno != EAGAIN)
38 err_sys("mq_receive error");
39 Sigprocmask(SIG_UNBLOCK, &newmask, NULL); /* разблокируем SIGUSR1 */
40 }
41 exit(0);
42 }
43 static void
44 sig_usr1(int signo)
45 {
46 mqflag = 1;
47 return;
48 }
Открытие очереди сообщений в режиме отключенной блокировки15-18 Первое изменение в программе: при открытии очереди сообщений указывается флаг O_NONBLOCK.
Считывание всех сообщений из очереди34-38 Другое изменение: mq_receive вызывается в цикле, считывая все сообщения в очереди, пока не будет возвращена ошибка с кодом EAGAIN, означающая отсутствие сообщений в очереди.
Пример: уведомление с использованием sigwait вместо обработчика
Хотя программа из предыдущего примера работает правильно, можно повысить ее эффективность. Программа использует sigsuspend для блокировки в ожидании прихода сообщения. При помещении сообщения в пустую очередь вызывается сигнал, основной поток останавливается, запускается обработчик, который устанавливает флаг mqflag, затем снова запускается главный поток, он обнаруживает, что значение mqflag отлично от нуля, и считывает сообщение. Более простой и эффективный подход заключается в блокировании в функции, ожидающей получения сигнала, что не требует вызова обработчика только для установки флага. Эта возможность предоставляется функцией sigwait:
#include <signal.h>
int sigwait(const sigset_t *set, int *sig);
/* Возвращает 0 в случае успешного завершения, –1 – в случае ошибки */
Перед вызовом sigwait мы блокируем некоторые сигналы. Набор блокируемых сигналов указывается в качестве аргумента set. Функция sigwait блокируется, пока не придет по крайней мере один из этих сигналов. Когда он будет получен, функция возвратит его. Значение этого сигнала сохраняется в указателе sig, а функция возвращает значение 0. Это называется синхронным ожиданием асинхронного события: мы используем сигнал, но не пользуемся асинхронным обработчиком сигнала.
В листинге 5.11 приведен текст программы, использующей mq_notifу и sigwait.
Листинг 5.11. Использование mq_notify совместно с sigwait//pxmsg/mqnotifysig4.c
1 #include "unpipc.h"
2 int
3 main(int argc, char **argv)
4 {
5 int signo;
6 mqd_t mqd;
7 void *buff;
8 ssize_t n;
9 sigset_t newmask;
10 struct mq_attr attr;
11 struct sigevent sigev;
12 if (argc != 2)
13 err_quit("usage: mqnotifysig4 <name>");
14 /* открытие очереди, получение атрибутов, выделение буфера */
15 mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
16 Mq_getattr(mqd, &attr);
17 buff = Malloc(attr.mq_msgsize);
18 Sigemptyset(&newmask);
19 Sigaddset(&newmask, SIGUSR1);
20 Sigprocmask(SIG_BLOCK, &newmask, NULL); /* блокируем SIGUSR1 */
21 /* установка обработчика, включение уведомления */
22 sigev.sigev_notify = SIGEV_SIGNAL;
23 sigev.sigev_signo = SIGUSR1;
24 Mq_notify(mqd, &sigev);
25 for (;;) {
26 Sigwait(&newmask, &signo);
27 if (signo == SIGUSR1) {
28 Mq_notify(mqd, &sigev); /* перерегистрируемся */
29 while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
30 printf("read %ld bytesn", (long) n);
31 }
32 if (errno != EAGAIN)
33 err_sys("mq_receive error");
34 }
35 }
36 exit(0);
37 }
Инициализация набора сигналов и блокировка SIGUSR118-20 Инициализируется один набор сигналов, содержащий только SIGUSR1, а затем этот сигнал блокируется sigprocmask.
Ожидание сигнала26-34 Мы блокируем выполнение программы и ждем прихода сигнала, вызвав sigwait. При получении сигнала SIGUSR1 мы перерегистрируемся на уведомление и считываем все доступные сообщения.
ПРИМЕЧАНИЕ
Функция sigwait часто используется в многопоточных процессах. Действительно, глядя на прототип функции, мы можем заметить, что возвращаемое значение будет 0 или одной из ошибок Еххх, что весьма похоже на функции Pthread. Однако в многопоточном процессе нельзя пользоваться sigprocmask — вместо нее следует вызывать pthread_ sigmask, которая изменяет маску сигналов только для вызвавшего ее потока. Аргументы pthread_sigmask совпадают с аргументами sigprocmask.
Существуют два варианта функции sigwait: sigwaitinfo возвращает структуру siginfo_t (которая будет определена в следующем разделе) и предназначена для использования с надежными сигналами; функция sigtimedwait также возвращает структуру siginfo_t и позволяет вызывающему процессу установить ограничение по времени на ожидание.
Большая часть книг о многопоточном программировании, таких как [3], рекомендуют пользоваться sigwait для обработки всех сигналов в многопоточном процессе и не использовать асинхронные обработчики.
Пример: очереди сообщений Posix и функция select
Дескриптор очереди сообщений (переменная типа mqd_t) не является «обычным» дескриптором и не может использоваться с функциями select и poll (глава 6 [24]). Тем не менее их можно использовать вместе с каналом и функцией mq_notify. (Аналогичный метод применен в разделе 6.9 для очередей System V, где создается дочерний процесс и канал связи.) Прежде всего обратите внимание, что, согласно табл. 5.1, функция write принадлежит к группе async-signal-safe, поэтому она может вызываться из обработчика сигналов. Программа приведена в листинге 5.12.
Листинг 5.12. Использование уведомления с помощью сигнала и канала//pxmsg/mqnotifysig5.c
1 #include "unpipc.h"
2 int pipefd[2];
3 static void sig_usr1(int);
4 int
5 main(int argc, char **argv)
6 {
7 int nfds;
8 char c;
9 fd_set rset;
10 mqd_t mqd;
11 void *buff;
12 ssize_t n;
13 struct mq_attr attr;
14 struct sigevent sigev;
15 if (argc != 2)
16 err_quit("usage: mqnotifysig5 <name>");
17 /* открытие очереди, получение атрибутов, выделение буфера */
18 mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
19 Mq_getattr(mqd, &attr);
20 buff = Malloc(attr.mq_msgsize);
21 Pipe(pipefd);
22 /* установка обработчика, включение уведомления */
23 Signal(SIGUSR1, sig_usr1);
24 sigev.sigev_notify = SIGEV_SIGNAL;
25 sigev.sigev_signo = SIGUSR1;
26 Mq_notify(mqd, &sigev);
27 FD_ZERO(&rset);
28 for (;;) {
29 FD_SET(pipefd[0], &rset);
30 nfds = Select(pipefd[0] + 1, &rset, NULL, NULL, NULL);
31 if (FD_ISSET(pipefd[0], &rset)) {
32 Read(pipefd[0], &c, 1);
33 Mq_notify(mqd, &sigev); /* перерегистрируемся */
34 while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
35 printf("read %ld bytesn", (long) n);
36 }
37 if (errno != EAGAIN)
38 err_sys("mq_receive error");
39 }
40 }
41 exit(0);
42 }
43 static void
44 sig_usr1(int signo)
45 {
46 Write(pipefd[1], "", 1); /* один байт – 0 */
47 return;
48 }
Создание канала21 Мы создаем канал, в который обработчик сигнала произведет запись, когда будет получено уведомление о поступлении сообщения в очередь. Это пример использования канала внутри одного процесса.
Вызов select27-40 Мы инициализируем набор дескрипторов rset и при каждом проходе цикла включаем бит, соответствующий дескриптору pipefd[0] (открытый на считывание конец канала). Затем мы вызываем функцию select, ожидая получения единственного дескриптора, хотя в типичном приложении именно здесь осуществлялось бы размножение дескрипторов одного из концов канала. Когда появляется возможность читать из канала, мы перерегистрируемся на уведомление и считываем все доступные сообщения.
Обработчик сигнала43-48 Единственное, что делает обработчик сигнала, — записывает в канал 1 байт. Как мы уже отмечали, эта операция относится к разрешенным для асинхронных обработчиков.
Пример: запуск нового потока
Альтернативой снятию блокировки сигналом является присваивание sigev_notify значения SIGEV_THREAD, что приводит к созданию нового потока. Функция, указанная в sigev_notify_function, вызывается с параметром sigev_value. Атрибуты нового канала указываются переменной sigev_notify_attributes, которая может быть и нулевым указателем, если нас устраивают устанавливаемые по умолчанию атрибуты. Текст программы приведен в листинге 5.13.
Листинг 5.13. Функция mq_notify, запускающая новый программный поток//pxmsg/mqnotifythread1.с