Task Parallel Library (TPL), введенная в Microsoft .NET Framework 4, позволяет создавать решения, использующие мощь параллельной обработки в компьютерах с многоядерными процессорами. Однако во многих сценариях возможность вертикального масштабирования (за счет добавления большего количества ядер) ограничена рядом факторов, в том числе стоимостью и лимитами хостинга. В таких случаях, если требуется масштабируемость, желательно распределить обработку по массиву серверов; один из примеров — хостинг в облаке. В данной статье я опишу ключевые аспекты (включая реализацию) концептуального решения для достижения этой цели с применением новых средств .NET Framework 4.5.
Базовые предположения
Описываемый подход требует применения нескольких технологий:
- Task Parallel Library (TPL);
- Windows Communication Foundation (WCF);
- Managed Extensibility Framework (MEF).
Заметьте, что я буду рассматривать их только в контексте проблемы, которую я пытаюсь решить. Я исхожу из того, что вы хорошо понимаете эти технологии.
Клиент удаленной задачи, координатор задач и узлы выполнения задач
Клиент удаленной задачи (remote task client) — это клиентский уровень, скрывающий сложности, которые вытекают из семантики использования распределенной среды. Этот клиент напрямую взаимодействует с координатором задач (task coordinator), который затем становится точкой входа в нижележащую инфраструктуру. На высоком уровне координатор задач:
- является единственной точкой связи с клиентами;
- предоставляет необходимые сервисы для запроса выполнения задач на масштабируемой платформе, а также для отмены конкретной задачи;
- обеспечивает регулирование (throttling) и постановку запросов на выполнение задач в очередь, поддерживая тем самым нормальную работу среды.
Узлы выполнения задач (task execution nodes) — это хосты процессов, в которых выполняются задачи. Именно в этих узлах находятся настоящие реализации задач, которые будут выполняться TPL.
Ниже перечислены основные аспекты этих логических уровней:
- клиент удаленной задачи запрашивает выполнение одной или более задач;
- координатор задач передает запрос узлам выполнения задач;
- узлы выполнения задач обрабатывают задачи и обновляют состояние каждого запроса в координаторе задач;
- координатор задач обновляет клиент результатами выполнения каждого запроса;
- узлы выполнения задач размещаются за компонентом балансировки нагрузки, поэтому при необходимости могут быть добавлены дополнительные узлы, что обеспечивает возможность горизонтального масштабирования.
Эти логические уровни и поток информации между ними показаны на рис. 1.
Рис. 1. Горизонтальное масштабирование задач
Remote Task Client | Клиент удаленной задачи |
Task Coordinator | Координатор задач |
Load Balancer | Компонент балансировки нагрузки |
Task Execution Node O | Узел выполнения задач 0 |
Task Execution Node N | Узел выполнения задач N |
Callback | Обратный вызов |
Заметьте, что узлы выполнения задач обновляют координатор задач, который в свою очередь обновляет клиент удаленной задачи. Я опишу реализацию, основанную на двухстороннем взаимодействии между клиентом и координатором задач и между координатором и узлами выполнения задач. В терминах WCF это подразумевает использование дуплексного канала, позволяющего узлам выполнения задач инициировать обратные вызовы координатора задач, а координатору делать то же самое для обновления клиента. Я продемонстрирую применение WebSockets для реализации этого подхода с двухсторонним взаимодействием. Транспорт WebSockets реализован в .NET Framework 4.5 как новая привязка и доступен для Windows 8. Более подробную информацию об этой привязке см. по ссылке bit.ly/SOLNiU.
Клиент и координатор задач
Теперь, когда вы знаете о трех основных логических уровнях (клиенте удаленной задачи, координаторе задач и узлах выполнения задач), приступим к обсуждению реализации клиента удаленной задачи. Обратите внимание: когда я буду в дальнейшем употреблять термин "клиент" в этой статье, я буду подразумевать клиент удаленной задачи.
Как я уже упоминал, ценное преимущество клиента — скрытие сложностей нижележащих компонентов. Один из способов достижения этой цели — предоставление API, который создает иллюзию локального выполнения задач, несмотря на тот факт, что они могут выполняться где-то в другом месте. Код на рис. 2 показывает открытые методы класса RemoteTaskClient.
Рис. 2. Открытые методы класса RemoteTaskClient
public class RemoteTaskClient<TResult> : IDisposable
{
public void AddRequest(string typeName, string[] parameters,
CancellationToken tk)
{...}
public void AddRequest(string typeName, string[] parameters)
{...}
public Task<TResult>[] SubmitRequests()
{...}
public RemoteTaskClient(string taskCoodinatorEndpointAddress)
{...}
public void Dispose()
{...}
}
Вы можете использовать метод AddRequest, чтобы добавлять запросы на удаленное выполнение. Для каждого запроса нужно указывать typeName (тип истинной реализации, содержащей делегат, который инфраструктура будет удаленно выполнять как TPL-задачу) и связанные параметры. После этого вы можете передавать запросы через метод SubmitRequest. Результат передачи запросов — массив TPL-задач, по одной на каждый запрос. Этот подход позволяет управлять получаемыми TPL-задачами так, будто они локальные. Например, можно передать различные запросы и ждать их завершения так:
using (var c = new RemoteTaskClient<int>("..."))
{
c.AddRequest("...", null);
c.AddRequest("...", null);
var ts = c.SubmitRequests();
Task.WaitAll(ts);
foreach (var t in ts)
Console.WriteLine(t.Result);
}
Прежде чем углубляться в детали реализации RemoteTaskClient, давайте рассмотрим операции сервиса и контракты данных, предоставляемые координатором задач. Сделав это до изучения реализации RemoteTaskClient, вы сможете лучше понять реализацию, так как она опирается на эти сервисы.
Код на рис. 3 показывает операции сервиса, предоставляемые координатором клиенту. Через операцию SubmitRequest клиент получает возможность запрашивать выполнение одной или более TPL-задач. Клиент может также запрашивать отмену конкретной TPL-задачи, которая еще не завершена, через операцию CancelTask. Заметьте, что операция UpdateStatus является обратным вызовом. Именно через клиентскую реализацию этого контракта обратного вызова координатор задач будет обновлять состояние клиента.
Рис. 3. Операции сервиса
[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
public interface ITaskCoordinator
{
[OperationContract(IsOneWay = true)]
void SubmitRequest(List<STask> stask);
[OperationContract]
bool CancelTask(string Id);
}
public interface ITaskUpdateCallback
{
[OperationContract (IsOneWay = true)]
void UpdateStatus(string id, STaskStatus status, string result);
}
Посмотрим на контракт данных, представляющий запрос на выполнение задачи. Это сущность данных, которую клиент отправляет координатору задач, а тот в свою очередь передает ее узлам выполнения задач, где и происходит реальное выполнение. Класс STask, показанный на рис. 4, моделирует запрос на выполнение задачи. Используя свойства STaskTypeName и STaskParameters, клиент может указать тип задачи, подлежащей выполнению, и определить релевантные параметры. Координатор задач будет использовать свойство Id как уникальный идентификатор, с помощью которого логические уровни смогут соотносить запрос с реальной TPL-задачей, выполняемой в системе.
Рис. 4. Класс STask
[DataContract]
public class STask
{
[DataMember]
public string Id
{ get; set; }
[DataMember]
public string STaskTypeName
{ get; set; }
[DataMember]
public string[] STaskParameters
{ get; set; }
}
Теперь вернемся к RemoteTaskClient и обсудим, как я планирую соотносить локальную TPL-задачу с результатом выполнения в узлах. В TPL есть удобный класс, TaskCompletionSource<TResult>, и его можно использовать для создания TPL-задачи и управления ее жизненным циклом. Этот механизм позволяет мне сигнализировать о том, когда задача завершается, отменяется или заканчивается неудачей. Затруднение, возникающее здесь, заключается в том, что каждый запрос, отправляемый в узел выполнения задачи (через координатор) должен быть соотнесен с экземпляром TaskCompletionSource. С этой целью я реализовал класс ClientRequestInfo (рис. 5).
Рис. 5. Класс ClientRequestInfo
internal class ClientRequestInfo<TResult>
{
internal STask TaskExecutionRequest
{ get; set; }
internal TaskCompletionSource<TResult> CompletionSource
{ get; set; }
internal ClientRequestInfo(string typeName, string[] args)
{
TaskExecutionRequest = new STask()
{Id = Guid.NewGuid().ToString(), STaskTypeName =typeName,
STaskParameters = args };
CompletionSource = new TaskCompletionSource<TResult>();
}
}
На рис. 6 приведена реализация конструктора этого класса.
Рис. 6. Конструктор ClientRequestInfo
ITaskCoordinator _client;
ConcurrentDictionary<string, ClientRequestInfo<TResult>>
_requests = new ConcurrentDictionary<string,
ClientRequestInfo<TResult>>();
public RemoteTaskClient(string taskCoordinatorEndpointAddress)
{
var factory = new DuplexChannelFactory<ITaskCoordinator>
(new InstanceContext(new CallbackHandler<TResult>( _requests)),
new NetHttpBinding(),
new EndpointAddress(taskCoordinatorEndpointAddress));
_client = factory.CreateChannel();
((IClientChannel)_client).Open();
}
Обратите внимание на то, что я открываю дуплексный канал с координатором задач и создаю экземпляр обратного вызова типа CallbackHandler. CallbackHandler принимает в качестве параметра _requests, содержащий экземпляры ClientRequestInfo. Логическое обоснование этому состоит в том, что словарь _requests содержит все активные экземпляры клиентских запросов (и экземпляры TaskCompletionSource, сопоставленные с ними), а CallbackHandler будет обрабатывать обновления, поступающие от координатора задач. Поскольку словарь _requests будет обновляться несколькими запросами сервиса, мне нужно гарантировать безопасность в многопоточной среде, и поэтому нужно создавать этот словарь как экземпляр ConcurrentDictionary.
Реализация класса CallbackHandler представлена на рис. 7.
Рис. 7. Класс CallbackHandler
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
ConcurrentDictionary<string,
ClientRequestInfo<TResult>> _requests;
public void UpdateStatus
(string id, STaskStatus status, Object result)
{
ClientRequestInfo<TResult> info;
if (_requests.TryRemove(id, out info))
{
switch (status)
{
case STaskStatus.Completed: info.CompletionSource.
SetResult((TResult)result);
break;
case STaskStatus.Canceled: info.CompletionSource.
SetCanceled();
break;
case STaskStatus.Faulted: info.CompletionSource.
SetException((Exception)result);
break;
}
}
}
internal CallbackHandler(ConcurrentDictionary<string,
ClientRequestInfo<TResult>> requests)
{
requests = requests;
}
}
Далее посмотрим на реализацию методов AddRequest и SubmitRequest (рис. 8).
Рис. 8. Методы AddRequest и SubmitRequest
public void AddRequest(string typeName, string[] parameters,
CancellationToken tk)
{
var info = new ClientRequestInfo<TResult>(typeName, args);
_buffer.Add(info);
tk.Register(()=> _client.CancelTask(info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
{
_buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
}
public Task<TResult>[] SubmitRequests()
{
if (_buffer.Count == 0)
return null;
var req = _buffer.Select((r) =>
{
_requests.TryAdd(r.TaskExecutionRequest.Id, r);
return r.TaskExecutionRequest;
});
_client.SubmitRequest(req.ToList<STask>());
var ret = _buffer.Select(r =>
r.CompletionSource.Task).ToArray<Task<TResult>>();
_buffer.Clear();
return ret;
}
Отслеживание клиентских запросов
Как вы видели в предыдущем разделе, клиент взаимодействует исключительно с координатором задач, и именно на этот уровень возлагается обработка запросов от клиента и последующее обновление клиента результатами выполнения TPL-задачи. Со стороны клиента это требует сохранения исходного запроса в той или иной форме. Кроме того, нужно отслеживать соответствующий экземпляр обратного вызова (который позволяет взаимодействовать с клиентом), канал с узлами выполнения задач, сопоставленными с данным соединением (необходимо на случай отмены, как вы потом увидите), уникальный идентификатор, группирующий все запросы на выполнение задач, связанные с одним вызовом узла выполнения задач (для определения того, когда канал больше не нужен), а также состояние и результат выполнения. На рис. 9 дано определение класса STaskInfo — сущности, которая хранит эту информацию. Вдобавок я буду использовать в качестве механизма сохранения единственный экземпляр ConcurrentDictionary<TKey,TValue>.
Рис. 9. Классы STaskInfo и CoordinatorContext
public class STaskInfo
{
public string ExecutionRequestId
{ get; set; }
public STask ClientRequest
{ get; set; }
public ITaskUpdateCallback CallbackChannel
{ get; private set; }
public ITaskExecutionNode ExecutionRequestChannel
{ get; set; }
public STaskInfo(ITaskUpdateCallback callback)
{
CallbackChannel = callback;
}
}
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string,
STaskInfo> _submissionTracker =
new ConcurrentDictionary<string, STaskInfo>();
...
}
Наконец, обратите внимание на то, что _submissionTracker содержится в классе CoordinatorContext. Этот класс будет использоваться для реализации основной функциональности координатора задач.
Обработка клиентских запросов
Координатор задач — единственная точка входа для клиентов, а значит, она должна быть способной обрабатывать максимально возможное количество клиентских запросов, в то же время не допуская насыщения узлов выполнения задач (в терминах ресурсов). Это не так легко, как может показаться. Чтобы нагляднее пояснить потенциальные трудности, рассмотрим упрощенное решение:
- координатор задач предоставляет операцию сервиса, через которую клиенты передают запросы на выполнение задач;
- координатор задач передает эти запросы узлам выполнения задач и отслеживает их, т. е. сохраняет состояние.
Базовая реализация этого процесса передачи приведена на рис. 10.
Рис. 10. Реализация процесса передачи
public class TaskCoordinatorService : ITaskCoordinator
{
...
public void SubmitRequest(List<STask> stasks)
{
CoordinatorContext.SendTasksToTaskHandler(stasks);
}
...
}
public static class CoordinatorContext
{
...
internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
{
var clientFactory = ... // логика создания клиентской фабрики
var channel = clientFactory.CreateChannel();
foreach (var stask in stasks)
_submissionTracker.TryAdd(stask.Id, stask);
try
{
((IClientChannel)channel).Open();
channel.Start(stasks);
}
catch (CommunicationException ex)
{
// Обработка ошибок и протоколирование...
}
finally
{
if (((IClientChannel)channel).State !=CommunicationState.Faulted)
((IClientChannel)channel).Close();
}
}
...
}
Однако эта упрощенная реализация не будет эффективно работать в некоторых сценариях.
- Если клиент передает большое количество задач в одном запросе, все они в конечном счете попадают в один узел выполнения задач, что приводит к неравномерному использованию доступных ресурсов (предполагая, что имеется более одного узла выполнения задач).
- В ситуациях с пиковой нагрузкой система может исчерпать доступные ресурсы в узлах выполнения задач, если количество выполняемых TPL-задач превышает лимиты, обрабатываемые этими ресурсами. Такое возможно, когда то, что выполняется как TPL-задача, связано с конкретным ресурсом (например, памятью). Это может привести к тому, что при пиковой нагрузке система перестанет отвечать.
Регуляторы
Один из способов справиться с такими трудностями — каким-то образом «управлять» запросами на выполнение задач по мере их прохождения по системе. В этом контексте вы можете рассматривать координатор задач как регулирующий контроллер (throttling controller). Но, прежде чем обсуждать процесс регулирования, рассмотрим семантику регуляторов (throttles), которыми я буду пользоваться в сочетании с процессом регулирования.
Риск проблем в первом случае можно уменьшить ограничением количества задач, которые координатор может передавать узлам в одном запросе. Я называю этот регулятор maxSTasksPerRequest. Используя такой подход, алгоритм балансировки нагрузки сможет эффективно распределять нагрузку по доступным узлам выполнения задач.
Второй случай сложнее. Возможное решение — ограничивать количество задач, выполняемых узлами определенным числом. Этот регулятор я назову maxNumberOfTasks.
В дополнение к этому регулятору решение могло бы получить выигрыш от еще одного регулятора, который ограничивает количество выполняемых задач на основе их типа. Чтобы пояснить, какая от этого польза, рассмотрим сценарий, где на узлах выполняются два типа задач: T1 и T2. T1 — задачи, интенсивно использующие процессорные мощности, а T2 — задачи, интенсивно использующие дисковый ввод-вывод. В этом сценарии на скорость обработки запросов на выполнение задач T1 скорее всего будут влиять активные задачи, лимитированные тем же типом ограничения, поэтому чем больше количество задач T1, тем сильнее это влияние. Поскольку задачи T2 лимитированы другим ограничением, они иначе влияют на задачи T1. Возможность ограничивать выполнение задач по типу означает, что я могу управлять тем, сколько задач T1 может выполняться в любой момент, а это позволяет мне максимально использовать процессорные ресурсы и в итоге добиться ускорения обработки. Я буду называть этот регулятор maxNumberOfTasksByType.
Очереди и регулирование
Теперь, когда вы понимаете семантику регуляторов и то, насколько они могут оказаться эффективными в поддержании работоспособности узлов выполнения задач, посмотрим, что происходит, когда достигается лимит, указанный регуляторами, т. е. обсудим сам процесс регулирования.
Один из вариантов — просто генерировать исключение. Однако это повлияло бы на общую пропускную способность решения, так как возникли бы издержки, связанные с тем, что клиенту пришлось бы выполнять проверку на определенную ошибку или сбой, а затем повторно передавать запросы до тех пор, пока координатору задач не удастся успешно обработать их. В качестве альтернативы можно было бы использовать очередь на серверной стороне для временного хранения запросов от клиента и процесс, подобный дозиметру (дозированно передающий данные), который через регулярные интервалы считывал бы запросы из очереди и пересылал бы их в узлы выполнения задач. Я задействую такой процесс для регулирования, потому что он считывает запросы из очереди по следующим правилам.
- Ограничивает количество запросов, которые могут быть извлечены из очереди, значением maxSTasksPerRequest.
- Если достигается порог maxNumberOfTasks, извлечение запросов из очереди прекращается.
- Если достигается порог maxNumberOfTasksByType, запросы соответствующего типа извлекаются из очереди, а потом ставятся обратно в очередь. Повторная отправка таких запросов в очередь обеспечивает продолжение обработки задач другого типа. Эта стратегия обеспечивает равные возможности выполнения для всех задач в очереди. Однако в некоторых случаях, возможно, придется подумать о применении очереди с поддержкой приоритетов. Хороший справочник на эту тему см. по ссылке bit.ly/NF0xQq.
Этот процесс показан на рис. 11.
Рис. 11. Процесс передачи
Requests (From the Client) | Запросы (от клиента) |
Submitter Process | Процесс дозирования |
Throttling | Регулирование |
Request Q | Запрос Q |
Requests (To the Task Execution Nodes | Запросы (адресованные узлам выполнения задач) |
Я начну описание реализации этого процесса с кода для операции сервиса SubmitRequest (рис. 12), которая ставит запросы в очередь по мере получения от клиента.
Рис. 12. Операция сервиса SubmitRequest
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
{
public void SubmitRequest(List<STask> stasks)
{
CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
}
...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
{
var callback = OperationContext.Current.GetCallbackChannel<
ITaskUpdateCallback>();
foreach (var stask in stasks)
_requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
}
...
}
Далее рассмотрим реализацию дозирующего процесса (submitter process); она показана на рис. 13.
Рис. 13. Реализация Submitter
public static class CoordinatorContext
{
...
static CoordinatorContext()
{
Submitter(...);
}
private static async void Submitter(int interval)
{
while (true)
{
await Task.Delay(interval);
SendTaskRequestToTaskExecutionNode(
GetTasksFromRequestQ());
}
}
...
}
На рис. 12 и 13 видно, что операция сервиса ставит (пишет) запрос в очередь и извлекает (считывает) из нее задачу Submitter. В этом сценарии вам нужно гарантировать, что нижележащая структура данных (очередь) является безопасной в многопоточной среде. К счастью, существует класс, предназначенный специально для этого, — ConcurrentQueue<T>. Поэтому я использую единственный экземпляр этого типа как нижележащий репозитарий для запросов:
public static class CoordinatorContext
{
...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
new ConcurrentQueue<STaskInfo>();
...
}
Теперь рассмотрим реализацию метода GetTasksFromRequestQ, который считывает задачи, когда истекает интервал выполнения. Именно в этом методе происходит процесс регулирования и применяются описанные ранее регуляторы. Реализация этого процесса показана на рис. 14.
Рис. 14. Реализация GetTasksFromRequestQ
public static class CoordinatorContext
{
...internal static List<STaskInfo> GetTasksFromRequestQ()
{
var ret = new List<STaskInfo>();
var maxSTasksPerRequest = // из конфигурации
var maxNumberOfTasks = // из конфигурации
var count = // счетчик переданных или выполняемых задач
var countByType = // перечисление счетчиков по типу
for (int i = 0; i < maxSTasksPerRequest; i++)
{
STaskInfo info;
if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
return ret;
var countTT = // счетчик переданных или выполняемых
// задач типа, совпадающего с типом
// текущего элемента
if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
{ _requestQ.Enqueue(info); }
else ret.Add(info);
}
return ret;
}
}
private static int GetMaxNumberOfTasksByType(string taskTypeName)
{
// Логика чтения из репозитария конфигурации
// значения по имени типа задачи
}
...
}
Цель реализации на рис. 14 — получение значений, позволяющих процессу оценивать условия регулирования. На рис. 15 показаны возможные LINQ-запросы, которые могут быть выполнены применительно к _submissionTracker, а также список, содержащий возвращаемые элементы (ret) для получения этих значений. Заметьте, что этот подход может срабатывать ценой потери производительности. Если это так, то в качестве альтернативы вы могли бы реализовать набор безопасных в многопоточной среде счетчиков, которые увеличиваются или уменьшаются при добавлении или удалении элементов в экземпляре компонента отслеживания передач (submission tracker), и использовать эти счетчики вместо того, чтобы напрямую запрашивать ConcurrentDictionary.
Рис. 15. Значения-регуляторы
var countByType = (from t in _submissionTracker.Values
group t by t.ClientRequest.STaskTypeName into g
select new
{
TypeName = g.Key,
Count = g.Count()
});
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
where tt.TypeName == info.ClientRequest.STaskTypeName
select tt.Count).SingleOrDefault()+
ret.Where((rt) => rt.ClientRequest.STaskTypeName ==
info.ClientRequest.STaskTypeName)
.Count();
Отправка запросов узлам выполнения задач и обработка результатов
До сих пор я говорил о том, как координатор задач управляет запросами. Давайте обсудим, как он передает запрос узлам выполнения задач, но теперь уже с учетом процесса регулирования. Чтобы лучше понять это, сначала рассмотрим операции сервиса, предоставляемые узлами выполнения задач (через компонент балансировки нагрузки):
[ServiceContract( CallbackContract = typeof(ITaskUpdateCallback))]
public interface ITaskExecutionNode
{
[OperationContract]
void Start(List<STask> stask);
[OperationContract]
void Cancel(string Id);
}
Как понятно по названиям, цель этих операций — запуск списка запросов на выполнение задач и запрос на отмену определенной задачи. Контракт этого сервиса использует тот же контракт обратного вызова для обновления координатора задач через реализацию контракта.
На рис. 16 показана измененная реализация метода SendTaskToTaskExecutionNode, где координатор задач хранит экземпляры STaskInfo в _submissionTracker и вызывает операции сервиса Start узла выполнения задач.
Рис. 16. SendTaskToTaskExecutionNode и вспомогательные методы
internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
{
if (staskInfos.Count() == 0)
return;
var channel = new DuplexChannelFactory<ITaskExecutionNode>(
new InstanceContext(new CallbackHandler()),
new NetHttpBinding(), new EndpointAddress("http://.../"))
.CreateChannel();
try
{
var requestId = Guid.NewGuid().ToString();
var reqs = staskInfos.Select(s => AddRequestToTracker(
requestId,s, channel)).Where(s => s != null);
((IChannel)channel).Open();
channel.Start(reqs.ToList<STask>());
}
catch (CommunicationException ex)
{
foreach (var stask in staskInfos)
HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
}
}
private static STask AddRequestToTracker(string requestId,
STaskInfo info, ITaskExecutionNode channel)
{
info.ExecutionRequestId = requestId;
info.ExecutionRequestChannel = channel;
if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
return info.ClientRequest;
HandleClientUpdate(info.ClientRequest.Id,
STaskStatus.Faulted, new Exception("Failed to add "));
return null;
}
Заметьте, что метод SendTaskToTaskExecutionNode создает экземпляр обратного вызова для обработки результата выполнения задачи в узле:
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler : ITaskUpdateCallback
{
public void UpdateStatus(string id, STaskStatus status, string result)
{
CoordinatorContext.HandleClientUpdate(id, status, result);
}
}
CallbackHandler обрабатывает операцию обратного вызова, запуская метод HandleClientUpdate. Этот метод извлекает и удаляет соответствующий экземпляр STaskInfo из submitterTracker и выполняет обратный вызов клиента для обновления результатов. Кроме того, если это последний запрос в группе, он закрывает канал между координатором задач и узлом выполнения задач. На рис. 17 показана реализация метода HandleClientUpdate.
Рис. 17. HandleClientUpdate и вспомогательные методы
internal async static void HandleClientUpdate(
string staskId, STaskStatus status, object result)
{
STaskInfo info;
if (!_submissionTracker.TryGetValue(staskId, out info))
throw new Exception("Could not get task from the tracker");
try
{
await Task.Run(() => info.CallbackChannel.UpdateStatus(
info.ClientRequest.Id, status, result));
RemoveComplete(info.ClientRequest.Id);
}
catch(AggregateException ex)
{
// ...
}
}
private static void RemoveComplete(string staskId)
{
STaskInfo info;
if (!_submissionTracker.TryRemove(staskId, out info))
throw new Exception("Failed to be removed from the tracking collection");
if (_submissionTracker.Values.Where((t) =>
t.ExecutionRequestId == info.ExecutionRequestId).Count() == 0)
CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
}
private static void CloseTaskRequestChannel(IChannel channel)
{
if (channel != null && channel.State != CommunicationState.Faulted)
channel.Close();
}
Компонент реализации задачи
В клиентском коде typeName — один из обязательных параметров при добавлении запросов. Это значение в конечном счете попадает в узел выполнения задач. Значением typeName является имя типа реализации интерфейса, который предоставляет делегат функции, инкапсулирующей функциональность для выполнения задачи как параллельной и находящейся во всех узлах выполнения задач. Я назову этот интерфейс IRunnableTask. Компоненты, реализующие этот интерфейс, должны ожидать получения в качестве параметров маркера отмены (cancellation token) и массива параметров от клиентов. Делегат также должен возвращать результат задачи. Вот этот интерфейс:
public interface IRunnableTask
{
Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}
Запуск задачи в узле выполнения задач
На высоком уровне узел выполнения задач отвечает за «преобразование» запроса на выполнение задачи в реальную задачу, которую может выполнить TPL, т. е. за запуск TPL-задачи. На рис. 18 показана реализация этого процесса.
Рис. 18. Запуск задачи
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{
public void Start(List<STask> stasks)
{
var callback = OperationContext.Current.GetCallbackChannel<
ITaskUpdateCallback>();
foreach (var t in stasks)
TaskExecutionContext.Start(t,callback);
}
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
try
{
// Этап 1a
var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
// Этап 1б
var rtask = from t in rtasks
where t.Value.GetType().FullName == stask.STaskTypeName
select t.Value;
// Этап 2
var cs = new CancellationTokenSource();
var ct = cs.Token;
TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
// Этап 3
Task<Object>
.Run(rtask.First().Run(ct, stask.STaskParameters), ct)
.ContinueWith(tes => UpdateStatus(tes, stask, callback));
}
catch (Exception ex)
{
...
}
}
...
}
Этапы 1a и 1б На этой стадии узел выполнения задач должен создать экземпляр IRunnableTask, который будет возвращать делегат, выполняемый как задача запрошенного клиентом типа. С этой целью я использую MEF и новую функциональность в .NET Framework 4.5, которая позволяет применять подход к конфигурированию без атрибутов. Код на рис. 19 создает единственный экземпляр контейнера, который экспортирует все реализации IRunnableTask, находящиеся в каталоге extensions. Подробнее о MEF и подходе к конфигурированию без атрибутов см. в статье «An Attribute-Free Approach to Configuring MEF» за июнь 2012 г. (msdn.microsoft.com/magazine/jj133818).
Рис. 19. Создание контейнера
internal static class CompositionUtil
{
private readonly static Lazy<CompositionContainer>
_container = new Lazy<CompositionContainer>(() =>
{
var builder = new RegistrationBuilder();
builder.ForTypesDerivedFrom<IRunnableTask>()
.Export<IRunnableTask>()
.SetCreationPolicy(CreationPolicy.NonShared);
var cat = new DirectoryCatalog("extensions", builder);
return new CompositionContainer(cat, true, null);
}
,true);
internal static CompositionContainer ContainerInstance
{
get { return _container.Value; }
}
}
Теперь вернемся к коду на рис. 18. В этом коде используется контейнер, чтобы получить экспорты типа IRunnableTask, а затем выбирается экземпляр с именем типа, подходящим к запросу клиента. Заметьте, что здесь я делаю ключевое допущение о том, что существует всего один экземпляр задачи, соответствующий типу, запрошенному клиентом. По этой причине я использую первый экземпляр, возвращаемый LINQ-запросом.
Этап 2 Прежде чем создавать TPL-задачу, код создает Cancellation Token Source и Cancellation Token. Я отслеживаю Cancellation Source в единственном экземпляре ConcurrentDictionary<TKey,TValue>. Узел выполнения задач будет использовать этот список источников отмены (cancellation sources), когда клиент запросит отмену. Вот определение этого экземпляра:
public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string,
CancellationTokenSource> _cancellationSources =
new ConcurrentDictionary<string, CancellationTokenSource>();
...
}
Этап 3 Здесь я выполняю задачу с только что созданным маркером отмены. За этой задачей следует задача продолжения (continuation task). Необходимость в задаче продолжения возникает из-за того, что требуется обновлять координатор задач результатом выполнения при завершении обработки TPL-задачи (успешном или неудачном) и для этого вызывается соответствующая операция сервиса. Как показано на рис. 20, я инкапсулирую процесс обновления координатора задач в делегате, который принимает в качестве параметров TPL-задачу, запрос на выполнение задачи и экземпляр обратного вызова, адресованный координатору.
Рис. 20. Инкапсуляция процесса обновления
private static Action<Task<Object>, STask, ITaskUpdateCallback>
UpdateStatus = (t, st, cb) =>
{
try
{
STaskStatus s;
Object r = null;
switch (t.Status)
{
case TaskStatus.Canceled: s = STaskStatus.Canceled;
break;
case TaskStatus.Faulted:
s = STaskStatus.Faulted;
r = t.Exception.Flatten();
break;
case TaskStatus.RanToCompletion:
s = STaskStatus.Completed;
r = t.Result;
break;
default:
s = STaskStatus.Faulted;
r = new Exception("Invalid Status");
break;
}
CancellationTokenSource cs;
TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
cb.UpdateStatus(st.Id, s, r);
}
catch (Exception ex)
{
// Обработка ошибок
}
};
Запрос отмены и его обработка
TPL предоставляет механизм для реализации отмены задачи. С этой целью делегат, инкапсулирующий процесс, который выполняется как TPL-задача, должен реагировать на запрос отмены и прекращать выполнение. Подробнее об отмене задач см. статью «Task Cancellation» в MSDN Library по ссылке bit.ly/NYVTO0.
Один из параметров в интерфейсе IRunnableTask — маркер отмены. Узел выполнения задач создает такой маркер для каждой задачи, а далее компонент, реализующий этот интерфейс, должен определять, когда следует делать проверку на запрос отмены, и корректно завершать процесс. В коде на рис. 21 показана простая задача, которая вычисляет количество четных чисел в некоем диапазоне, параллельно проверяя, не была ли запрошена отмена.
Рис. 21. Проверка на запрос отмены
public class MySimpleCTask : IRunnableTask
{
public Func<Object> Run(Nullable<CancellationToken>
ct, params string[] taskArgs)
{
var j = int.Parse(taskArgs[0]);
var z = 0;
return (() =>
{
for (int i = 0; i < j; i++)
{
if (i % 2 != 0)
{
z++;
ct.Value.ThrowIfCancellationRequested();
}
}
return z;
});
}
}
Как вы видели при обсуждении клиента, можно добавить запрос с маркером отмены, и клиент на внутреннем уровне выполнит необходимую подписку. При отмене соответствующий запрос посылается координатору задач. Приняв этот запрос, координатор проверяет, был ли этот запрос адресован узлу выполнения задач, и пересылает запрос отмены. Узел выполнения задач ищет источник отмены, который соответствует задаче, запрошенной по идентификатору клиента. Передать запрос отмены в узел выполнения задач относительно несложно — вам нужно просто найти канал, соответствующий запросу, по которому координатор изначально передавал запрос на выполнение задачи. Эти каналы должны сохраняться открытыми для обратных вызовов, обновляющих состояние запроса на выполнение.
Реализация этих операций сервиса в координаторе задач представлена на рис. 22.
Рис. 22. Реализация операций сервиса в координаторе задач
public class TaskCoordinatorService : ITaskCoordinator
{
...
public bool CancelTask(string Id)
{
return CoordinatorContext.CancelTask(Id);
}
...}
public static class CoordinatorContext
{
...
internal static bool CancelTask(string Id)
{
STaskInfo info;
if(_submissionTracker.TryGetValue(
Id, out info) && info.ExecutionRequestChannel != null)
{
info.ExecutionRequestChannel.Cancel(Id);
return true;
}
return false;
}
...
}
Наконец, на рис. 23 показана реализация операций сервиса в узлах выполнения задач.
Рис. 23. Реализация операций сервиса в узлах выполнения задач
class CancellationHandler : ICancellationHandler
{
public void Cancel(STask stask)
{
TaskExecutionContext.CanceTask(stask.Id);
}
}
public static class TaskExecutionContext
{
...
internal static void CancelTask(string Id)
{
CancellationTokenSource tknSrc;
if (_cancellationSources.TryGetValue(Id, out tknSrc))
tknSrc.Cancel(); }
...
}
Масштабируемость координатора задач и другие соображения
Стоит отметить, что эта реализация предполагает, что координатор задач выполняется на одном узле, но вполне возможно горизонтальное масштабирование этого координатора. Это потребует, как минимум, следующих изменений:
- нужно будет ввести компонент балансировки нагрузки для доступа к координатору задач;
- как упоминалось, ключ к подходу с регулированием — наличие точных счетчиков количества выполняемых задач (как в целом, так и по типам). В сценарии, где в роли координаторов задач выступает более чем один узел, эти счетчики потребуется поддерживать централизованно (например, в базе данных), в то же время сохраняя возможность их обновления или чтения в синхронном режиме (чтобы избежать конкуренции, взаимоблокировок и т. д.).
Наконец, позвольте заметить, что, как и при любом другом подходе к разработке, вы должны взвесить все риски и выгоды с учетом альтернатив. Так, возможно, вы захотите присмотреться к таким технологиям, как сервер Microsoft HPC, — это вполне приемлемое решение для многих сценариев.
Оптимизация ресурсов
TPL предоставляет необходимую инфраструктуру для достижения наиболее оптимального использования процессорных ресурсов на компьютере с одним многоядерным процессором, и она также полезна при реализации подхода, при котором осуществляется масштабирование через границы компьютеров. Это удобно в сценариях с пакетной обработкой и автоматизацией распределения нагрузки, где распараллеливание требуется не только на одном многоядерном сервере, но и между несколькими серверами.
Ключ к подходу с регулированием — наличие точных счетчиков количества выполняемых задач (как в целом, так и по типам).
Для достижения этого горизонтального масштабирования нужно учесть несколько соображений по архитектуре. Наиболее важное из них — необходимость в распределении нагрузки по существующим ресурсам с сохранением возможности добавлять дополнительные ресурсы в имеющуюся ферму и регулировать ресурсы в соответствии с семантикой задач, которые вам требуется выполнять. Технологии разработки и инструментарий Microsoft предоставляют вам нужные строительные блоки для реализации архитектуры, учитывающей эти важнейшие вопросы.