Новейшая итерация C++, известная под названием C++11 и одобренная Международной организацией по стандартизации (International Organization for Standardization, ISO) в прошлом году, формализует новый набор библиотек и несколько зарезервированных слов, используемых при программировании параллельной обработки. Многие и раньше использовали параллельную обработку в C++, но всегда с помощью одной из сторонних библиотек, которые нередко напрямую предоставляют API операционной системы.
Герб Саттер (Herb Sutter) объявил в декабре 2004 г., что «бесплатные подарки» в производительности закончились, имея в виду, что производители процессоров больше не могут создавать более быстрые процессоры простым наращиванием их тактовой частоты из-за чрезмерно растущего потребления энергии и выделяемого тепла. В итоге это привело нас в современный мир многоядерных процессоров — в новую реальность, важный шаг в которую только что сделал стандартный C++.
Я решил разбить эту статью на две основные темы. Первая начинается с раздела «Параллельное выполнение», который охватывает технологии, позволяющие приложениям параллельно выполнять независимые или частично независимые операции. Вторая основная тема начинается с раздела «Синхронизация параллельной обработки» — в ней исследуются механизмы синхронизации упомянутых выше операций, обрабатывающих данные, с целью избежать условий для возникновения гонок.
Эта статья написана с учетом средств, включенных в предстоящую версию Visual C++ (теперь она называется Visual C++ 11). Некоторые из них уже доступны в текущей версии, Visual C++ 2010. Статья не является ни руководством по моделированию параллельных алгоритмов, ни исчерпывающим описанием всех новых возможностей — это обстоятельное введение в новые средства параллельной обработки C++11.
Параллельное выполнение
Когда вы моделируете процессы и проектируете алгоритмы для обработки данных, возникает естественный соблазн выразить их последовательностью этапов. Пока производительность находится в приемлемых рамках, чаще всего рекомендуют придерживаться именно такой схемы, потому что понять ее гораздо легче, а это очень важно для сопровождения кодовых баз.
Когда производительность становится фактором, вызывающим тревогу, традиционные попытки преодолеть эту проблему заключаются в такой оптимизации последовательного алгоритма, чтобы он расходовал меньше процессорных тактов. Однако это возможно лишь до определенной точки, после которой дальнейшая оптимизация либо недоступна, либо чрезмерно сложна в реализации. Тогда приходит пора разбивать последовательные наборы этапов на параллельные операции.
В первом разделе вы узнаете следующее.
- Асинхронные задачи — это меньшие части исходного алгоритма, связанные только данными, которые они создают или используют.
- Потоки — единицы выполнения, управляемые средой выполнения. Они связаны с задачами в том смысле, что задачи выполняются в потоках.
- Внутренние элементы потоков — переменные уровня потока, исключения, передаваемые от потоков, и т. д.
Асинхронные задачи
В коде, сопровождающем эту статью, вы найдете проект Sequential Case (рис. 1).
Рис. 1. Код Sequential Case
int a, b, c;
int calculateA()
{
return a+a*b;
}
int calculateB()
{
return a*(a+a*(a+1));
}
int calculateC()
{
return b*(b+1)-b;
}
int main(int argc, char *argv[])
{
getUserData(); // инициализируем a и b
c = calculateA() * (calculateB() + calculateC());
showResult();
}
Функция main запрашивает у пользователя некие данные, а затем передает их трем функциям: calculateA, calculateB и calculateC. Впоследствии результаты комбинируются, и создается выходная информация для пользователя.
В каждую функцию вычислений в сопутствующих материалах введена случайная задержка от одной до трех секунд. Учитывая, что они выполняются последовательно, это приводит к тому, что в худшем случае общее время выполнения (после ввода данных) длится девять секунд. Вы можете сами опробовать этот код, нажав клавишу F5 и запустив пример.
Поэтому мне нужно пересмотреть последовательность выполнения и найти стадии, которые можно выполнять параллельно. Так как эти функции независимы, их можно выполнять параллельно, используя функцию async:
int main(int argc, char *argv[])
{
getUserData();
future<int> f1 = async(calculateB), f2 = async(calculateC);
c = (calculateA() + f1.get()) * f2.get();
showResult();
}
Я ввел здесь две концепции: async и future — обе они определены в заголовке <future> и пространстве имен std. Первая принимает функцию, лямбду или функцию-объект (функтор) и возвращает future. Вы можете рассматривать концепцию future как шаблон подстановки конечного результата. Какого результата? Того, который возвращается функцией, вызванной асинхронно.
В какой-то момент мне понадобятся результаты этих выполняемых параллельно функций. Вызов метода get в каждом future блокирует выполнение до тех пор, пока не будет доступно значение.
Вы можете проверить и сравнить переработанный код с первоначальным последовательным, запустив проект AsyncTasks. В этой модификации задержка в худшем случае составляет примерно три секунды против девяти для последовательной версии.
Это облегченная модель программирования, которая освобождает разработчика от обязанности создавать потоки. Впрочем, вы можете определять политики создания потоков, но здесь я не буду говорить об этом.
Потоки
Модели асинхронных задач, представленной в предыдущем разделе, в каких-то случаях может оказаться достаточно, но если вам нужны более глубокая обработка и более тонкий контроль за выполнением потоков, то C++11 предлагает класс thread, объявленный в заголовке <thread> и расположенный в пространстве имен std.
Несмотря на более сложную модель программирования, потоки обеспечивают более эффективные способы синхронизации и координации, что позволяет одному потоку передавать выполнение другому и ждать определенное время или до того момента, пока другой поток не закончит свою работу.
В следующем примере (он содержится в проекте Threads в прилагаемых материалах) у меня имеется лямбда-функция, которая, получив целочисленный аргумент, выводит в консоль кратные ему значения, меньшие 100 000:
auto multiple_finder = [](int n) {
for (int i = 0; i < 100000; i++)
if (i%n==0)
cout << i << " is a multiple of " << n << endl;
};
int main(int argc, char *argv[])
{
thread th(multiple_finder, 23456);
multiple_finder(34567);
th.join();
}
Как вы увидите в последующих примерах, тот факт, что я передал лямбду потоку, не существенен; можно было бы обойтись функцией или функтором.
В функции main я запускаю эту функцию в двух потоках с разными параметрами. Взгляните на мои результаты (они могут варьироваться при разных прогонах из-за различий в синхронизации):
0 is a multiple of 23456
0 is a multiple of 34567
23456 is a multiple of 23456
34567 is a multiple of 34567
46912 is a multiple of 23456
69134 is a multiple of 34567
70368 is a multiple of 23456
93824 is a multiple of 23456
Я мог бы реализовать пример с асинхронными задачами из предыдущего раздела на основе потоков. В этом случае я должен ввести концепцию promise. Это своего рода приемник, через который пройдет результат, когда он будет доступен. Где после этого появится результат? Каждому promise сопоставляется future.
Код на рис. 2, доступный в проекте Promises, связывает три потока (вместо задач) с соответствующими promise и заставляет каждый поток вызывать функцию calculate. Сравните этот код с более простой версией AsyncTasks.
Рис. 2. Сопоставление future с promise
typedef int (*calculate)(void);
void func2promise(calculate f, promise<int> &p)
{
p.set_value(f());
}
int main(int argc, char *argv[])
{
getUserData();
promise<int> p1, p2;
future<int> f1 = p1.get_future(), f2 = p2.get_future();
thread t1(&func2promise, calculateB, std::ref(p1)),
t2(&func2promise, calculateC, std::ref(p2));
c = (calculateA() + f1.get()) * f2.get();
t1.join(); t2.join();
showResult();
}
Переменные и исключения уровня потоков
В C++ можно определять глобальные переменные, область видимости которых ограничена только уровнем приложения, включая потоки. Но теперь появился способ так определять эти глобальные переменные, чтобы в каждом потоке хранилась своя копия этих переменных. Данная концепция известна под названием «хранилище, локальное для потока (thread local storage, TLS), которое объявляется так:
thread_local int subtotal = 0;
Если это объявление размещается в области видимости какой-либо функции, то видимость этой переменной будет ограничена этой функцией, но каждый поток будет хранить собственную статическую копию этой переменной. То есть значения данной переменной в каждом потоке сохраняются между вызовами функции.
Хотя thread_local недоступно в Visual C++ 11, его можно имитировать с помощью нестандартного расширения от Microsoft:
#define thread_local __declspec(thread)
Что будет, если исключение генерируется внутри потока? В ряде случаев исключение может быть захвачено и обработано в стеке вызовов внутри потока. Но, если поток не обрабатывает данное исключение, вам нужен некий способ его транспортировки потоку-инициатору. Требуемые механизмы введены в C++11.
На рис. 3 (полный код доступен в проекте ThreadInternals) показана функция sum_until_element_with_threshold, которая обходит вектор до тех пор, пока не обнаруживает конкретный элемент, попутно суммируя все элементы, попадающиеся ей в процессе обхода. Если сумма превышает пороговую величину, генерируется исключение.
Рис. 3. TLS и исключения
thread_local unsigned sum_total = 0;
void sum_until_element_with_threshold(unsigned element,
unsigned threshold, exception_ptr& pExc)
{
try {
find_if_not(begin(v), end(v), [=](const unsigned i) -> bool
{
bool ret = (i!=element);
sum_total+= i;
if (sum_total>threshold)
throw runtime_error("Sum exceeded threshold.");
return ret;
});
cout << "(Thread #" << this_thread::get_id() << ") " <<
"Sum of elements until " << element << " is found: " <<
sum_total << endl;
} catch (...) {
pExc = current_exception();
}
}
При генерации исключения оно захватывается через current_exception в exception_ptr.
Функция main запускает поток для sum_until_element_with_threshold, параллельно вызывая ту же функцию с другим параметром. Когда оба вызова завершаются (в основном потоке и в потоке, запущенном из него), их exception_ptr анализируются:
const unsigned THRESHOLD = 100000;
vector<unsigned> v;
int main(int argc, char *argv[])
{
exception_ptr pExc1, pExc2;
scramble_vector(1000);
thread th(sum_until_element_with_threshold, 0, THRESHOLD,
ref(pExc1));
sum_until_element_with_threshold(100, THRESHOLD, ref(pExc2));
th.join();
dealWithExceptionIfAny(pExc1);
dealWithExceptionIfAny(pExc2);
}
Если любой из этих exception_ptr инициализируется — признак того, что произошло какое-то исключение, — их исключения инициируются заново с помощью rethrow_exception:
void dealWithExceptionIfAny(exception_ptr pExc)
{
try
{
if (!(pExc==exception_ptr()))
rethrow_exception(pExc);
} catch (const exception& exc) {
cout << "(Main thread) Exception received from thread: "
<< exc.what() << endl;
}
}
Вот результат нашего выполнения, когда сумма во втором потоке превысила свое пороговое значение:
(Thread #10164) Sum of elements until 0 is found: 94574
(Main thread) Exception received from thread:
Sum exceeded threshold.
Синхронизация параллельной обработки
В идеале было бы желательно, чтобы все приложения можно было разбивать на набор совершенно независимых асинхронных задач. На практике это почти никогда недостижимо, так как существуют, как минимум, зависимости от данных, параллельно обрабатываемых всеми участниками. В этом разделе вы ознакомитесь с новыми технологиями C++11, предотвращающими условия гонок (конкуренцию).
- Атомарные типы Аналогичны элементарным типам данных, но обеспечивают модификацию, безопасную в многопоточной среде.
- Мьютексы (mutexes) и блокировки Элементы, позволяющие определять критические области, безопасные в многопоточной среде.
- Условные переменные Способ замораживания потоков до тех пор, пока не будет удовлетворен некий критерий.
Атомарные типы
Заголовок <atomic> вводит ряд элементарных типов — atomic_char, atomic_int и др., — реализованных с применением блокирующих операций. Таким образом, эти типы эквивалентны своим омонимам без префикса atomic_, но с тем различием, что все их операции присваивания (==, ++, --, +=, *= и т. д.) защищены от условий гонок. Поэтому посреди присваивания чего-либо этим типам данных другой поток не сможет прервать эту операцию и изменить значения.
В следующем примере два параллельных потока (один из них является основным) ищут разные элементы внутри одного и того же вектора:
atomic_uint total_iterations;
vector<unsigned> v;
int main(int argc, char *argv[])
{
total_iterations = 0;
scramble_vector(1000);
thread th(find_element, 0);
find_element(100);
th.join();
cout << total_iterations << " total iterations." << endl;
}
При нахождении каждого элемента из потока выводится сообщение, уведомляющее о позиции в векторе (или об итерации), в которой был найден данный элемент:
void find_element(unsigned element)
{
unsigned iterations = 0;
find_if(begin(v), end(v),
[=, &iterations](const unsigned i) -> bool {
++iterations;
return (i==element);
});
total_iterations+= iterations;
cout << "Thread #" << this_thread::get_id() <<
": found after " << iterations << " iterations." << endl;
}
Здесь также присутствует общая переменная, total_iterations, которая обновляется комбинированным числом итераций, выполненных обоими потоками. Таким образом, total_iterations должна быть атомарной, что два потока не могли обновить ее одновременно. В предыдущем примере, даже если бы в find_element не требовалось выводить частичное число итераций, все равно нужно суммировать число итераций в локальной, а не в общей переменной total_iterations, чтобы избежать конкуренции за атомарную переменную.
Предыдущий пример вы найдете в проекте Atomics. Запустив его, я получил следующее:
Thread #8064: found after 168 iterations.
Thread #6948: found after 395 iterations.
563 total iterations.
Мьютекс и блокировки
В предыдущем разделе был показан конкретный случай взаимоисключения (mutual exclusion, mutex) для получения доступа к элементарным типам на запись. В заголовке <mutex> определен ряд блокируемых классов (lockable classes) для определения критических областей (critical regions). Благодаря этому вы можете определить мьютекс, чтобы установить критическую область вокруг набора функций или методов. Тогда обращаться к любому члену этого набора сможет только один поток единовременно, успешно захвативший свой мьютекс.
Поток, пытающийся захватить мьютекс, может либо оставаться блокированным, пока этот мьютекс не станет доступным, либо потерпеть неудачу в этих попытках. Между этими двумя крайностями предлагается альтернативный класс timed_mutex, ожидать захвата которого можно лишь в течение короткого интервала, после чего попытка считается неудачной. Это помогает избегать взаимоблокировок (deadlocks).
Захваченный (блокированный) мьютекс по окончании работы в критической области нужно явным образом освободить (разблокировать) для других потоков. Если вы этого не сделаете, поведение приложения может стать непредсказуемым по аналогии с тем, когда вы забываете освободить динамическую память. Однако в случае с мьютексом последствия будут куда хуже, так как приложение может оказаться вообще больше не способным функционировать должным образом. К счастью, в C++11 также предусмотрены блокирующие классы (locking classes). Блокировка действует на мьютекс, но ее деструктор гарантирует освобождение мьютекса, если он заблокирован.
В следующем коде (доступном в проекте Mutex) определяется критическая область вокруг мьютекса с именем mx:
mutex mx;
void funcA();
void funcB();
int main()
{
thread th(funcA)
funcB();
th.join();
}
Этот мьютекс гарантирует, что две функции — funcA и funcB — смогут работать параллельно, не пересекаясь в критической области.
Функция funcA при необходимости будет ждать входа в критическую область. Чтобы заставить ее делать это, достаточно простейшего блокирующего механизма lock_guard:
void funcA()
{
for (int i = 0; i<3; ++i)
{
this_thread::sleep_for(chrono::seconds(1));
cout << this_thread::get_id() <<
": locking with wait... " << endl;
lock_guard<mutex> lg(mx);
... // что-то делаем в критической области
cout << this_thread::get_id() << ": releasing lock."
<< endl;
}
}
Согласно определению, funcA должна обращаться к критической области три раза. Функция funcB будет пытаться захватить мьютекс и, если он уже блокирован к тому времени, просто подождет одну секунду до повторной попытки получить доступ к критической области. Механизм, который она использует, — unique_lock с политикой try_to_lock_t, как показано на рис. 4.
Рис. 4. Блокировка с ожиданием
void funcB()
{
int successful_attempts = 0;
for (int i = 0; i<5; ++i)
{
unique_lock<mutex> ul(mx, try_to_lock_t());
if (ul)
{
++successful_attempts;
cout << this_thread::get_id() <<
": lock attempt successful." << endl;
... // что-то делаем в критической области
cout << this_thread::get_id() << ": releasing lock."
<< endl;
} else {
cout << this_thread::get_id() <<
": lock attempt unsuccessful. Hibernating..." << endl;
this_thread::sleep_for(chrono::seconds(1));
}
}
cout << this_thread::get_id() << ": " << successful_attempts
<< " successful attempts." << endl;
}
Функция funcB в соответствии с ее определением будет пытаться войти в критическую область до пяти раз. Результат выполнения этих функций показан на рис. 5. Из пяти попыток funcB сумела войти в критическую область лишь дважды.
Рис. 5. Выполнение проекта-примера Mutex
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful. Hibernating ...
funcA: releasing lock.
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful. Hibernating ...
funcB: lock attempt unsuccessful. Hibernating ...
funcA: releasing lock.
funcB: 2 successful attempts.
funcA: locking with wait ...
funcA: lock secured ...
funcA: releasing lock.
Условные переменные
В заголовке <condition_variable> определен последний рассматриваемый в этой статье механизм, имеющий фундаментальное значение в тех случаях, когда координация между потоками увязана с событиями.
В следующем примере, доступном в проекте CondVar, функция producer ставит элементы в очередь:
mutex mq;
condition_variable cv;
queue<int> q;
void producer()
{
for (int i = 0;i<3;++i)
{
... // создаем элемент
cout << "Producer: element " << i << " queued." << endl;
mq.lock(); q.push(i); mq.unlock();
cv.notify_all();
}
}
Стандартная очередь не является безопасной в многопоточной среде, поэтому вы должны гарантировать, что в процессе постановки в очередь больше никто использовать эту очередь не будет (т. е. функция consumer не извлекает какой-либо элемент).
Функция consumer пытается извлечь элементы из очереди, когда они становятся доступными, или просто ждет какое-то время на условной переменной до повторения попытки; если две последовательные попытки оказываются неудачными, consumer завершается (рис. 6).
Рис. 6. Пробуждение потоков через условные переменные
void consumer()
{
unique_lock<mutex> l(m);
int failed_attempts = 0;
while (true)
{
mq.lock();
if (q.size())
{
int elem = q.front();
q.pop();
mq.unlock();
failed_attempts = 0;
cout << "Consumer: fetching " << elem <<
" from queue." << endl;
... // используем элемент
} else {
mq.unlock();
if (++failed_attempts>1)
{
cout << "Consumer: too many failed attempts ->
Exiting." << endl;
break;
} else {
cout << "Consumer: queue not ready -> going to sleep."
<< endl;
cv.wait_for(l, chrono::seconds(5));
}
}
}
}
Функция producer должна пробуждать функцию consumer через notify_all всякий раз, когда в очереди становится доступным новый элемент. Тем самым producer предотвращает засыпание consumer на весь интервал, если элементы готовы до истечения этого периода.
Результат моего прогона показан на рис. 7.
Рис. 7. Синхронизация с помощью условных переменных
Consumer: queue not ready -> going to sleep.
Producer: element 0 queued.
Consumer: fetching 0 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 1 queued.
Consumer: fetching 1 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 2 queued.
Producer: element 3 queued.
Consumer: fetching 2 from queue.
Producer: element 4 queued.
Consumer: fetching 3 from queue.
Consumer: fetching 4 from queue.
Consumer: queue not ready -> going to sleep.
Consumer: two consecutive failed attempts -> Exiting.
Целостное представление
Напомню, что в этой статье показана концептуальная панорама механизмов, введенных в C++11 для поддержки параллельного выполнения в эпоху многоядерных процессоров.
Асинхронные задачи позволяют распараллеливать выполнение с использованием облегченной модели программирования. Результат каждой задачи можно получить через сопоставленный future.
Потоки обеспечивают более тонкое управление, чем задачи, но влекут за собой более существенные издержки — особенно учитывая механизмы хранения раздельных копий статических переменных и транспорта исключений между потоками.
Так как параллельные потоки работают с общими данными, C++11 предлагает ресурсы, предотвращающие конкуренцию. Атомарные типы гарантируют модификацию данных только одним потоком единовременно.
Мьютексы помогают нам определять критические области в коде — регионы, в которых запрещен параллельный доступ потоков. Блокировки обертывают мьютексы, увязывая разблокировку последних с жизненным циклом первых.
Наконец, условные переменные обеспечивают более эффективную синхронизацию потоков, поскольку некоторые потоки могут ждать события, о которых их уведомляют другие потоки.
В этой статье не были рассмотрены все способы конфигурирования и использования каждого из этих механизмов, но теперь у вас должно сложиться целостное впечатление о них, и вы готовы самостоятельно продолжить более глубокое их изучение.
Исходный код можно скачать по ссылке code.msdn.microsoft.com/mag201203CPP.