Прошлое
Исторически сложилось так, что для создания «отзывчивых» клиентских приложений, масштабируемых серверов и для распараллеливания алгоритмов разработчики напрямую манипулировали потоками. Но это же вело к взаимоблокировкам (deadlocks), активным блокировкам (livelocks), очередям на блокировках (lock convoys), «топтанию потоков на месте» (two-step dances), конкуренции за блокировки (race conditions), превышению лимита (oversubscription) и уйме других нежелательных проблем в приложениях. С самого начала Microsoft .NET Framework предоставляла мириады низкоуровневых средств для создания параллельных приложений, в том числе целое пространство имен, специально выделенное для этой области: System.Threading. При наличии примерно 50 типов в этом пространстве имен в базовых сборках .NET Framework 3.5 (включая такие типы, как Thread, ThreadPool, Timer, Monitor, ManualResetEvent, ReaderWriterLock и Interlocked) никто не должен был бы винить .NET Framework в легковесном отношении к поддержке потоков. И тем не менее я обвиняю предыдущие версии .NET Framework в таком отношении к реальной поддержке разработчиков, которым нужно было создавать масштабируемые приложения с высокой степенью распараллеливания. С радостью констатирую, что эта проблема устранена в .NET Framework 4 и что в будущих версиях .NET Framework будет внесено много усовершенствований в этой области.
Некоторые могут усомниться в ценности богатой подсистемы в управляемом языке для написания параллельного кода. В конце концов, параллелизм и параллельная обработка сводятся к вопросам производительности, а разработчики, заинтересованные в максимальном быстродействии, должны искать его в неуправляемых языках, которые обеспечивают полный доступ к «железу» и контроль над каждым битом, позволяют манипулировать кеш-линиями и выполнять interlocked-операции…, правильно? Если бы дело и впрямь обстояло таким образом, я бы испугался за состояние нашей индустрии. Существуют управляемые языки вроде C#, Visual Basic и F#, которые предоставляют всем разработчикам — и простым смертным, и супергероям — безопасную, производительную среду для быстрого написания эффективного кода. Разработчикам даются тысячи и тысячи заранее скомпилированных библиотечных классов наряду с языками, напичканными всеми современными сервисами и позволяющими достигать впечатляющих показателей производительности. Все это я говорю, чтобы подчеркнуть, что управляемые языки и связанные с ними инфраструктуры имеют глубокую поддержку для создания высокопроизводительных параллельных приложений, благодаря которой и волки сыты, и овцы целы — даже на современном аппаратном обеспечении.
Я всегда считал, что шаблоны — хороший способ чему-то научиться, поэтому для данной тематики будет тем более правильно, если мы начнем наше исследование с рассмотрения какого-то шаблона. И для «сбивающего с толку», и для «восхитительно» параллельного шаблона одна из наиболее востребованных конструкций разветвления-соединения (fork-join) — параллельный цикл, который предназначен для обработки каждой независимой итерации в цикле параллельно. Очень поучительно посмотреть, как такая обработка могла бы выполняться с использованием ранее упомянутых низкоуровневых примитивов, и для этого мы подробно обсудим базовую реализацию наивного параллельного цикла, реализованного на C#. Возьмем типичный цикл for:
for (int i=0; i<N; i++) {
... // здесь обрабатывается i
}
Мы можем напрямую использовать потоки, чтобы добиться распараллеливания этого цикла, как показано на рис. 1.
Рис. 1. Распараллеливание цикла for
int lowerBound = 0, upperBound = N;
int numThreads = Environment.ProcessorCount;
int chunkSize = (upperBound - lowerBound) / numThreads;
var threads = new Thread[numThreads];
for (int t = 0; t < threads.Length; t++) {
int start = (chunkSize * t) + lowerBound;
int end = t < threads.Length - 1 ? start +
chunkSize : upperBound;
threads[t] = new Thread(delegate() {
for (int i = start; i < end; i++) {
... // здесь обрабатываем i
}
});
}
foreach (Thread t in threads) t.Start(); // fork
foreach (Thread t in threads) t.Join(); // join
Конечно, с этим подходом к распараллеливанию возникает множество проблем. Мы крутим на блокировках новые потоки, выделенные циклу, что не только добавляет издержки (особенно если тело цикла выполняет тривиальную работу), но и может значительно превысить лимиты в процессе, выполняющем параллельно другую работу. Мы используем статическое распределение работы между потоками, способное привести к значительному перекосу нагрузки, если она неравномерно распределена по итерациям (не говоря уж о том, что при количестве итераций, не кратном числу потоков последний из них перегружается). Однако хуже всего, что разработчик вынужден писать такой код. Каждый алгоритм, который мы пытаемся распараллелить, требует аналогичного кода — в лучшем случае весьма хрупкого.
Проблема, проиллюстрированная предыдущим кодом, усугубляется, когда мы осознаем, что параллельные циклы — лишь один из множества шаблонов, существующих в параллельных программах. Заставлять разработчиков выражать все такие параллельные шаблоны на этом низком уровне кодирования — плохая модель программирования, не сулящая успеха широким массам разработчиков, которым нужно использовать возможности аппаратного обеспечения с высокой степенью параллелизма.
Настоящее
Переходим к .NET Framework 4. Этот выпуск .NET Framework был дополнен множеством средств, которые значительно облегчают выражение параллелизма в приложениях и обеспечивают его эффективность. Эти средства выходят далеко за рамки параллельных циклов, но мы, тем не менее, начнем с них.
Пространство имен System.Threading в .NET Framework 4 было расширено новым подпространством имен System.Threading.Tasks. Оно включает новый тип — Parallel, который предоставляет уйму статических методов для реализации параллельных циклов и структурированных шаблонов fork-join. В качестве примера его применения рассмотрим предыдущий цикл for:
for (int i=0; i<N; i++) {
... // здесь обрабатываем i
}
С помощью класса Parallel этот цикл можно распараллелить так:
Parallel.For(0, N, i => {
... // здесь обрабатываем i
});
В этом примере разработчик по-прежнему отвечает за то, чтобы каждая итерация цикла была независимой, но в остальном конструкция Parallel.For берет на себя все аспекты распараллеливания этого цикла. Она обрабатывает динамическое распределение входного диапазона между всеми нижележащими потоками, участвующими в этом вычислении, в то же время сводя издержки разделения примерно к тому уровню, что и в реализациях со статическим распределением. Кроме того, данная конструкция динамически увеличивает и уменьшает количество потоков, чтобы подобрать оптимальное их число для текущей рабочей нагрузки (которое не всегда совпадает с количеством аппаратных потоков). Parallel.For предоставляет средства обработки исключений, отсутствующие в моей наивной реализации, показанной ранее, и т. д. Что важнее всего, Parallel.For избавляет разработчика от того, чтобы рассматривать параллелизм на низком уровне абстракции потоков в ОС, и от необходимости каждый раз писать код для весьма тонких решений, связанных с разделением рабочих нагрузок, их распределением по нескольким ядрам процессора и эффективным объединением результатов. Вместо этого разработчик может сосредоточиться на главном: прикладной логике, которая и дает ему заработок.
Parallel.For также предоставляет механизмы более тонкого управления работой цикла. С помощью параметра options, передаваемого в метод For, можно управлять планировщиком, определяющим работу цикла, задавать максимальную степень параллелизма и корректно завершать цикл при запросе отмены:
var options = new ParallelOptions {
MaxDegreeOfParallelism = 4 };
Parallel.For(0, N, options, i=> {
... // здесь обрабатываем i
});
Такие возможности настройки подчеркивают одну из целей предпринимаемых усилий в расширении поддержки распараллеливания в .NET Framework: существенно облегчить использование параллелизма без усложнения программирования и в то же время предоставить более квалифицированным разработчикам средства, необходимые им для тонкой настройки обработки и выполнения. В этом отношении, кстати, поддерживаются дополнительные средства. Другие перегруженные версии Parallel.For позволяют досрочно выходить из цикла:
Parallel.For(0, N, (i,loop) => {
... // здесь обрабатываем i
if (SomeCondition()) loop.Break();
});
Есть также версии, дающие возможность передавать состояние между итерациями, которые выполняются в одном и том же потоке, что обеспечивает гораздо более эффективные реализации таких алгоритмов, как, например, упрощений (reductions):
static int SumComputations(int [] inputs,
Func<int,int> computeFunc) {
int total = 0;
Parallel.For(0, inputs.Length, () => 0, (i,loop,partial)=> {
return partial + computeFunc(inputs[i]);
},
partial => Interlocked.Add(ref total, partial));
}
Класс Parallel поддерживает не только целые диапазоны, но и произвольные источники IEnumerable<T> представление в .NET Framework перечислимой последовательности: код может регулярно вызывать MoveNext в перечислителе, чтобы получать следующее значение Current. Эта возможность использовать произвольные перечислимые последовательности позволяет параллельно обрабатывать произвольные наборы данных независимо от их представления в памяти; более того, источники данных можно материализовать по требованию и загружать, когда вызовы MoveNext достигают еще не материализованных разделов исходных данных:
IEnumerable<string> lines = File.ReadLines("data.txt");
Parallel.ForEach(lines, line => {
... // здесь обрабатываем строку
});
Parallel.ForEach обеспечивает еще более широкие возможности настройки и контроля, чем Parallel.For. Например, ForEach позволяет определять, как именно будет разделяться входной набор данных. Это делается с помощью набора специальных абстрактных классов, которые позволяют конструкциям распараллеливания запрашивать фиксированное или переменное количество разделов (partitions) и дают им возможность предоставлять абстракции этих разделов над входным набором данных, а также закреплять данные за разделами — статически или динамически:
Graph<T> graph = ...;
Partitioner<T> data = new GraphPartitioner<T>(graph);
Parallel.ForEach(data, vertex => {
... // здесь обрабатываем вертекс
});
Parallel.For и Parallel.ForEach дополняются в классе Parallel методом Invoke, принимающим произвольное количество операций, которые нужно запускать с такой степень параллелизма, с какой справится нижележащая система. Эта классическая конструкция fork-join облегчает распараллеливание рекурсивных алгоритмов декомпозиции (divide-and-conquer algorithms), как в часто используемом примере QuickSort:
static void QuickSort<T>(T [] data, int lower, int upper) {
if (upper – lower < THRESHOLD) {
Array.Sort(data, index:lower, length:upper-lower);
}
else {
int pivotPos = Partition(data, lower, upper);
Parallel.Invoke(
() => QuickSort(data, lower, pivotPos),
() => QuickSort(data, pivotPos, upper));
}
}
Несмотря на большой шаг вперед, класс Parallel лишь поверхностно использует доступную функциональность. Одной из более фундаментальных мер, предпринятых в .NET Framework 4 в области распараллеливания, было введение Parallel LINQ, или сокращенно PLINQ (произносится как «Pee-link»). LINQ (Language Integrated Query) появился в .NET Framework версии 3.5. LINQ — это на самом деле две сущности: описание набора операторов, предоставляемых как методы для манипуляций над наборами данных, и совокупность ключевых слов в C# и Visual Basic для выражения запросов непосредственно в языке. Многие из операторов, включенных в LINQ, базируются на эквивалентных давно известных операциях, в том числе Select, SelectMany, Where, Join, GroupBy и еще около 50 других. В .NET Framework Standard Query Operators API определен шаблон для этих методов, но он не описывает, для каких наборов данных предназначены эти операции и как именно следует реализовать эти операции. Далее этот шаблон реализуется различными «провайдерами LINQ» для множества различных источников данных и целевых сред (наборов в памяти, баз данных SQL, объектно-реляционных систем (ORM), вычислительных кластеров HPC Server, временных и потоковых источников данных и т. д.). Один из наиболее часто применяемых провайдеров — LINQ to Objects, и он предоставляет полный набор LINQ-операторов, реализованных поверх IEnumerable<T>. Это обеспечивает реализацию запросов в C# и Visual Basic. Например, следующий фрагмент кода считывает все данные из файла строка за строкой, отфильтровывая строки, содержащие слово «secret» и зашифровывая их; в конечном счете вы получаете перечислимые байтовые массивы:
IEnumerable<byte[]> encryptedLines =
from line in File.ReadLines("data.txt")
where line.Contains("secret")
select DataEncryptor.Encrypt(line);
Для запросов, требующих интенсивных вычислений, или даже запросов, которые вызывают много операций ввода-вывода с длительными задержками, PLINQ предоставляет средства автоматического распараллеливания, реализуя полный набор LINQ-операторов, использующих параллельные алгоритмы. Таким образом, предыдущий запрос можно распараллелить простым добавлением к источнику данных «.AsParallel()»:
IEnumerable<byte[]> encryptedLines =
from line in File.ReadLines("data.txt").AsParallel()
where line.Contains("secret")
select DataEncryptor.Encrypt(line);
Как и в случае класса Parallel, эта модель тоже заставляет разработчика оценивать последствия параллельного выполнения определенных вычислений. Но, как только выбор сделан, система берет на себя обработку низкоуровневых деталей реального распараллеливания, разбиения на разделы, регулирование потоков и прочее. Кроме того, как и Parallel, эти PLINQ-запросы можно настраивать самыми разнообразными способами. Разработчик может управлять тем, как осуществляется секционирование, насколько высокая степень параллелизма применяется на самом деле, контролировать баланс между синхронизацией и задержками и другие вещи:
IEnumerable<byte[]> encryptedLines =
from line in new OneAtATimePartitioner<string>(
File.ReadLines("data.txt"))
.AsParallel()
.AsOrdered()
.WithCancellation(someExternalToken)
.WithDegreeOfParallelism(4)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
where line.Contains("secret")
select DataEncryptor.Encrypt(line);
Эти мощные и высокоуровневые модели программирования для циклов и запросов построены на основе не менее мощного, но низкоуровневого набора API задач, который главным образом опирается на типы Task и Task<TResult> в пространстве имен System.Threading.Tasks. По сути, механизмы параллельных циклов и запросов являются генераторами задач (task generators), которые опираются на инфраструктуру задач и сопоставляют параллелизм с ресурсами, доступными в нижележащей операционной системе. Task — это фактически представление единицы работы или более обобщенно единицы асинхронности, рабочего элемента, который можно порождать и впоследствии соединять с другими такими элементами различными средствами. Task предоставляет методы Wait, WaitAll и WaitAny, которые обеспечивают синхронное блокирование дальнейшего продвижения до тех пор, пока не будет завершена целевая задача (или задачи) и пока не будут выполнены дополнительные ограничения, переданные перегруженным версиям этих методов (например, истечет время ожидания или появится маркер отмены). Task поддерживает опрос на предмет завершения через свойство IsCompleted и — более обобщенно — опрос на наличие изменений в своем жизненном цикле, что обрабатывается через свойство Status. И, вероятно, самое главное — он предоставляет методы ContinueWith, ContinueWhenAll и ContinueWhenAny, которые позволяют создавать задачи, планируемые, только когда завершен конкретный набор предыдущих задач. Это открывает возможность легко реализовать самые разнообразные сценарии, в том числе выражать зависимости между вычислениями так, чтобы система могла планировать работу на основе выполнения условий этих зависимостей:
Task t1 = Task.Factory.StartNew(() => BuildProject(1));
Task t2 = Task.Factory.StartNew(() => BuildProject(2));
Task t3 = Task.Factory.StartNew(() => BuildProject(3));
Task t4 = Task.Factory.ContinueWhenAll(
new [] { t1, t2 }, _ => BuildProject(4));
Task t5 = Task.Factory.ContinueWhenAll(
new [] { t2, t3 }, _ => BuildProject(5));
Task t6 = Task.Factory.ContinueWhenAll(
new [] { t4, t5 }, _ => BuildProject(6));
t6.ContinueWith(_ => Console.WriteLine(
"Solution build completed."));
Класс Task<TResult>, производный от Task, позволяет передавать результаты от завершенной операции:
int SumTree<T>(Node<T> root, Func<T,int> computeFunc) {
if (root == null) return 0;
Task<int> left =
Task.Factory.StartNew(() => SumTree(root.Left));
Task<int> right =
Task.Factory.StartNew(() => SumTree(root.Right));
return computeFunc(root.Data) + left.Result + right.Result;
}
При всех этих моделях (циклах, запросах и задачах) в .NET Framework применяются методики перехвата работы (work-stealing techniques), обеспечивающие более эффективную обработку специализированных рабочих нагрузок; кроме того, по умолчанию используется эвристическая логика поиска экстремума (hill-climbing heuristics) для варьирования количества участвующих потоков в течение времени, чтобы найти оптимальный уровень обработки. Эвристическая логика также встроена в части этих компонентов для автоматического переключения на последовательную обработку, если система считает, что любая попытка распараллеливания приведет к более длительной обработке, чем последовательная; однако эту логику, как и многое другое, что уже обсуждалось, тоже можно изменять.
Task<TResult> представляет не только операции, связанные с вычислениями. Его можно использовать и для представления произвольных асинхронные операций. Рассмотрим класс System.IO.Stream из .NET Framework, в котором содержится метод Read для извлечения данных из их потока (stream):
NetworkStream source = ...;
byte [] buffer = new byte[0x1000];
int numBytesRead = source.Read(buffer, 0, buffer.Length);
Эта операция Read является синхронной и блокирующей, из-за чего поток, вызывающий Read, нельзя использовать для другой работы до завершения этой операции, завязанной на ввод-вывод. Чтобы более высокую масштабируемость, класс Stream предоставляет асинхронный эквивалент метода Read в виде двух методов: BeginRead и EndRead. Эти методы следуют шаблону, существующему в .NET Framework с момента ее появления, а именно шаблону APM (Asynchronous Programming Model). Ниже показана асинхронная версия предыдущего кода:
NetworkStream source = …;
byte [] buffer = new byte[0x1000];
source.BeginRead(buffer, 0, buffer.Length,
delegate(IAsyncResult iar) {
int numBytesRead = source.EndRead(iar);
}, null);
Однако этот подход ухудшает возможности композиции (composability). Тип TaskCompletionSource<TResult> устраняет эту проблему, позволяя предоставлять такую асинхронную операцию чтения как задачу:
public static Task<int> ReadAsync(
this Stream source, byte [] buffer, int offset, int count)
{
var tcs = new TaskCompletionSource<int>();
source.BeginRead(buffer, 0, buffer.Length, iar => {
try { tcs.SetResult(source.EndRead(iar)); }
catch(Exception exc) { tcs.SetException(exc); }
}, null);
return tcs.Task;
}
Благодаря этому становится возможной композиция множества асинхронных операций — так же, как в примерах с чисто вычислительными операциями. В следующем примере осуществляется параллельное чтение изо всех источников — потоков данных и вывод в консоль выполняется, только когда завершены все операции:
NetworkStream [] sources = ...;
byte [] buffers = ...;
Task.Factory.ContinueWhenAll(
(from i in Enumerable.Range(0, sources.Length)
select sources[i].ReadAsync(buffers[i], 0,
buffers[i].Length)).ToArray(),
_ => Console.WriteLine("All reads completed"));
Помимо механизмов для распараллеливания и запуска параллельной обработки, .NET Framework 4 также предоставляет примитивы для более глубокой координации работы между задачами и потоками. К ним относится набор масштабируемых и безопасных в условиях многопоточности (thread-safe) типов-наборов, которые в основном исключают необходимость для разработчиков вручную синхронизировать доступ к общим наборам. ConcurrentQueue<T> безопасный в условиях многопоточности, свободный от блокировок FIFO-набор (first-in-first-out), который может одновременно использоваться любым количеством пишущих и читающих потоков. Кроме того, он поддерживает семантику статического множества (snapshot semantics) для параллельных перечислителей, благодаря чему код может анализировать состояние очереди в момент, когда ее изменяют другие потоки. ConcurrentStack<T> аналогичен ConcurrentQueue, но вместо FIFO предоставляет семантику LIFO (last-in-first-out). ConcurrentDictionary<T> словарь, который поддерживает любое количество одновременно обращающихся «читателей», «писателей» и перечислителей. Он также содержит несколько атомарных реализаций многоступенчатых операций, таких как GetOrAdd и AddOrUpdate. Другой тип, ConcurrentBag<T>, предоставляет неупорядоченный набор, использующий очереди с перехватом работы (work-stealing queues).
.NET Framework не останавливается на типах-наборах. Lazy<T> > обеспечивает отложенную инициализацию переменной, используя настраиваемые подходы для достижения безопасности в условиях многопоточности. ThreadLocal<T> предоставляет данные, индивидуальные для потока и экземпляра, инициализацию которых можно откладывать до первого обращения. Тип Barrier обеспечивает поэтапное выполнение, чтобы несколько задач или потоков могли упорядоченно выполнять алгоритм. Список можно продолжить, но все его составляющие исходят из одного главенствующего принципа: разработчики не должны чрезмерно отвлекаться на низкоуровневые и рудиментарные аспекты распараллеливания своих алгоритмов — вместо этого они должны позволить .NET Framework обрабатывать всю механику и детали за них.
Будущее
В будущих версиях .NET Framework поддержка параллелизма и параллельной обработки, заложенная в .NET Framework 4, будет расширена; этого с нетерпением ждут многие разработчики. В фокусе следующих версий .NET Framework будет не только повышение производительности существующих моделей программирования, но и увеличение набора высокоуровневых моделей для охвата большего количества шаблонов параллельных рабочих нагрузок. Одно из таких усовершенствований — новая библиотека для реализации параллельных систем, основанных на потоках данных, и для проектирования приложений с моделями на основе агентов. Новая библиотека System.Threading.Tasks.Dataflow предоставляет множество «блоков потоков данных» («dataflow blocks»), действующих как буферы, процессоры и распространители (propagators) данных. Данные можно асинхронно отправлять в эти блоки, и данные будут обрабатываться и автоматически пересылаться любым связанным приемникам с учетом семантики блока-источника (source block). Библиотека потоков данных также построена поверх задач, причем «за кулисами» эти блоки используют задачи для обработки и распространения данных.
С точки зрения шаблонов, эта библиотека особенна полезна при обработке сетей потоков данных (dataflow networks), образующих цепочки создателей и потребителей данных. Рассмотрим случай, когда данные нужно сжимать, шифровать и записывать в файл, при этом данные поступают из потока (stream) в приложение и проходят через него. Для этого можно было бы сконфигурировать небольшую сеть блоков потока данных:
static byte [] Compress(byte [] data) { ... }
static byte [] Encrypt(byte [] data) { ... }
...
var compressor = new TransformBlock<byte[],byte[]>(Compress);
var encryptor = new TransformBlock<byte[],byte[]>(Encrypt);
var saver = new ActionBlock<byte[]>(AppendToFile);
compressor.LinkTo(encryptor);
encryptor.LinkTo(saver);
...
// По мере поступления данных
compressor.Post(byteArray);
Помимо библиотеки потоков данных, одним из самых важных новшеств в области параллельного выполнения и обработки в .NET Framework будет первоклассная языковая поддержка в C# и Visual Basic для создания и асинхронного ожидания завершения задач. Эти языки сейчас дополняются средствами перезаписи на основе конечного автомата (state-machine-based rewrite capabilities), которые позволят использовать все последовательные конструкции потоков управления одновременно с асинхронным ожиданием завершения задач. [F# в Visual Studio 2010 поддерживает одну из форм асинхронности как часть своего механизма асинхронных рабочих процессов (asynchronous workflows feature), которая также интегрируется с задачами.] Взгляните на следующий метод, который синхронно копирует данные из одного Stream в другой, возвращая число скопированных байтов:
static long CopyStreamToStream(Stream src, Stream dst) {
long numCopied = 0;
byte [] buffer = new byte[0x1000];
int numRead;
while((numRead = src.Read(buffer,0,buffer.Length)) > 0) {
dst.Write(buffer, 0, numRead);
numCopied += numRead;
}
return numCopied;
}
Реализация этой функции, в том числе ее условий и циклов, с помощью поддержки, например, ранее показанных методов BeginRead/EndRead в Stream, приведет к кошмару из обратных вызовов и логики, которая будет подвержена ошибкам и которую будет очень трудно отлаживать. Вместо этого рассмотрим подход с применением метода ReadAsync, который возвращает Task<int>, и соответствующего метода WriteAsync, возвращающего Task. Используя новую функциональность C#, можно переписать предыдущий метод так:
static async Task<long> CopyStreamToStreamAsync(
Stream src, Stream dst) {
long numCopied = 0;
byte [] buffer = new byte[0x1000];
int numRead;
while((numRead =
await src.ReadAsync(buffer,0,buffer.Length)) > 0) {
await dst.WriteAsync(buffer, 0, numRead);
numCopied += numRead;
}
return numCopied;
}
Обратите внимание на несколько минимальных изменений, потребовавшихся для преобразования синхронного метода в асинхронный. Теперь функция аннотирована как async, чтобы уведомить компилятор о том, что он должен выполнить перезапись этой функции. Благодаря этому всякий раз, когда в Task или Task<TResult>, запрашивается await-операция, оставшаяся для выполнения часть функции фактически подключается к этой задаче как продолжение (continuation): пока задача не завершится, этот вызов метода не будет занимать какой-либо поток.
Вызов метода Read преобразован в вызов ReadAsync, чтобы ключевое слово await можно было использовать для обозначения точки возврата, где остаток функции должен быть преобразован в продолжение; то же самое относится к WriteAsync. Когда этот асинхронный метод завершается, возвращаемое значение типа long будет приведено к Task<long>,которое было возвращено изначально вызвавшему метод CopyStreamToStreamAsync, с использованием механизма, подобного уже показанному в случае TaskCompletionSource<TResult>. Теперь я могу использовать возвращаемое значение от CopyStreamToStreamAsync так же, как и любой Task, ожидать на нем, подключать к нему продолжение и т. д. Благодаря функциональности вроде ContinueWhenAll и WaitAll я могу инициировать множество асинхронных операций и впоследствии соединять их, чтобы добиться более высоких уровней параллельного выполнения и увеличить пропускную способность моего приложения в целом.
Эта языковая поддержка асинхронности делает более эффективными не только операции, связанные с вводом-выводом, но и операции с интенсивным использованием процессора, и, в частности, дает возможность создавать «отзывчивые» клиентские приложения (где нет привязки к UI-потоку и программа не перестает отвечать при любой нагрузке) и при этом по-прежнему использовать преимущества высокой степени распараллеливания. Переход из UI-потока в другой, выполнение любой обработки и возврат результатов в UI-поток для обновления UI-элементов и взаимодействия с пользователем давно уже стали головной болью разработчиков. Языковая поддержка асинхронности взаимодействует с ключевыми компонентами .NET Framework и по умолчанию автоматически возвращает операции в их исходный контекст по завершении await-операции (например, если await-операция инициирована из UI-потока, подключенное продолжение возобновит выполнение в этом потоке). То есть появляется возможность запуска задачи, выполняющей работу, которая требует интенсивного использования вычислительных ресурсов, как фоновой, а разработчик может просто дожидаться получения результатов и сохранять их в UI-элементах, например, так:
async void button1_Click(object sender, EventArgs e) {
string filePath = txtFilePath.Text;
txtOutput.Text = await Task.Factory.StartNew(() => {
return ProcessFile(filePath);
});
}
Эта фоновая задача может сама порождать множество задач для распараллеливания фоновых вычислений, например с использованием PLINQ-запроса:
async void button1_Click(object sender, EventArgs e) {
string filePath = txtFilePath.Text;
txtOutput.Text = await Task.Factory.StartNew(() => {
return File.ReadLines(filePath).AsParallel()
.SelectMany(line => ParseWords(line))
.Distinct()
.Count()
.ToString();
});
}
Языковую поддержку можно также использовать в сочетании с библиотекой потоков данных, что облегчает естественное выражение сценариев с асинхронными создателями и потребителями данных. Рассмотрим реализацию набора регулируемых создателей, каждый из которых генерирует какие-то данные, отправляемые ряду потребителей. В синхронной реализации можно было бы использовать тип вроде BlockingCollection<T> (see рис. 2), введенный в .NET Framework 4.
Рис. 2. Применение BlockingCollection
static BlockingCollection<Datum> s_data =
new BlockingCollection<Datum>(boundedCapacity:100);
...
static void Producer() {
for(int i=0; i<N; i++) {
Datum d = GenerateData();
s_data.Add(d);
}
s_data.CompleteAdding();
}
static void Consumer() {
foreach(Datum d in s_data.GetConsumingEnumerable()) {
Process(d);
}
}
...
var workers = new Task[3];
workers[0] = Task.Factory.StartNew(Producer);
workers[1] = Task.Factory.StartNew(Consumer);
workers[2] = Task.Factory.StartNew(Consumer);
Task.WaitAll(workers);
Это отличный шаблон — до тех пор, пока в приложении допустимо блокировать потоки как со стороны создателей (генераторов данных), так и их потребителей. Если этот вариант неприемлем, вы можете написать асинхронный эквивалент, используя другой из блоков потоков данных, BufferBlock<T>, и возможность асинхронной передачи данных из блока и их приема, как показано на рис. 3.
Рис. 3. Применение BufferBlock
static BufferBlock<Datum> s_data = new BufferBlock<Datum>(
new DataflowBlockOptions { BoundedCapacity=100 });
...
static async Task ProducerAsync() {
for(int i=0; i<N; i++) {
Datum d = GenerateData();
await s_data.SendAsync(d);
}
s_data.Complete();
}
static async Task ConsumerAsync() {
Datum d;
while(await s_data.OutputAvailableAsync()) {
while(s_data.TryReceive(out d)) {
Process(d);
}
}
}
...
var workers = new Task[3];
workers[0] = ProducerAsync();
workers[1] = ConsumerAsync();
workers[2] = ConsumerAsync();
await Task.WhenAll(workers);
Здесь методы SendAsync и OutputAvailableAsync возвращают задачи, позволяя компилятору подключать продолжения и обеспечивая асинхронное выполнение всего процесса.
Заключение
PПараллельное программирование давно является уделом разработчиков-экспертов с уникальной квалификацией в искусстве масштабирования кода для использования нескольких ядер процессора. Такими экспертами становятся в результате долгого обучения и накопления многолетнего опыта. Они ценятся очень высоко — их мало. В нашем прекрасном новом мире, где повсеместно распространены многоядерные процессоры, ситуация, когда с задачами распараллеливания могут справляться только эксперты, больше недопустима. Независимо от того, предназначено приложение/компонент для широкой общественности, внутреннего использования или применения в качестве промежуточного инструмента, параллелизм теперь является таким фактором, который нужно как минимум учитывать почти каждому разработчику, и концепцией, которая должна стать доступной широким массам разработчиков, использующим управляемые языки, — пусть даже через компоненты, инкапсулирующие средства распараллеливания. Модели параллельного программирования вроде тех, которые предоставляются в .NET Framework 4 и которые появятся в следующих версиях .NET Framework, сделают реальностью это светлое будущее.