Распределение потоков в DataFlow по результатам выполнения операций (Success, Failure)

Apache NiFi позволяет распределять потоки в зависимости от того, успешно ли был обработан поток в процессоре или нет. Продемонстрируем на простом примере потока, который мы рассматривали в статье Создание простого DataFlow в NiFi. Напомним, у нас было два процессора GetFilePutFile и Output Port, который мы назвали FlowEnd.

Модифицируем поток таким образом, чтобы вместо Output Port у нас был еще один PutFile. Смысл эксперимента в том, что в случае успешного сохранения файла в папку /tmp/data/output поток будет прекращать свою работу. А в случае ошибки поток будет продолжать работу и сохранит файл в следующем процессоре PutFile (чтобы не путаться назовем его ErrorLog). Для последнего процессора создадим папку /tmp/data/errlog, туда будут попадать все файлы, которые по каким то причинам не сохранились в первом процессоре PutFile.

  1. Модифицируем DataFlow удалив Output Port и добавив еще один PutFile с именем ErrorLog.

    Apache NiFi DataFlow
  2. Настроим первый процессор PutFile таким образом, чтобы обработка прекращалась в случае успешного прохождения потока (успешного сохранения файла в /tmp/data/output).

    Настройка процессора PutFile (Automatically terminate relationships - Success)
  3. Кроме того, для того чтобы эксперимент удался, укажем процессору чтобы не создавал папку /tmp/data/output в случае ее отсутствия.

    Настройка процессора PutFile (Create missing directories - False)
  4. Соединим процессор PutFile c процессором ErrorLog. В настройках соединения укажем, что по нему будут проходить только ошибочные потоки, то есть файлы, которые по каким то причинам не обработались в предыдущем процессоре.

    Настройка соединения между двумя процессорами PutFile
  5. Настроим процессор ErrorLog таким образом, чтобы он сохранял файлы в папку /tmp/data/errlog.

    Настройка процессора PutFile (ErrorLog)
  6. Чтобы не добавлять Output Port укажем, что поток заканчивается в процессоре ErrorLog.

    Настройка процессора PutFile (Automatically terminate relationships - Success,Failure)
  7. Запускаем поток (необходимо запустить все процессоры потока).
  8. Скопируем любой файл в папку /tmp/data/input и убедимся, что через DataFlow этот файл будет помещен в папку /tmp/data/output. Поток на этом закончится и в следующий процессор ErrorLog файл не уйдет.
  9. Чтобы в процессоре PutFile произошла ошибка, в файловой системе удалим папку /tmp/data/output и еще раз скопируем любой файл в папку /tmp/data/input.
  10. Убедимся, что через DataFlow этот файл будет помещен в папку /tmp/data/errlog.

Вышеописанный пример показывает как можно распределить потоки в DataFlow в зависимости от успешности или не успешности выполнения обработки данных в процессоре. Кроме успешности, есть возможность распределять потоки в зависимости от условий, заданных в процессорах. Об этом будет рассказано в других статьях.

При копировании материалов ссылка на сайт обязательна. Вопросы, замечания, предложения и комментарии к статьям можно направлять на адрес: info@datanetworks.ru