C++17 STL Стандартная библиотека шаблонов - Яцек Галовиц
Как это делается
В данном примере мы реализуем программу, похожую на программу из предыдущего примера, но в этот раз у нас будет несколько производителей и несколько потребителей.
1. Сначала включим все необходимые заголовочные файлы и объявим об использовании пространств имен std и chrono_literals:
#include <iostream>
#include <iomanip>
#include <sstream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;
using namespace chrono_literals;
2. Затем реализуем синхронизированную вспомогательную функцию для вывода сообщений на экран, показанную в другом примере этой главы, поскольку будем выводить множество сообщений на конкурентной основе:
struct pcout : public stringstream {
static inline mutex cout_mutex;
~pcout() {
lock_guard<mutex> l {cout_mutex};
cout << rdbuf();
}
};
3. Все производители пишут значения в одну очередь, а все потребители также получают значения из нее. В дополнение к этой очереди нужен мьютекс, защищающий очередь и флаг, на основе которого можно сказать, что создание элементов будет приостановлено в какой-то момент:
queue<size_t> q;
mutex q_mutex;
bool production_stopped {false};
4. В этой программе задействуем две разные переменные condition_variables. В предыдущем примере у нас была переменная condition_variable, которая указывала на появление в очереди новых элементов. В этом случае ситуация чуть более запутанная. Мы хотим, чтобы производители создавали новые элементы до тех пор, пока в очереди не будет содержаться их определенное количество. Если мы его достигли, то они должны приостановиться. Таким образом, переменная go_consume пригодна для возобновления потребителей, которые, в свою очередь, смогут возобновить производителей с помощью переменной go_produce:
condition_variable go_produce;
condition_variable go_consume;
5. Функция-производитель принимает идентификатор производителя, общее количество элементов, которые нужно создать, а также максимальное количество элементов в очереди. Затем она входит в собственный производственный цикл. Далее блокирует мьютекс очереди, а затем разблокирует его снова в вызове go_produce.wait. Функция ожидает выполнения условия, согласно которому размер очереди должен быть меньше порогового значения stock:
static void producer(size_t id, size_t items, size_t stock)
{
for (size_t i = 0; i < items; ++i) {
unique_lock<mutex> lock(q_mutex);
go_produce.wait(lock,
[&] { return q.size() < stock; });
6. После того как производитель будет возобновлен, он создаст элемент и поместит его в очередь. Значение, помещаемое в очередь, определяется на основе выражения id*100+i. Таким образом, мы впоследствии можем увидеть, какой производитель создал его, поскольку количество сотен показывает идентификатор производителя. Кроме того, выводим сообщение о создании элемента в терминал. Формат этого сообщения может выглядеть странно, но оно будет выравнено в соответствии с сообщениями в окне консоли:
q.push(id * 100 + i);
pcout{} << " Producer " << id << " --> item "
<< setw(3) << q.back() << 'n';
7. После создания элемента можно возобновить приостановленных потребителей. Период приостановки, равный 90 миллисекундам, симулирует тот факт, что на создание элементов требуется какое-то время:
go_consume.notify_all();
this_thread::sleep_for(90ms);
}
pcout{} << "EXIT: Producer " << id << 'n';
}
8. Теперь перейдем к функции-потребителю, которая принимает в качестве аргумента только идентификатор. Она продолжает ожидать элементов при условии, что их производство не остановлено или очередь не пуста. Если очередь пуста, но производство не остановлено, то, возможно, скоро появятся новые элементы:
static void consumer(size_t id)
{
while (!production_stopped || !q.empty()) {
unique_lock<mutex> lock(q_mutex);
9. После блокирования мьютекса очереди снова разблокируем его, чтобы подождать установки значения переменной события go_consume. Аргумент лямбда-выражения описывает, что нужно вернуть из вызова функции wait, когда очередь содержит элементы. Второй аргумент 1s указывает, что мы не должны ждать вечно. Если мы ждем больше секунды, то хотим выйти из функции wait. Можно определить, вернула ли функция wait_for значение (условие-предикат будет верным) или мы вышли из нее по тайм-ауту (в этом случае возвратится значение false). При наличии в очереди новых элементов используем (потребим) их и выведем соответствующее сообщение на консоль:
if (go_consume.wait_for(lock, 1s,
[] { return !q.empty(); })) {
pcout{} << " item "
<< setw(3) << q.front()
<< " --> Consumer "
<< id << 'n';
q.pop();
10. После потребления элемента оповещаем производителей и приостанавливаем поток на 130 миллисекунд для симуляции того факта, что потребление элементов тоже требует времени:
go_produce.notify_all();
this_thread::sleep_for(130ms);
}
}
pcout{} << "EXIT: Producer " << id << 'n';
}
11. В функции main создаем один экземпляр вектора для рабочих потоков и еще один — для потоков-потребителей:
int main()
{
vector<thread> workers;
vector<thread> consumers;
12. Далее порождаем три потока-производителя и пять потоков-потребителей:
for (size_t i = 0; i < 3; ++i) {
workers.emplace_back(producer, i, 15, 5);
}
for (size_t i = 0; i < 5; ++i) {
consumers.emplace_back(consumer, i);
}
13. Сначала позволим закончить работу потокам-производителям. Как только все из них вернут значения, установим значение флага production_stopped; это приведет к тому, что потребители также закончат свою работу. Нужно собрать их, а затем завершить программу:
for (auto &t : workers) { t.join(); }
production_stopped = true;
for (auto &t : consumers) { t.join(); }
}
14. Компиляция и запуск программы дадут следующий результат. Сообщений получилось довольно много, поэтому мы приводим их в сокращенном виде. Как видите, производители приостанавливаются время от времени и позволяют потребителям использовать несколько элементов, чтобы снова получить возможность их производить. Интересно будет изменить время ожидания для производителей/потребителей, а также манипулировать количеством производителей/потребителей и максимальным количеством элементов в очереди, поскольку это значительно меняет шаблоны появления выходных сообщений:
$ ./multi_producer_consumer
Producer 0 -->