Операционная система UNIX - Робачевский Андрей Михайлович
Передача данных
Как уже обсуждалось, передача данных в потоке происходит в виде сообщений. Процесс инициирует передачу данных с помощью системных вызовов write(2) и putmsg(2), которые непосредственно взаимодействуют с головным модулем. Головной модуль формирует сообщение, копируя в него прикладные данные, и передает его следующему модулю вниз по потоку. В конечном итоге сообщение принимается драйвером, который выполняет необходимые операции с конкретным устройством. В случае, когда драйвер получает данные от устройства, он также передает их в виде сообщений вверх по потоку. Процесс имеет возможность получить данные с помощью системных вызовов read(2) или getmsg(2). Если в головном модуле данные отсутствуют, процесс блокируется и переходит в состояние сна.
Сообщения передаются модулями с помощью системной функции putnext(9F):
#include <sys/stream.h>
#include <sys/ddi.h>
int putnext(queue_t *q, mblk_t *mp);
Эта функция адресует очередь следующего модуля параметром q и вызывает процедуру <i>xx</i>put() этой очереди, передавая ей сообщение mp. Не поощряется непосредственный вызов функции <i>xx</i>put() следующего модуля, поскольку это может вызвать определенные проблемы переносимости.
Передача данных внутри потока осуществляется асинхронно и не может блокировать процесс. Блокирование процесса возможно только при передаче данных между процессом и головным модулем. Таким образом, функции обработки данных потока — <i>xx</i>put() и <i>xx</i>service() не могут блокироваться. Если процедура <i>xx</i>put() не может передать данные следующему модулю, она помещает сообщение в собственную очередь, откуда оно может быть передано позже процедурой <i>xx</i>service(). Если и процедура <i>xx</i>service() не может осуществить передачу сообщения, например, из-за переполнения очереди следующего модуля, она не будет ожидать изменения ситуации, а вернет сообщение обратно в собственную очередь и завершит выполнение. Попытка передачи повторится, когда ядро через некоторое время опять запустит <i>xx</i>service().
Процедура <i>xx</i>service() вызывается в системном контексте, а не в контексте процесса, который инициировал передачу данных. Таким образом, блокирование процедуры <i>xx</i>service() может заблокировать (перевести в состояние сна) независимый процесс, что может привести к непредсказуемым результатам и потому недопустимо. Решение этой проблемы заключается в запрещении процедурам <i>xx</i>put() и <i>xx</i>service() блокирования своего выполнения.
Блокирование недопустимо и для драйвера. Обычно прием данных драйвером осуществляется с использованием прерываний. Таким образом процедура <i>xx</i>put() вызывается в контексте прерывания и не может блокировать свое выполнение.
Когда процедура <i>xx</i>put() не может передать сообщение следующему модулю, она вызывает функцию putq(9F), имеющую следующий вид:
#include <sys/stream.h>
int putq(queue_t *q, mblk_t *mp);
Функция putq(9F) помещает сообщение mp в очередь q, где сообщение ожидает последующей передачи, и заносит очередь в список очередей, нуждающихся в обработке. Для таких очередей ядро автоматически вызывает процедуру <i>xx</i>service(). Планирование вызова процедур <i>xx</i>service() производится функцией ядра runqueues().[59] Функция runqueues() вызывается ядром в двух случаях:
□ Когда какой-либо процесс выполняет операцию ввода/вывода над потоком.
□ Непосредственно перед переходом какого-либо процесса из режима ядра в режим задачи.
Заметим, что планирование обслуживания очередей не связано с конкретным процессом и производится для всей подсистемы STREAMS в целом.
Функция runqueue() производит поиск всех потоков, нуждающихся в обработке очередей. При наличии таковых просматривается список очередей, ожидающих обработки, и для каждой из них вызывается соответствующая функция <i>xx</i>service(). Каждая процедура <i>xx</i>service(), в свою очередь, пытается передать все сообщения очереди следующему модулю. Если для каких-либо сообщений это не удается, они остаются в очереди, ожидая следующего вызова runqueue(), после чего процесс повторяется.
Управление передачей данных
Деление процесса передачи данных на два этапа, выполняемых, соответственно, функциями <i>xx</i>put() и <i>xx</i>service(), позволяет реализовать механизм управления передачей данных.
Как уже упоминалось, обязательной для модуля является лишь функция <i>xx</i>put(). Рассмотрим ситуацию, когда модули потока не содержат процедур <i>xx</i>service(). В этом случае, проиллюстрированном на рис. 5.19, каждый предыдущий модуль вызывает функцию <i>xx</i>put() следующего, передавая ему сообщение, с помощью функции ядра putnext(9F). Функция <i>xx</i>put() немедленно вызывает putnext(9F) и т.д.:
xxput(queue_t *q, mblk_t *mp) {
putnext(q, mp);
}
Рис. 5.19. Передача данных без управления потоком
Когда данные достигают драйвера, он передает их непосредственно устройству. Если устройство занято, или драйвер не может немедленно обработать данные, сообщение уничтожается. В данном примере никакого управления потоком не происходит, и очереди сообщений не используются.
Хотя такой вариант может применяться для некоторых драйверов (как правило, для псевдоустройств, например, /dev/null), в общем случае устройство не может быть все время готово к обработке данных, а потеря данных из-за занятости устройства недопустима. Таким образом, в потоке может происходить блокирование передачи данных[60], и эта ситуация не должна приводить к потере сообщений, во избежание которой необходим согласованный между модулями механизм управления потоком. Для этого сообщения обрабатываются и буферизуются в соответствующей очереди модуля, а их передача возлагается на функцию <i>xx</i>service(), вызываемую ядром автоматически. Для каждой очереди определены две ватерлинии — верхняя и нижняя, которые используются для контроля заполненности очереди. Если число сообщений превышает верхнюю ватерлинию, очередь считается переполненной, и передача сообщений блокируется, пока их число не станет меньше нижней ватерлинии.