C++17 STL Стандартная библиотека шаблонов - Яцек Галовиц
o: 4
r: 1
z: 2
Sorted string: " aaaabbbffginoooorzz"
Total vowels: 9
Как это работает
Если бы мы не использовали std::async, то последовательный нераспараллеленный код выглядел бы довольно просто:
auto hist (histogram(input));
auto sorted_str (sorted( input));
auto vowel_count (vowels( input));
for (const auto &[c, count] : hist) {
cout << c << ": " << count << 'n';
}
cout << "Sorted string: " << quoted(sorted_str) << 'n';
cout << "Total vowels: " << vowel_count << 'n';
Чтобы распараллелить код, мы сделали следующее: обернули три вызова функций в вызовы async(launch::async, ...). Следовательно, указанные функции выполняются не в основном потоке. Вместо этого async запускает новые потоки и позволяет им выполнить функции конкурентно. Таким образом, мы имеем дело только с теми издержками, которые возникают при запуске другого потока, и можем продолжить выполнение следующих строк кода, а вся работа совершится в фоновом режиме:
auto hist (async(launch::async, histogram, input));
auto sorted_str (async(launch::async, sorted, input));
auto vowel_count (async(launch::async, vowels, input));
for (const auto &[c, count] : hist.get()) {
cout << c << ": " << count << 'n';
}
cout << "Sorted string: "
<< quoted(sorted_str.get()) << 'n'
<< "Total vowels: "
<< vowel_count.get() << 'n';
В то время как, например, функция histogram возвращает экземпляр типа map, вызов async(..., histogram, ...) возвращает ассоциативный массив, который обернут в объект типа future. Последний является чем-то вроде заполнителя до тех пор, пока поток, выполняющий функцию histogram, не вернет значение. Полученный ассоциативный массив помещается в объект типа future, и мы наконец можем получить к нему доступ. Функция get дает доступ к инкапсулированному результату.
Рассмотрим еще один короткий пример. Взгляните на этот фрагмент:
auto x (f(1, 2, 3));
cout << x;
Вместо предыдущего кода мы могли написать следующее:
auto x (async(launch::async, f, 1, 2, 3));
cout << x.get();
Вот, по сути, и все. Выполнение задач в фоновом режиме никогда не было проще по стандартам С++. Осталось разрешить один момент: что означает launch::async? Это флаг, который определяет политику запуска. Существуют два флага политики, соответственно, возможны три их сочетания (табл. 9.4).
Вызов наподобие async(f, 1, 2, 3) без аргумента политики автоматически выберет обе политики. Реализация async сама выберет, какую политику использовать. Это означает отсутствие уверенности в том, что другой поток вообще запустится или что выполнение будет просто отложено в другом потоке.
Дополнительная информация
Следует рассмотреть последний момент. Предположим, мы пишем код следующим образом:
async(launch::async, f);
async(launch::async, g);
Это может привести к тому, что функции f и g (в данном примере неважны возвращаемые ими значения) будут выполняться в конкурирующих потоках и в это же время будут запускаться разные задачи. При запуске такого кода мы увидим блокировку кода при этих вызовах, что нам, вероятно, не требуется.
Почему же код блокируется? Разве async не должен отвечать за неблокируемость асинхронных вызовов? Да, это так, но есть одна особая тонкость: если объект типа future был получен из вызова async, имеющего политику launch::async, то его деструктор выполнит блокирующее ожидание.
Это значит, что оба вызова async из данного короткого примера являются блокирующими, поскольку сроки жизни объектов типа future, которые они возвращают, заканчиваются в одной строке! Можно исправить данную ситуацию, получив их возвращаемые значения и поместив в переменные с более длинным сроком жизни.
Реализуем идиому «производитель/потребитель» с использованием std::condition_variable
В этом примере мы реализуем типичную программу, работающую по принципу «производитель/потребитель», в которой запускается несколько потоков. Основная идея заключается в том, что существует один поток, который создает элементы и помещает их в очередь. Еще один поток потребляет (использует) эти элементы. Если создавать нечего, то поток-производитель приостанавливается. При отсутствии в очереди элементов приостанавливается потребитель.
Оба потока имеют доступ к очереди и могут изменить, поэтому ее нужно защитить с помощью мьютекса.
Важно рассмотреть и следующий момент: что будет делать потребитель, если в очереди нет элементов? Будет ли он опрашивать очередь каждую секунду до тех пор, пока не увидит новые элементы? В этом нет необходимости, поскольку можно позволить потребителю подождать событий, которые его пробудят, отправляемых производителем в момент, когда появляются новые элементы.
Для таких событий в C++11 предоставляется удобная структура данных: std::condition_variable. В этом примере мы реализуем простое приложение, работающее по принципу «производитель/потребитель», которое пользуется этими структурами.
Как это делается
Итак, в этом примере мы реализуем простую программу, работающую по принципу «производитель/потребитель», которая запускает одного производителя значений в отдельном потоке, а также одного потребителя в другом потоке.
1. Сначала выполним все необходимые директивы include и объявим об использовании пространства имен std:
#include <iostream>
#include <queue>
#include <tuple>
#include <condition_variable>
#include <thread>
using namespace std;
using namespace chrono_literals;
2. Создадим экземпляр очереди простых численных значений и назовем ее q. Производитель будет помещать туда значения, а потребитель — доставать их оттуда. Для их синхронизации понадобится мьютекс. Вдобавок создадим экземпляр типа condition_variable и назовем его cv. Переменная finished представляет собой способ, с помощью которого производитель укажет потребителю, что других значений больше не будет:
queue<size_t> q;
mutex mut;
condition_variable cv;
bool finished {false};
3. Реализуем функцию-производитель. Она принимает аргумент items, который ограничивает максимальное количество создаваемых элементов. В простом цикле он будет приостанавливаться на 100 миллисекунд для каждого элемента, что симулирует некоторую вычислительную сложность. Затем заблокируем мьютекс, синхронизирующий доступ к очереди. После успешного создания и вставки элемента в очередь вызываем cv.notify_all(). Данная функция пробуждает потребителя. Далее мы увидим на стороне потребителя, как это работает.
static void producer(size_t items) {
for (size_t i {0}; i < items; ++i) {
this_thread::sleep_for(100ms);
{
lock_guard<mutex> lk {mut};
q.push(i);
}
cv.notify_all();
}
4. После создания всех элементов снова