C++17 STL Стандартная библиотека шаблонов - Яцек Галовиц
Producer 1 --> item 100
item 0 --> Consumer 0
Producer 2 --> item 200
item 100 --> Consumer 1
item 200 --> Consumer 2
Producer 0 --> item 1
Producer 1 --> item 101
item 1 --> Consumer 0
...
Producer 0 --> item 14
EXIT: Producer 0
Producer 1 --> item 114
EXIT: Producer 1
item 14 --> Consumer 0
Producer 2 --> item 214
EXIT: Producer 2
item 114 --> Consumer 1
item 214 --> Consumer 2
EXIT: Consumer 2
EXIT: Consumer 3
EXIT: Consumer 4
EXIT: Consumer 0
EXIT: Consumer 1
Как это работает
Этот пример дополняет предыдущий. Вместо того чтобы синхронизировать одного производителя с одним потребителем, мы реализовали программу, которая синхронизирует M производителей и N потребителей. Кроме того, приостанавливаются не только потребители при отсутствии элементов для них, но и производители, если очередь становится слишком длинной. Когда несколько потребителей ждут заполнения одной очереди, они будут действовать по принципу, работающему и для сценария «один производитель — один потребитель». Пока только один поток блокирует мьютекс, защищающий очередь, а затем извлекает оттуда элементы, код безопасен. Неважно, как много потоков ожидают блокировки одновременно. Это же верно и для производителя, поскольку единственный важный аспект в обоих сценариях таков: к очереди одномоментно может получить доступ только один поток, и не больше.
Более сложной, чем предыдущий пример, в котором запускались всего один производитель и один потребитель, эту программу делает тот факт, что мы указываем потокам-производителям останавливаться, когда длина очереди превышает какое-то значение. Чтобы соответствовать этому требованию, мы реализовали два разных сигнала, имеющих собственную переменную condition_variable.
1. go_produce сигнализирует о том, что очередь снова заполнена не до конца и производители могут опять начать ее заполнять.
2. go_consume уведомляет о достижении очереди максимального размера и о том, что потребители снова могут свободно использовать элементы.
Таким образом производители заполняют очередь элементами и сигнализируют с помощью события go_consume потокам-потребителям, которые ожидают на следующей строке:
if (go_consume.wait_for(lock, 1s, [] { return !q.empty(); })) {
// получили событие без тайм-аута
}
Производители, с другой стороны, ждут на следующей строке до тех пор, пока не смогут создавать элементы снова:
go_produce.wait(lock, [&] { return q.size() < stock; });
Интересный момент: мы не позволяем потребителям ждать вечно. В вызове go_consume.wait_for добавляем дополнительный аргумент timeout, имеющий значение, равное 1 секунде. Он представляет собой механизм выхода для потребителей: если очередь пуста более секунды, то, возможно, активных производителей больше не осталось.
Для простоты код пытается поддерживать длину очереди всегда на максимуме. Более сложная программа могла бы позволить потокам отправлять уведомления о пробуждении только в том случае, если очередь достигнет половины максимального размера. Таким образом, производители будут пробуждаться до того, как очередь опустеет, но не раньше, когда в очереди все еще хватает элементов.
Рассмотрим следующую ситуацию, которую позволяет элегантно решить condition_variable: если потребитель отправляет уведомление go_produce, то, возможно, множество производителей пытаются перегнать друг друга в попытке создать новый элемент. При нехватке только одного элемента работать будет только один производитель. Если все производители всегда станут создавать элемент при появлении события go_produce, то мы зачастую будем сталкиваться с ситуацией, когда очередь заполняется сверх своего максимального размера.
Представим ситуацию, когда у нас в очереди имеется (max-1) элементов и нужно создать один новый элемент, чтобы очередь снова стала заполненной.
Независимо от того, какой метод вызовет поток-потребитель — go_produce.notify_ one() (возобновит только один ожидающий поток) или go_produce.notify_all() (возобновит все ожидающие потоки), — можно гарантировать, что только один поток-производитель завершит вызов go_produce.wait, поскольку для остальных потоков-производителей не будет удовлетворяться условие ожидания q.size()<stock в момент получения ими мьютекса при пробуждении.
Распараллеливание отрисовщика множества Мандельброта в ASCII с применением std::async
Помните отрисовщик множества Мандельброта в ASCII из главы 6? В этом примере мы воспользуемся потоками, чтобы немного сократить время его вычисления. Сначала изменим строку оригинальной программы, которая ограничивает количество итераций для каждой выбранной координаты. Это сделает программу медленнее, а результаты — более точными в сравнении с той точностью, которая доступна при выводе данных на консоли, но у нас будет хороший пример программы для параллелизации.
Далее мы применим минимальные модификации к программе и увидим, что вся программа работает быстрее. После применения этих модификаций программа будет работать с std::async и std::future. Чтобы полностью уяснить данный пример, очень важно понять оригинальную программу.
Как это делается
В этом примере мы возьмем отрисовщик фрактала Мандельброта, который реализовали в главе 6. Сначала увеличим время вычисления, повысив границу вычислений. Затем ускорим программу, внеся четыре небольших изменения, чтобы распараллелить ее.
1. Чтобы следовать шагам, лучше всего скопировать всю программу из другого примера. Затем следуйте инструкциям, показанным в следующих шагах, для внесения всех необходимых изменений. Все отличия от оригинальной программы выделяются полужирным шрифтом.
Первое изменение — это дополнительный заголовочный файл <future>:
#include <iostream>
#include <algorithm>
#include <iterator>
#include <complex>
#include <numeric>
#include <vector>
#include <future>
using namespace std;
2. Функции scaler и scaled_cmplx менять не нужно:
using cmplx = complex<double>;
static auto scaler(int min_from, int max_from,
double min_to, double max_to)
{
const int w_from {max_from - min_from};
const double w_to {max_to - min_to};
const int mid_from {(max_from - min_from) / 2 + min_from};
const double mid_to {(max_to - min_to) / 2.0 + min_to};
return [=] (int from) {
return double(from - mid_from) / w_from * w_to + mid_to;
};
}
template <typename A, typename B>
static auto scaled_cmplx(A scaler_x, B scaler_y)
{
return [=](int x, int y) {
return cmplx{scaler_x(x), scaler_y(y)};
};
}
3. В функции mandelbrot_iterations просто увеличим количество итераций, чтобы программа выполняла больше вычислений:
static auto mandelbrot_iterations(cmplx c)
{
cmplx z