Наблюдение
Продолжаем разговор о том, что можно и нужно делать в НиФи, а что можно делать, но лучше не делать. Если вы пропустили первую часть разговора, то вот вам. Это сделано для улучшения читаемости схемы и повышения производительности (ну, почти). Здесь же мы поговорим о том, как следить за бизнес-частью системы, чтобы у всех все было хорошо (хорошо или неплохо), и немного о переносимости процессора. Идти!
Есть мнение, что самое страшное — вообще не следить за бизнес-частью программы, пользуясь популярным подходом «все будет хорошо!». Но если подумать, есть одна вещь хуже, чем отсутствие надзора, и это плохой надзор. Типичный “временный костыль”: добавить группу отключенных/остановленных процессоров, передать ей поток ошибок, отладить….и забыть. Что будет дальше? Да-да, очередь переполнилась, противодавление остановило процессор и “Все пропало, босс!!!”. Не делай так. Если мы это делаем, то делаем это хорошо. И как это справедливо? Не претендуя на Нобелевскую премию в области слежки, дам десяток непрошеных советов.
Рисунок 1. Настройка срока действия FlowFile
-
Мы устанавливаем тайм-аут FlowFile на «достаточно длительный» и получаем простой и удобный механизм онлайн-отладки с небольшим риском выключения всего. См. рис. 1.
-
Если говорить об очередях, то попробуем настроить мониторинг очередей быстро и просто самым простым способом. Выгружаем статус всех очередей, если есть обратка в какой-то из них, бежим выяснять, что именно сломано. Для передачи данных будем использовать задачу site-to-site report, интегрированную в NiFi, результат работы которой будем передавать себе через группу удаленной обработки и обрабатывать там — см. 2.

Рис. 2. Задача отчета Site-to-Site
Обратите внимание, что хотя :8080/nifi (Interface) указан в параметрах destinationUrl, он используется только для согласования соединения, сама передача осуществляется через порт 10000.

Рисунок 3. Фильтрация отчета с помощью queryrecord
Для анализа мы используем процессор QueryRecord (никакого split-evaluate-route!). И с результатом в виде [{“sourceId”:”cbffbfdc-017d-1000-9942-e797792a97a9″,”sourceName”:”GenerateFlowFile”}]указав что сломалось, можно уже работать.
3. Аналогично можно настроить SiteToSiteBulletinReportTask – он будет присылать нам ВСЕ сообщения об ошибках обработки (помните те красные квадратики в углу, которые вы никогда не успеваете сделать скриншот?) с атрибутами из файла потока. Это генерирует изрядное количество трафика на загруженных (скрученных) инстансах, но чем больше авторов потоков, тем больше причин исправлять это быстрее.
Но все это, конечно, «с высоты птичьего полета», то есть весь кластер. А хотелось бы обработку ошибок на уровне самой группы ЦП. В конце концов, прежде чем сообщать об ошибке, вы должны попытаться устранить ее на месте. Те. Тихо удалить сбойную ветку — это еще нехорошо, а сразу бить тревогу — не совсем правильно. Как будет правильно? См. пункт 4.
4. Тот же InvokeHTTP, помимо Failure, выдает отдельный результат Retry. Это так называемый «восстановимый» класс ошибок. Конечно, если сервер вам вернул 400 – “скрученный запрос”, то он не станет “прямым” сам по себе, и если ответ “50 (0|2|3)” через N секунд (перезагрузка сервис, запрос к другому экземпляру и т. д.), результат может быть совершенно другим. Проще (но не правильнее) очередь Retry намотать на сам ЦП, но в случае “сбоя загрузки” результат бесконечного количества обращений к сервису может быть далеко не желаемым. Вот тут-то и приходит на помощь процессор ControlRate — посмотрите на рис. 4.

Рисунок 4. Базовая обработка ошибок
Ждем N секунд и повторяем отправку N потоковых файлов. Чтобы избежать бесконечного цикла скрученных запросов, мы устанавливаем для очереди параметр “FlowFile Expiration”. Минимум накладных расходов + условно достойный результат. Почему “условно”? Потому что если выпало 100500 кривых запросов, возможен вариант, при котором вся очередь ошибок не успевает обработаться, учитывая время жизни потоковых файлов и задержки. Те. с MaximumRate=10, Schedule=10 sec и FlowFile Expiration=60 sec мы обработаем 60 ошибок, все остальное молча умрет. Конечно, эти настройки можно и нужно переносить, но при таком подходе заявленный риск будет всегда. Также в этой схеме мы никак не узнаем, что выполнение действия завершилось ошибкой. Если вам нужно более строгое лечение, см. пункт 5.

Рисунок 5. Более сложный вариант
5. В UpdateAttribute устанавливаем счетчик как ${counter:replaceNull(0):plus(1)}, в RouteOnAttribute условие ${counter:ge(5)}. На выходе получается фиксированное количество повторений для каждого stream-файла с генерацией аварийного сигнала в случае сбоя. Усложнение весьма существенное (чем меньше, тем лучше, не так ли?), но с учетом (обычно!) небольшого количества ошибок вполне оправдано. В случае большого потока, чтобы избежать обратного давления со стороны потока обработки ошибок, может быть целесообразно увеличить размер соответствующих очередей.
О, да. Важно: 100% покрытие обработки ошибок вообще нигде не требуется. Не обязательно делать 10 повторений по 10 секунд, если пользователь ждет ответа на свой запрос в течение таймаута по умолчанию 60 секунд 😊.
6. Еще одним важным механизмом является контроль работы при условии явного «отсутствия ошибок». Нередки ситуации типа «проблем нет, а данных тоже нет». Иногда косвенным признаком зависания процесса является рост очереди, но если, например, ConsumeKafka зависает при входе в группу, то это оказывается совсем нехорошо. Для решения этой проблемы на помощь приходит процессор MonitorActivity. Он может генерировать сообщение, если данные не поступают дольше, чем указано в пороговой продолжительности. На выходе цепляем ConsumeKafka параллельно основной ветке MonitorActivity с параметром минут 15, на выходе делаем logmessage|putemail примерно как на рис. 6, … и наслаждайтесь результатом.

Рисунок 6. Проверка входящих данных
Очевидно, что фактические пороги устанавливаются вашим потоком данных, и ложные срабатывания в этом случае неизбежны. Но это определенно лучше, чем ничего. В качестве альтернативы возможна реализация функции обнаружения сердцебиения интересующей службы. При этом вы должны понимать, что то, что сервис работает, и то, что у вас есть данные, это не одно и то же. Было бы неплохо совместить подходы и для ключевых схем настроить их примерно так, как на рис. 7.

Рисунок 7. Проверка службы Heartbeat
Мы рассылаем тестовые запросы по расписанию. В случае уникальных проблем – пишем сообщения в лог, если проблема “сама не разрешилась” – с помощью того же MonitorActivity отправляем сообщение администратору. В общем, если говорить о платформе ZIIoT, то это должен делать не NiFi, а «Мониторинг платформы». А вот, например, применительно к клиентским системам и/или при необходимости более высокого уровня мониторинга (делаем фактический запрос и проверяем не на наличие, а на корректность ответа) – c вполне вариант. При его реализации имеет смысл помнить, что этот способ несколько «инвазивный» и кривыми руками (например, без установки Scheduling для GenerateFlowFile) можно все сломать.
7. Итак, пока мы следим за состоянием всего экземпляра и разбираемся со всеми ошибками, которые имеет смысл обрабатывать. Остается сообщить, что лечение не помогло. Здесь у нас есть два штатных инструмента — logmessage и logattribute. Первый логически позволяет написать в журнале сообщение типа “Я упал!” с указанием уровня лога и префикса лога, что очень полезно для логов grep’at. Второй также позволяет отправлять в лог содержимое файла фида и набор атрибутов, заданный регулярным выражением. Его следует использовать с осторожностью — есть риск заспамить журнал крошек.
Помимо «встроенных» средств записи в лог для последующего анализа в ключевых местах, имеет смысл явно сообщать об ошибках администратору потока, например, с помощью обработчика PutEmail. По опыту, 100 500 писем примерно на 100 500 одинаковых ошибок никому не нужны, поэтому поможет ControlRate + FlowFile Expiration.
Кстати, помимо инструментов NiFi, в платформе ZIIoT есть еще один информационный канал — встроенный механизм zif-events. Если вдруг кого-то заинтересует, мы расскажем об этом подробнее. А пока я собираюсь написать третью часть саги о NiFi, в которой изложу свои мысли о том, как обеспечить переносимость процессоров, а также разберу несколько модельных решений для популярных задач на платформе ZIIoT.