Сегодня посмотрим на Apache NiFi с точки зрения разработчика Data Flow и разберем ключевые нюансы обработки ошибок, генерации исключений и лучшие практики работы с ними для практических задач дата-инженерии.
Исключения процессоров Apache NiFi
При том, что Apache NiFi имеет множество готовых процессоров для подключения ко внешним источникам или приемникам данных, а также внутренней обработки Flow Files, иногда дата-инженеру нужен собственный обработчик. В этом случае приходится создавать собственный процессор, что мы рассматривали здесь и здесь. Однако, при разработке своего процессора могут возникнуть ошибки, исключения и другие неожиданные ситуации. Например, внутренние исключения процессора во время выполнения метода onTrigger() могут случиться по одной из следующих причин:
- входные данные пришли не в том формате, который ожидался;
- неудачное завершение сетевых подключений к внешним службам;
- сбой чтения или записи данных на диск;
- ошибка в самом процессоре или зависимой библиотеке.
Любое из этих условий может привести к возникновению исключения из процессора. Если генерируется исключение ProcessException, NiFi предполагает, что это сбой, результат которого известен, а повторная попытка обработки данных позже может быть успешной. В результате фреймворк откатит обрабатываемый сеанс и обрабатываемые Flow Files.
Но, если какое-то другое исключение вышло за пределы процессора, NiFi считает предполагает, что это сбой, который не был учтен разработчиком. В этом случае платформа также откатит сеанс и обрабатываемые Flow Files. При этом процессор может постоянно работать, истощая системные ресурсы, но фактически не выполняя никакой полезной работы. Это часто случается, когда исключение NullPointerException генерируется постоянно. Чтобы избежать этого, при исключении, отличном от ProcessException, инфраструктура административно уступает процессору, не позволяя ему запускаться снова в течение времени, заданного в файле nifi.properties и по умолчанию равного 10 секунд.
На практике чаще всего, когда в процессоре возникает исключение, оно происходит из обратного вызова InputStreamCallback, OutputStreamCallback или StreamCallback во время обработки содержимого Flow File. Обратные вызовы могут вызывать исключение RuntimeException или IOException. В случае RuntimeException это исключение будет передаваться обратно методу onTrigger(). Исключение IOException заключается в ProcessException, которое затем выбрасывается фреймворком. Поэтому процессорам, использующим обратные вызовы, рекомендуется делать это в блоке try/catch и перехватывать ProcessException, а также любое исключение RuntimeException, которое может вызвать их обратный вызов. Но не рекомендуется, чтобы процессоры перехватывали общие случаи Exception или Throwable по следующим причинам:
- если выдается неожиданное исключение RuntimeException, это обычно вызывается ошибкой, откат сеанса, разрешенный фреймворку, гарантирует отсутствие потери данных и то, что диспетчеры потоков данных смогут обработать их, сохраняя данные в очереди в место;
- когда исключение IOException вызывается из обратного вызова, оно может выброситься из кода процессора, например, когда входные данные пришли не в том формате или случился сбой сетевого подключения. Или же исключение вызвано репозиторием контента (Content Repository), где хранится содержимое Flow File. NiFi поймает это исключение IOException и завернет его в исключение FlowFileAccessException, которое расширяет RuntimeException. Это сделано явным образом, чтобы обойти метод процессора onTrigger(), и фреймворк мог соответствующим образом обработать это условие через перехват общего исключения.
Ошибки во время обработки
Когда проблема возникает во время обработки, Apache NiFi предоставляет два метода, позволяющих разработчикам процессоров избежать выполнения ненужной работы: penalization (штраф) и yielding (уступка). Разработчик может применить штраф к Flow File, вызвав метод penalize(FlowFile) класса ProcessSession. Это приводит к тому, что сам Flow File становится недоступным для нижестоящих процессоров в течение периода времени, заданного диспетчером потока данных в параметре Penalty Duration в диалоговом окне конфигурации процессора. По умолчанию значение этого параметра равно 30 секунд. Это полезно, когда процессор определяет, что данные не могут быть обработаны из-за причин, которые разрешатся сами собой. Например, процессор PutSFTP, который штрафует Flow File, направляя его в отношение сбой, если файл с таким же именем уже существует на сервере SFTP. Затем DataFlow Manager может перенаправить ошибку обратно на тот же процессор PutSFTP, не пытаясь снова отправить файл в течение заданного периода, и обрабатывая в это время другие Flow Files.
Метод уступки позволяет разработчику процессора указать платформе, что она не сможет выполнять какие-либо полезные функции в течение некоторого периода времени. Это обычно происходит с процессором, который взаимодействует с удаленным ресурсом. Если Процессор не может подключиться к удаленному ресурсу или если ожидается, что удаленный ресурс предоставит данные, но их нет, процессор должен вызвать метод yield() для объекта ProcessContext, а затем вернуться. Делая это, процессор сообщает платформе, что он не должен тратить ресурсы на запуск этого процессора, вместо этого выделив их другим.
Наконец, ProcessSession – это не просто механизм доступа к Flow Files, он также реализует транзакционность, выполняя все свои методы как транзакции. Завершить транзакцию можно, вызвав commit() или rollback(). Обычно это обрабатывается классом AbstractProcessor: если метод onTrigger() генерирует исключение, AbstractProcessor перехватывает его, вызывает session.rollback(), а затем повторно генерирует исключение. Иначе AbstractProcessor вызовет метод commit() для ProcessSession.
Если дата-инженеру нужно явно откатить сеанс, это можно сделать в любое время, вызвав метод rollback() или rollback(boolean), когда логическое значение указывает, должны ли быть оштрафованы те Flow Files, которые были извлечены из очередей через методы получения ProcessSession, перед добавлением обратно в свои очереди. При вызове отката любые изменения, произошедшие с потоковыми файлами в этом сеансе, отбрасываются, включая изменения содержимого и атрибутов. Кроме того, откатываются все события происхождения, кроме событий SEND, которые были созданы путем передачи значения true для аргумента force. Потоковые файлы, извлеченные из входных очередей, передаются обратно и могут быть оштрафованы, чтобы их можно было снова обработать.
При вызове метода commit() новое состояние потокового файла сохраняется в репозитории Flow File, а любые произошедшие события – в репозитории Provenance. Предыдущее содержимое при этом уничтожается, если другой Flow File не ссылается на тот же фрагмент содержимого, а потоковые файлы передаются в исходящие очереди, чтобы следующие процессоры могли работать с данными.
Когда процессор использует аннотацию org.apache.nifi.annotation.behavior.SupportsBatching, вызовы commit() могут сработать не сразу. Обычно эти фиксации объединяются в пакеты, чтобы повысить пропускную способность NiFi. Но если в какой-то момент процессор откатит ProcessSession, все изменения с момента последнего вызова фиксации будут отменены, а все пакетные фиксации вступят в силу и их невозможно будет откатить. О тонкостях тестирования процессоров и потоков данных читайте в нашей новой статье.
Узнайте больше подробностей по администрированию и эксплуатации Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: