Для моих коллег не секрет, что я довольно равнодушен к Windows Azure Service Bus. Однако с появлением Windows Azure AppFabric CTP June Update компания Microsoft наконец добавила достаточно функциональности, чтобы Service Bus превратилась из того, что я считаю не более чем заготовкой, в действительно полезную технологию. Для моих задач важной частью технологии обмена сообщениями, предлагаемой теперь AppFabric Service Bus, является Topics — богатая функциональность публикации и подписки. В этой статье я сосредоточусь на Topics и, как я часто это делаю, использую свой опыт работы с предприятиями розничной торговли, чтобы посмотреть, как эта технология позволяет упростить проверку наличия товаров между магазинами.
Вы никогда не сталкивались с тем, что при покупке какого-то товара вдруг обнаруживалось, что последний экземпляр только что продан или упаковка повреждена? Когда возникает такая ситуация, продавец зачастую обращается к POS-системе и проверяет наличие этого товара в ближайших магазинах. Как правило, такая проверка осуществляется по записям учета товаров, которые хранятся в централизованной базе данных или в какой-то разновидности ERP-системы, а продавец обычно вводит номера магазинов, руководствуясь своим знанием ближайших точек. Нередко эти данные являются устаревшими, так как обновляются только в конце рабочего дня, когда журналы транзакций и прочие данные отправляются в корпоративную систему и обрабатываются в ней.
Более оптимальный вариант выглядел бы так. Магазин в любой момент мог бы выдать широковещательный запрос на наличие товара, а близлежащие магазины, располагающие этим товаром, реагировали бы на него, сообщая о его наличии у них. Именно такую систему я и собираюсь построить, используя Windows Azure AppFabric Service Bus, как показано на рис. 1.
Рис. 1. Сообщение проверки на наличие товара
Topics предоставляют надежный механизм, позволяющий передавать контент вплоть до 2000 подписчикам на каждый Topic. Это ограничение в количестве подписчиков вызывает сожаление, так как потенциально заставляет придумывать в архитектуре решения (наподобие той, которую я опишу) обходные пути, создавая тем или иным способом сегменты в Topic. Например, вместо U.S. Inventory Check Topic, который подписчики фильтруют по региону, мне пришлось бы создать SouthCentral U.S. Inventory Check Topic, а затем дополнительно отфильтровать его по конкретному местному подразделению. Помня об этом подвохе, я продолжу работать со своим единственным Inventory Check Topic, так как могу пообещать, что не стану использовать слишком много подписчиков.
Передача сообщений
Вы можете скачать Windows Azure AppFabric CTP June Update по ссылке bit.ly/p3DhAU,а портал управления находится по адресу portal.appfabriclabs.com. Я буду обращаться к этому порталу управления, чтобы получать кое-какую информацию, необходимую в моем коде (рис. 2).
Рис. 2. Получение информации от портала управления Windows Azure AppFabric
Мне нужно получить Service Gateway, Default Issuer (в CTP это всегда «owner») и Default Key. Так как эта информация потребуется в разных частях кода в моем примере, я создам их на уровне объекта:
private string sbNamespace = "jofultz";
private string issuerName = "owner";
private string issuerKey =
"25vtsCkUPBtB39RdiBbUtCQglYRPOdHHC4MvX5fvxxxx";
Я буду использовать эти переменные в своих функциях, чтобы создать Topic, а затем асинхронно посылать сообщения в этот Topic. Для желающих и тех, кто имеет дело с платформой, не позволяющей использовать клиентскую библиотеку, предусмотрен REST-интерфейс. Поскольку я создаю Windows-клиент, я задействую библиотеку. Для этого надо добавить ссылки на Microsoft.ServiceBus, Microsoft.ServiceBus.Message и System.ServiceModel, как показано на рис. 3.
Рис. 3. Добавление ссылок
Теперь для подготовки Topic потребуется всего несколько строк кода:
SharedSecretCredential credential =
TransportClientCredentialBase.CreateSharedSecretCredential(
issuerName, issuerKey);
Uri sbUri = ServiceBusEnvironment.CreateServiceUri(
"sb", sbNamespace, String.Empty);
ServiceBusNamespaceClient sbNSClient =
new ServiceBusNamespaceClient(sbUri, credential);
Topic newTopic = sbNSClient.CreateTopic(topicName);
Когда я начну посылать сообщения, важно, чтобы магазины могли фильтровать их и отбирать только релевантные для своего региона. Этот API полноценно поддерживает идентификаторы корреляции и подписки, но для фильтрации мне нужно использовать свое знание данных и фильтр, основанный на содержимом запроса. Таким образом, мне нужен поиск территориально близких магазинов, и они должны реагировать на запросы друг к другу на наличие товаров. В то же время необходимо обеспечить, чтобы изначальный магазин не обрабатывал свои же запросы. В запросы будет включаться SKU (артикул) искомого элемента (товара), регион, в котором был выдан запрос, и RequesterID, чтобы магазин-инициатор мог отфильтровывать собственные запросы по данному Topic:
[Serializable]
class InventoryQueryData
{
public string Region;
public string ResponderID;
public string RequesterID;
public string Sku;
public int Count;
}
Обратите внимание на атрибут [Serializable], добавленный к классу. Вы могли бы помечать свойства как DataMember и использовать DataContract, но смысл в том, что любой тип, передаваемый в Topic, должен быть сериализуемым.
Я создал простую форму, которая позволяет вводить произвольную строку в качестве SKU и выбирать из списка магазин — инициатор запроса. Код, обрабатывающий кнопку запроса, аналогичен коду для подключения и создания Topic и напоминает типичные конструкции при работе с API обмена сообщениями, как показано на рис. 4.
Рис. 4. Получение значений данных
// Присваивание значений данных
InventoryQueryData data = new InventoryQueryData();
data.Sku = txtSKU.Text;
data.StoreID = cboStore.SelectedText;
data.Region = cboStore.SelectedValue.ToString();
Uri sbUri = ServiceBusEnvironment.CreateServiceUri("sb",
sbNamespace, string.Empty);
SharedSecretCredential credential =
TransportClientCredentialBase.CreateSharedSecretCredential(
issuerName, issuerKey);
MessagingFactory msgFactory = MessagingFactory.Create(
sbUri, credential);
TopicClient topicClient =
msgFactory.CreateTopicClient(topicName);
MessageSender MsgSender = topicClient.CreateSender();
BrokeredMessage msg = BrokeredMessage.CreateMessage(data);
// Добавляем свойства в сообщение для фильтрации
msg.Properties["Region"] = data.Region;
msg.Properties["RequesterID"] = data.RequesterID;
msg.TimeToLive = TimeSpan.FromSeconds(60);
MsgSender.Send(data);
Здесь стоит отметить два момента. Во-первых, я добавил в набор BrokeredMessage.Properties пары «имя-значение», необходимые для фильтрации. Во-вторых, с учетом исполняющей среды я присвоил TimeToLive (TTL) значение, равное 60 секундам. Конечно, вам наверняка потребуется более обоснованное решение для выбора значения TTL, но я считаю, что, если запрос за это время не достигает ни одного подписчика, то, вероятно, больше незачем заставлять клиента ждать ответа. Кроме того, это просто пример.
Любое сообщение, переданное по шине, является формой BrokeredMessage с методом фабрики CreateMessage. Он просто обертывает данные в экземпляр типа BrokeredMessage, который содержит все конструкции, необходимые для полнофункциональной системы обмена сообщениями.
Благодаря этому у меня есть все, что нужно для выдачи сообщения подписчикам Inventory Check Topic, поэтому теперь я займусь подготовкой клиентов подписки для выборки сообщений и ответа.
Прослушивание потока Topic и реагирование
Подготовив клиент, можно посылать поток запросов, но прямо сейчас это будет несколько потруднее простой отправки битов в эфир. Я создал приложение Windows Form и повторно использовал XML-список магазинов и InventoryQueryData из первого приложения (отправителя). Мне понадобится подписка с уникальным именем для каждого клиента, который будет прослушивать данный Topic. Для этого достаточно скомбинировать имя подписки с номером магазина, запросы от которого я хочу получать. Мое небольшое тестовое приложение позволяет выбрать номер магазина из поля со списком (combobox), и я просто добавляю это значение к имени базового Subscription, чтобы создать уникальное имя. Важно обеспечить, чтобы у каждого клиента была уникальная подписка; иначе возникнет конкуренция за сообщение между подписчиками и победит тот, кто первым примет его и удалит:
Topic invTopic = sbNSClient.GetTopic(topicName);
SubscriptionName = "InventoryQuerySubscription" +
this.cboStore.Text;
SqlFilterExpression RegionFilter =
new SqlFilterExpression("Region = '" +
cboStore.SelectedValue + "' AND RequesterID <> '" +
cboStore.Text + "'");
Subscription sub = invTopic.AddSubscription(SubscriptionName,
RegionFilter);
Добавляя Subscription в Topic, я также могу передать фильтр, чтобы каждый магазин получал только запросы на наличие товара в своем регионе. Заметьте, что вы можете создать свой тип FilterExpression из базового типа, но API уже включает четыре типа, которые охватывают большую часть сценариев, особенно если используются совместно: CorrelationFilterExpression, MatchAllFilterExpression, MatchNoneFilterExpression и SqlFilterExpression. Я задействовал SqlFilterExpression, который позволяет легко написать выражение, чтобы получать сообщения, предназначенные, например, для региона Техаса, и исключать сообщения, исходящие из моего магазина:
"Region = '[Region]' AND RequesterID <> '[StoreID]'"
Мне нужно лишь фильтровать запросы, но в некоторых случаях теоретически я мог бы «подправлять» некоторые данные «на лету», используя RuleDescription, чтобы скомбинировать, скажем, SqlFilterExpression с SqlFilterAction. Первый идентифицирует сообщение-мишень, а второй определяет операцию, которую нужно выполнить над сообщением. Такая функциональность может оказаться полезной, когда передаваемые данные нужно преобразовывать в нечто понятное получателю, но изменять что-либо на обеих сторонах нельзя.
После настройки подписки, она останется даже после того, как клиент будет закрыт. В данном сценарии такое поведение идеально; я просто буду создавать SubscriptionClient каждый раз, когда буду начинать мониторинг, и он будет подключаться к существующему соединению. Однако это поведение не всем придется по вкусу. Можно легко представить ситуации, в которых при закрытии клиента желательно удалять подписку:
SubscriptionClient subClient = msgFactory.
CreateSubscriptionClient(topicName, SubscriptionName);
MessageReceiver msgReceiver = subClient.CreateReceiver(
ReceiveMode.ReceiveAndDelete);
msgReceiver.Open();
Заметьте, что в вызове CreateReceiver я установил ReceiveMode в ReceiveAndDelete. Вы также могли бы использовать PeekLock для ReceiveMode. Здесь я просто хочу получить сообщение и обработать его; мне не нужно гарантировать прием и обработку сообщения до его удаления: пропадет — невелика потеря. Но, если бы мне требовались гарантии и надежность, я скорее всего сделал бы две вещи. Я не стал бы задавать TTL для Message, Topic или Subscription и тем самым позволил бы существовать сообщениям неопределенно долго. Либо присвоил бы очень высокое значение TTL, чтобы получатель заведомо успевал бы обработать сообщение или переместить его в очередь исключений; в этом случае в очередь «невостребованных писем» попадали бы лишь те сообщения, которые действительно нельзя доставить. Кроме того, я включил бы на стороне получателя режим PeekLock для чтения и обработки данных с удалением сообщения только по окончании этого процесса. Самостоятельное формирование поведения в стиле распределенных транзакций может быстро привести к другим проблемам, но об этом мы поговорим как-нибудь в другой раз.
Открыв получатель, я вхожу в цикл, где проверяется наличие сообщений. В API есть метод Receive, напрямую возвращающий BrokeredMessage. Однако я воспользуюсь методом TryReceive, который возвращает булево значение, уведомляющее об успехе или неудаче (рис. 5). В этот метод я передаю сравнительно короткий интервал ожидания, но вполне достаточный для проверки и приема сообщения. Если сообщение принято, я обрабатываю его и сразу же проверяю, не поступило ли другое сообщение. Если другого сообщения нет, я «усыпляю» поток на некоторое время, а потом повторяю проверку.
Рис. 5. Проверка наличия сообщений
while (MonitorOn)
{
BrokeredMessage NewMsg;
bool recvSuccess = msgReceiver.TryReceive(
TimeSpan.FromSeconds(3), out NewMsg);
if (recvSuccess)
{
this.lbMessages.Items.Add(NewMsg.MessageId);
InventoryQueryData RequestObject =
NewMsg.GetBody<CreateInquiry.InventoryQueryData>();
ProcessMessage(RequestObject);
}
else
{
System.Threading.Thread.Sleep(3000);
}
}
Мой объект запроса содержится в BrokeredMessage, и, чтобы получить его, я вызываю GetBody<T>, передавая тип объекта. На обеих сторонах шины типы объектов должны соответствовать друг другу. Для этого можно использовать прокси-типы или передавать объект как строку и обрабатывать ее как XML.
До следующего раза…
В этой статье я продемонстрировал средства, позволяющие не только широковещательно передавать сообщение, но и фильтровать поток сообщений так, как это подходит для подписчика-получателя. В следующий раз мы рассмотрим использование очередей и корреляции, что позволит мне закончить текущий пример, а вам — получить базовое представление о новых средствах в Windows Azure AppFabric Service Bus. Кроме того, вы получите возможность скачать полный исходный код примера.