Уильям Стивенс - UNIX: взаимодействие процессов
24 mqstat->mq_curmsgs = attr->mq_curmsgs;
25 pthread_mutex_unlock(&mqhdr->mqh_lock);
26 return(0);
27 }
Блокирование взаимного исключения17-20 Мы должны заблокировать соответствующее взаимное исключение для работы с очередью, в частности для получения атрибутов, поскольку какой-либо другой поток может в это время их изменить.
Функция mq_setattr
В листинге 5.23 приведен текст функции mq_setattr, которая устанавливает значение атрибутов очереди.
Считывание текущих атрибутов22-27 Если третий аргумент представляет собой ненулевой указатель, мы возвращаем предыдущее значение атрибутов перед внесением каких-либо изменений.
Изменение mq_flags28-31 Единственный атрибут, который можно менять с помощью нашей функции, — mq_flags, хранящийся в структуре mq_infо.
Листинг 5.23. Функция mq_setattr//my_pxmsg_mniap/mq_setattr.с
1 #include "unpipc.h"
2 #include "mqueue.h"
3 int
4 mymq_setattr(mymqd_t mqd. const struct mymq_attr *mqstat,
5 struct mymq attr *omqstat)
6 {
7 int n;
8 struct mymq_hdr *mqhdr;
9 struct mymq_attr *attr;
10 struct mymq_info *mqinfo;
11 mqinfo = mqd;
12 if (mqinfo->mqi_magic != MQI_MAGIC) {
13 errno = EBADF;
14 return(-1);
15 }
16 mqhdr = mqinfo->mqi_hdr;
17 attr = &mqhdr->mqh_attr;
18 if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) ! = 0) {
19 errno = n;
20 return(-1);
21 }
22 if (omqstat != NULL) {
23 omqstat->mq_flags = mqinfo->mqi_flags; /* исходные атрибуты */
24 omqstat->mq_maxmsg = attr->mq_maxmsg;
25 omqstat->mq_msgsize = attr->mq_msgsize;
26 omqstat->mq_curmsgs = attr->mq_curmsgs; /* текущий статус */
27 }
28 if (mqstat->mq_flags & O_NONBLOCK)
29 mqinfo->mqi flags |= O_NONBLOCK;
30 else
31 mqinfo->ntqi_flags &= ~O_NONBLOCK;
32 pthread_mutex_unlock(&mqhdr->mqh_lock);
33 return(0);
34 }
Функция mq_notify
Функция mq_notify, текст которой приведен в листинге 5.24, позволяет регистрировать процесс на уведомление для текущей очереди и снимать его с регистрации. Информация о зарегистрированных процессах (их идентификаторы) хранится в поле mqh_pid структуры mq_hdr. Только один процесс может быть зарегистрирован на уведомление в любой момент времени. При регистрации процесса мы сохраняем его структуру sigevent в структуре mqh_event.
Листинг 5.24. Функция mq_notify//my_pxmsg_mmap/mq_notify.с
1 #include "unpipc.h"
2 #include "mqueue.h"
3 int
4 mymq_notify(mymqd_t mqd, const struct sigevent *notification)
5 {
6 int n;
7 pid_t pid;
8 struct mymq_hdr *mqhdr;
9 struct mymq_info *mqinfo;
10 mqinfo = mqd;
11 if (mqinfo->mqi magic != MQI_MAGIC) {
12 errno = EBADF;
13 return(-1);
14 }
15 mqhdr = mqinfo->mqi_hdr;
16 if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
17 errno = n;
18 return(-1);
19 }
20 pid = getpid();
21 if (notification == NULL) {
22 if (mqhdr->mqh_pid == pid) {
23 mqhdr->mqh_pid = 0; /* снятие вызвавшего процесса с регистрации */
24 } /* если вызвавший процесс не зарегистрирован – 61К */
25 } else {
26 if (mqhdr->mqh_pid != 0) {
27 if (kill(mqhdr->mqh_pid, 0) != –1 || errno != ESRCH) {
28 errno = EBUSY;
29 goto err;
30 }
31 }
32 mqhdr->mqh_pid = pid;
33 mqhdr->mqh_event = *notification;
34 }
35 pthread_mutex_unlock(&mqhdr->mqh_lock);
36 return(0);
37 err:
38 pthread_mutex_unlock(&mqhdr->mqh_lock);
39 return(-1);
40 }
Снятие процесса с регистрации20-24 Если второй аргумент представляет собой нулевой указатель, вызвавший процесс снимается с регистрации. Если он не зарегистрирован, никакой ошибки не возвращается.
Регистрация вызвавшего процесса25-34 Если какой-либо процесс уже зарегистрирован, мы проверяем, существует ли он, отправкой ему сигнала с кодом 0 (называемого нулевым сигналом — null signal). Это обычная проверка на возможность ошибки, на самом деле при этом никакого сигнала процессу не отправляется, но при его отсутствии возвращается ошибка с кодом ESRCH. Если какой-либо процесс уже зарегистрирован на уведомление, функция возвращает ошибку EBUSY. В противном случае сохраняется идентификатор процесса вместе с его структурой sigevent.
ПРИМЕЧАНИЕ
Наш метод проверки существования вызвавшего процесса не идеален. Процесс мог завершить работу, а его идентификатор мог быть использован другим процессом.
Функция mq_send
В листинге 5.25 приведен текст первой половины нашей функции mqsend.
Инициализация14-29 Мы получаем указатели на используемые структуры и блокируем взаимное исключение для данной очереди. Проверяем, не превышает ли размер сообщения максимально допустимый для данной очереди.
Проверка очереди на пустоту и отправка уведомления30-38 Если мы помещаем первое сообщение в пустую очередь, нужно проверить, не зарегистрирован ли какой-нибудь процесс на уведомление об этом событии и нет ли потоков, заблокированных в вызове mq_receive. Для проверки второго условия мы воспользуемся сохраняемым функцией mq_receive счетчиком mqh_nwait, содержащим количество потоков, заблокированных в вызове mq_receive. Если этот счетчик имеет ненулевое значение, мы не отправляем уведомление зарегистрированному процессу. Для отправки сигнала SIGEV_SIGNAL используется функция sigqueue. Затем процесс снимается с регистрации.
ПРИМЕЧАНИЕ
Вызов sigqueue для отправки сигнала приводит к передаче сигнала SI_QUEUE обработчику сигнала в структуре типа siginfo_t (раздел 5.7), что неправильно. Отправка правильного значения si_code (а именно SI_MESGQ) из пользовательского процесса осуществляется в зависимости от реализации. На с. 433 стандарта IEEE 1996 [8] отмечается, что для отправки этого сигнала из пользовательской библиотеки необходимо воспользоваться скрытым интерфейсом механизма отправки сигналов.
Проверка заполненности очереди39-48 Если очередь переполнена и установлен флаг O_NONBLOCK, мы возвращаем ошибку с кодом EAGAIN. В противном случае мы ожидаем сигнала по условной переменной mqh_wait, который, как мы увидим, отправляется функцией mq_receive при считывании сообщения из переполненной очереди.
ПРИМЕЧАНИЕ
Наша реализация упрощает ситуацию с возвращением ошибки EINTR при прерывании вызова mq_send сигналом, перехватываемым вызвавшим процессом. Проблема в том, что функция pthread_cond_wait не возвращает ошибки при возврате из обработчика сигнала: она может вернуть либо 0 (что рассматривается как ложное пробуждение), либо вообще не завершить работу. Все эти проблемы можно обойти, но это непросто.
В листинге 5.26 приведена вторая половина функции mq_send. К моменту ее выполнения мы уже знаем о наличии в очереди свободного места для нашего сообщения.
Листинг 5.25. Функция mq_send: первая половина//my_pxmsg_mmap/mq_send.с
1 #include "unpipc.h"
2 #include "mqueue.h"
3 int
4 mymq_send(mymqd_t mqd, const char *ptr, size_t len, unsigned int prio)
5 {
6 int n;
7 long index, freeindex;
8 int8_t *mptr;
9 struct sigevent *sigev;
10 struct mymq_hdr *mqhdr;
11 struct mymq_attr *attr;
12 struct mymsg_hdr *msghdr, *nmsghdr, *pmsghdr;
13 struct mymq_info *mqinfo;
14 mqinfo = mqd;
15 if (mqinfo->mqi_magic != MQI_MAGIC) {
16 errno = EBADF;
17 return(-1);
18 }
19 mqhdr = mqinfo->mqi_hdr; /* указатель типа struct */
20 mptr = (int8_t *) mqhdr; /* указатель на байт */
21 attr = &mqhdr->mqh_attr;
22 if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
23 errno = n;
24 return(-1);
25 }
26 if (len > attr->mq_msgsize) {
27 errno = EMSGSIZE;
28 goto err;
29 }
30 if (attr->mq_curmsgs == 0) {
31 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) {
32 sigev = &mqhdr->mqh_event;
33 if (sigev->sigev_notify == SIGEV_SIGNAL) {
34 sigqueue(mqhdr->mqh_pid, sigev->sigev_signo,
35 sigev->sigev_value);
36 }
37 mqhdr->mqh_pid = 0; /* снятие с регистрации */
38 }
39 } else if (attr->mq_curmsgs >= attr->mq_maxmsg) {
40 /* 4queue is full */
41 if (mqinfo->mqi_flags & O_NONBLOCK) {
32 errno = EAGAIN;