Уильям Стивенс - UNIX: взаимодействие процессов
32 errno = EAGAIN;
43 goto err;
44 }
45 /* ожидание освобождения места в очереди */
46 while (attr->mq_curmsgs >= attr->mq_maxmsg)
47 pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
48 }
Листинг 5.25. Функция mq_send: вторая половина//my_pxmsg_mmap/mq_send.с
49 /* nmsghdr будет указывать на новое сообщение*/
50 if ((freeindex = mqhdr->mqh_free) == 0)
51 err_dump("mymq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
52 nmsghdr = (struct mymsg_hdr *) &mptr[freeindex];
53 nmsghdr->msg_prio = prio;
54 nmsghdr->msg_len = len;
55 memcpy(nmsghdr + 1, ptr, len); /* копирование сообщения в очередь */
56 mqhdr->mqh_free = nmsghdr->msg_next; /* новое начало списка пустых сообщений */
57 /* поиск места в списке для нового сообщения */
58 index = mqhdr->mqh_head;
59 pmsghdr = (struct mymsg_hdr *) &(mqhdr->mqh_head);
60 while (index != 0) {
61 msghdr = (struct mymsg_hdr *) &mptr[index];
62 if (prio > msghdr->msg_prio) {
63 nmsghdr->msg_next = index;
64 pmsghdr->msg_next = freeindex;
65 break;
66 }
67 index = msghdr->msg_next;
68 pmsghdr = msghdr;
69 }
70 if (index == 0) {
71 /* очередь была пуста или новое письмо добавлено к концу списка */
72 pmsghdr->msg_next = freeindex;
73 nmsghdr->msg_next = 0;
74 }
75 /* запускаем любой из процессов, заблокированных в mq_receive */
76 if (attr->mq_curmsgs == 0)
77 pthread_cond_signal(&mqhdr->mqh_wait);
78 attr->mq_curmsgs++;
79 pthread_mutex_unlock(&mqhdr->mqh_lock);
80 return(0);
81 err:
82 pthread_mutex_unlock(&mqhdr->mqh lock);
83 return(-1);
84 }
Получение индекса свободного блока50-52 Поскольку количество свободных сообщений при создании очереди равно mq_maxmsg, ситуация, в которой mq_curmsgs будет меньше mq_maxmsg для пустого списка свободных сообщений, возникнуть не может.
Копирование сообщения53-56 Указатель nmsghdr хранит адрес области памяти, в которую помещается сообщение. Приоритет и длина сообщения сохраняются в структуре msg_hdr, а затем в память копируется содержимое сообщения, переданного вызвавшим процессом.
Помещение нового сообщения в соответствующее место связного списка57-74 Порядок сообщений в нашем списке зависит от их приоритета: они расположены в порядке его убывания. При добавлении нового сообщения мы проверяем, существуют ли сообщения с тем же приоритетом; в этом случае сообщение добавляется после последнего из них. Используя такой метод упорядочения, мы гарантируем, что mq_receive всегда будет возвращать старейшее сообщение с наивысшим приоритетом. По мере продвижения по списку мы сохраняем в pmsghdr адрес предыдущего сообщения, поскольку именно это сообщение будет хранить индекс нового сообщения в поле msg_next.
ПРИМЕЧАНИЕ
Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета.
Пробуждение любого процесса, заблокированного в вызове mq_receive75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем pthread_cond_signal, чтобы разблокировать любой из процессов, ожидающих сообщения.
78 Увеличиваем на единицу количество сообщений в очереди mq_curmsgs.
Функция mq_receive
В листинге 5.27 приведен текст первой половины функции mq_receive, которая получает необходимые указатели, блокирует взаимное исключение и проверяет объем буфера вызвавшего процесса, который должен быть достаточным для помещения туда сообщения максимально возможной длины.
Проверка полноты очереди30-40 Если очередь пуста и установлен флаг O_NONBLOCK, возвращается ошибка с кодом EAGAIN. В противном случае увеличивается значение счетчика mqh_nwait, который проверяется функцией mq_send (листинг 5.25) в случае, если очередь пуста и есть процессы, ожидающие уведомления. Затем мы ожидаем сигнала по условной переменной, который будет передан функцией mq_send (листинг 5.26).
ПРИМЕЧАНИЕ
Наша реализация mq_receive, как и реализация mq_send, упрощает ситуацию с ошибкой EINTR, возвращаемой при прерывании ожидания сигналом, перехватываемым вызвавшим процессом.
В листинге 5.28 приведен текст второй половины функции mq_receive. Мы уже знаем, что в очереди есть сообщение, которое можно будет возвратить вызвавшему процессу.
Листинг 5.27.Функция mq_receive: первая половина//my_pxmsg_mmap/mq_receive.с
1 #include "unpipc.h"
2 #include "mqueue.h"
3 ssize_t
4 mymq_receive(mymqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
5 {
6 int n;
7 long index;
8 int8_t *mptr;
9 ssize_t len;
10 struct mymq_hdr *mqhdr;
11 struct mymq_attr *attr;
12 struct mymsg_hdr *msghdr;
13 struct mymq_info *mqinfo;
14 mqinfo = mqd;
15 if (mqinfo->mqi_magic != MQI_MAGIC) {
16 errno = EBADF;
17 return(-1);
18 }
19 mqhdr = mqinfo->mqi_hdr; /* указатель struct */
20 mptr = (int8_t *) mqhdr; /* указатель на байт */
21 attr = &mqhdr->mqh_attr;
22 if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
23 errno = n;
24 return(-1);
25 }
26 if (maxlen < attr->mq_msgsize) {
27 errno = EMSGSIZE;
28 goto err;
29 }
30 if (attr->mq_curmsgs = 0) { /* очередь пуста */
31 if (mqinfo->mqi_flags & O_NONBLOCK) {
32 errno = EAGAIN;
33 goto err;
34 }
35 /* ожидаем помещения сообщения в очередь */
36 mqhdr->mqh_nwait++;
37 while (attr->mq_curmsgs == 0)
38 pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
39 mqhdr->mqh_nwait--;
40 }
Листинг 5.28. Функция mq_receive: вторая половина//my_pxmsg_mmap/mq_receive.c
41 if ((index = mqhdr->mqh_head) == 0)
42 err_dump("mymq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
43 msghdr = (struct mymsg_hdr *) &mptr[index];
44 mqhdr->mqh_head = msghdr->msg_next; /* новое начало списка */
45 len = msghdr->msg_len;
46 memcpy(ptr, msghdr + 1, len); /* копирование самого сообщения */
47 if (priop != NULL)
48 *priop = msghdr->msg_prio;
49 /* только что считанное сообщение становится первым в списке пустых */
50 msghdr->msg_next = mqhdr->mqr_free;
51 mqhdr->mqh_free = index;
52 /* запуск любого процесса, заблокированного в вызове mq_send */
53 if (attr->mq_curmsgs == attr->mq_maxmsg)
54 pthread_cond_signal(&mqhdr->mqh_wait);
55 attr->mq_curmsgs--;
56 pthread_mutex_unlock(&mqhdr->mqh_lock);
57 return(len);
58 err:
59 pthread_mutex_unlock(&mqhdr->mqh_lock);
60 return(-1);
61 }
Возвращение сообщения вызвавшему процессу43-51 msghdr указывает на msg_hdr первого сообщения в очереди, которое мы и возвратим. Освободившееся сообщение становится первым в списке свободных.
Разблокирование процесса, заблокированного в вызове mq_send52-54 Если очередь была полной в момент считывания сообщения, мы вызываем pthread_cond_signal для отправки сообщения любому из процессов, заблокированных в вызове mq_send.
5.9. Резюме
Очереди сообщений Posix просты в использовании: новая очередь создается (или существующая открывается) функцией mq_open; закрываются очереди вызовом mq_close, а удаляются mq_unlink. Поместить сообщение в очередь можно функцией mq_send, а считать его оттуда можно с помощью mq_receive. Атрибуты очереди можно считать и установить с помощью функций mq_getattr и mq_setattr, а функция mq_notify позволяет зарегистрировать процесс на уведомление о помещении нового сообщения в пустую очередь. Каждое сообщение в очереди обладает приоритетом (небольшое целое число), и функция mq_receive всегда возвращает старейшее сообщение с наивысшим приоритетом.
Изучая mq_notify, мы познакомились с сигналами реального времени стандарта Posix, которые обладают номерами от SIGMIN до SIGMAX. При установке обработчика для этих сигналов с флагом SA_SIGINFO они будут помещаться в очередь, доставляться в порядке очереди и сопровождаться двумя дополнительными аргументами (при вызове обработчика).
Наконец, мы реализовали большую часть возможностей очереди сообщений Posix в приблизительно 500 строках кода на языке С, используя отображаемые в память файлы, взаимные исключения и условные переменные Posix. Эта реализация иллюстрирует обработку ситуации гонок при создании новой очереди; еще раз нам придется столкнуться с такой ситуацией в главе 10 при реализации семафоров Posix.