Джулиан Бакнелл - Фундаментальные алгоритмы и структуры данных в Delphi
Перейдем к рассмотрению метода StartWriting, код которого приведен в листинге 12.4.
Вначале снова необходимо перехватить управление критическим разделом. При наличии любых выполняющихся потоков считывания или записи метод увеличивает значение счетчика ожидающих потоков записи, освобождает управление критическим разделом, а затем ожидает освобождения семафора заблокированных потоков записи.
Листинг 12.4. Метод StartWriting
procedure TtdReadWriteSync.StartWriting;
var
HaveToWait : boolean;
begin
{перехватить управление критическим разделом}
EnterCriticalSection(FController);
{при наличии еще одного запущенного потока записи или активных потоков считывания, метод добавляет себя в качестве ожидающего потока считывания и обеспечивает переход в состояние ожидания}
if FActiveWriter or (FActiveReaders <> 0) then begin
inc(FWaitingWriters);
HaveToWait := true;
end
{в противном случае метод должен добавить себя в качестве еще одного выполняющегося потока записи и обеспечить отсутствие состояния ожидания}
else begin
FActiveWriter :=true;
HaveToWait := false;
end;
{освободить управление критическим разделом}
LeaveCriticalSection(FController);
{при необходимости ожидания нужно выполнить следующее}
if HaveToWait then
WaitForSingleObject(FBlockedWriters, INFINITE);
end;
При отсутствии каких-либо других выполняющихся потоков можно сразу начать запись. Метод увеличивает значение счетчика выполняющихся потоков записи, освобождает управление критическим разделом и осуществляет выход из подпрограммы. В любом случае, сразу по выходу из подпрограммы значение счетчика активных потоков записи оказывается установленным равным единице (либо самим этим методом, либо методом StopReading - если помните, это происходит Непосредственно перед передачей семафора заблокированных потоков записи).
И, наконец, можно приступить к рассмотрению метода StopWriting, код которого приведен в листинге 12.5.
Как и ранее, первоначальная задача состоит в перехвате управления критическим разделом. Затем, поскольку запись завершена, метод уменьшает значение счетчика активных потоков записи. Теперь выполняется проверка количества ожидающих потоков считывания. Мы входим в цикл, который уменьшает значение счетчика активных потоков считывания и освобождает семафор. Семафор, в свою очередь, освобождает от ожидания один поток считывания. Со временем, по завершении цикла, все потоки считывания будут освобождены и смогут считаться активными (обратите внимание, что они все будут использовать соответствующее обращение к методу StartReading). Если, с другой стороны, не существует никаких ожидающих потоков считывания, метод выполняет проверку на наличие каких-либо ожидающих потоков записи. Если такие потоки существуют, метод освобождает только один поток записи таким же образом, как уже было описано при рассмотрении метода StopReading. И, наконец, независимо ни от чего, метод освобождает управление критическим разделом.
Листинг 12.5. Метод StopWriting
procedure TtdReadWriteSync.StopWriting;
var
i : integer;
begin
{перехватить управление критическим разделом}
EnterCriticalSection(FController);
{запись завершена}
FActiveWriter := false;
{если имеется хотя бы один ожидающий поток записи, освободить их всех}
if (FWaitingReaders <> 0) then begin
FActiveReaders := FWaitingReaders;
FWaitingReaders := 0;
ReleaseSemaphore(FBlockedReaders, FActiveReadersr nil);
end
{в противном случае, при наличии по меньшей мере одного ожидающего потока записи, ему необходимо предоставить свободу действий}
else
if (FWaitingWriters <> 0) then begin
dec(FWaitingWriters);
FActiveWriter :=true;
ReleaseSemaphore(FBlockedWriters, 1, nil);
end;
{освободить управление критическим разделом}
LeaveCriticalSection(FController);
end;
Нам осталось рассмотреть только два метода: конструктор Create и деструктор Destroy. Код реализации этих методов показан в листинге 12.6.
Листинг 12.6. Создание и уничтожение объекта синхронизации
constructor TtdReadWriteSync.Create;
var
NameZ : array [0..MAXJPATH] of AnsiChar;
begin
inherited Create;
{создать примитивные объекты синхронизации}
GetRandomObjName (NameZ, ' tdRW.BlockedReaders' );
FBlockedReaders := CreateSemaphore(nil, 0, MaxReaders, NameZ);
GetRandomObjName(NameZ, 'tdRW.BlockedWriters');
FBlockedWriters := CreateSemaphore(nil, 0, 1, NameZ);
InitializeCriticalSection(FController);
end;
destructor TtdReadWriteSyhc.Destroy;
begin
CloseHandle(FBlockedReaders);
CloseHandle(FBlockedWriters);
DeleteCriticalSection(FController);
inherited Destroy;
end;
Как видите, конструктор Create будет создавать три примитивных объекта синхронизации, а деструктор Destroy будет, соответственно, их уничтожать.
Полный исходный код класса TtdReadWriteSync можно найти на Web-сайте издательства, в разделе материалов. После выгрузки материалов отыщите среди них файл TDRWSync.pas.
Алгоритм производителей-потребителей
Еще один многопоточный алгоритм, тесно связанный с проблемой потоков считывания и записи - алгоритм, решающий проблему производителей и потребителей.
Этот раздел адресован только тем программистам, которые работают в среде 32-раздядной Windows. Delphi I вообще не поддерживает многопоточную обработку, в то время как Kylix и Linux не предоставляют необходимых примитивных объектов синхронизации, с помощью которых можно было бы решить проблему производителей-потребителей.
В этой ситуации имеется один или более потоков, создающих данные (их называют производителями (producers)), которые будут использоваться или потребляться одним или большим количеством других потоков (называемых потребителями (consumers)). Как видите, эта задача тесно связана с алгоритмом потоков считывания-записи: потребителей можно считать потоками считывания данных, записанных производителями. Примером использования этого алгоритма может послужить программа потокового видео: в этом случае будет существовать поток, который загружает видео из какого-то Web-сайта, и поток, который воспроизводит загруженное видео. Ни один из этих потоков не должен беспокоиться о том, что должен делать второй.
Мы сымитируем этот процесс подпрограммой копирования нескольких потоков. Производитель будет копировать данные из потока в очередь буферов. Затем потребитель будет копировать данные из буферов в другой поток. Например, мог бы существовать производитель, считывающий несжатые данные из потока, и два потребителя данных: один, сжимающий данные в другой поток с помощью одного алгоритма, и второй, сжимающий их с помощью другого алгоритма, что теоретически позволяет выбирать более плотно упакованные данные. В этом случае производитель может продолжать работу и пытаться максимально быстро заполнять буфера в очереди, а потребители, в свою очередь, могут пытаться максимально быстро их считывать. Работа производителя будет тормозиться, если потребители работают недостаточно быстро и очередь заполняется непрочитанными буферами. Аналогично, работа потребителей будет замедляться, если производитель работает медленно и очередь опустошается.
Модель с одним производителем и одним потребителем
Вначале рассмотрим модель с одним производителем и одним потребителем. Затем мы ее расширим до модели с одним производителем и несколькими потребителями. Нам необходимо, чтобы сразу после генерирования производителем "достаточного" объема данных потребитель мог начинать использовать уже сгенерированные данные. Поэтому необходимо рассмотреть три ситуации: производитель и потребитель работают согласованно;
потребитель прекращает свою работу или блокируется, поскольку производитель не создал достаточный объем данных;
производитель блокируется, поскольку потребитель не успел выполнить считывание уже созданных данных.
В примере с копированием потока производитель будет прекращать работу, если ему удастся заполнить все буферы прежде, чем потребитель успеет считать и обработать первый буфер. Потребитель будет блокироваться, если ему удастся обработать все буферы прежде, чем производитель успеет заполнить еще один буфер.
Следовательно, разрабатываемый нами класс синхронизации должен содержать четыре метода: вызываемый производителем, чтобы начать генерирование данных;
вызываемый при наличии каких-либо данных, готовых для использования потребителем;
вызываемый потребителем, чтобы начать потребление данных;
и, наконец, вызываемый потребителем по завершении потребления им объема данных, достаточного для возобновления генерации данных производителем. Как и в случае потоков считывания-записи, оба метода запуска могут блокировать вызывающие их потоки.
Полный код интерфейса и реализации класса производителя-потребителя приведен в листинге 12.7. Как видите, реализация весьма проста.
Листинг 12.7. Класс синхронизации одного производителя и одного потребителя type
TtdProduceConsumeSync = class private
FHasData : THandle;
{семафор}
FNeedsData : THandle;
{семафор}
protected
public
constructor Create(aBufferCount : integer);
destructor Destroy; override;
procedure StartConsuming;
procedure StartProducing;
procedure StopConsuming;
procedure StopProducing;
end;