Маршрутизация потоков в DataFlow по атрибутам

В статье Распределение потоков в DataFlow по успешному и неуспешному пути (Success, Failure) рассматривался вопрос распределения потоков в Apache NiFi в зависимости от того, успешно ли был обработан поток в процессоре или нет. В сегодняшней статье рассмотрим распределение (маршрутизацию) потоков в зависимости от атрибутов.

Задача. На вход поступает архив в формате zip. Требуется:
1) Распаковать архив и извлечь из него файлы;
2) Текстовые фaйлы (*.txt) скопировать в папку text, dat-файлы (*.dat) скопировать в папку dat. Все остальные типы файлы скопировать в папку Unmatched.

Для создания DataFlow, который решит проблему:

  1. Воспользуемся той же файловой структурой что и в упомянутой выше статье. Процессор GetFile настроим на папку /tmp/data/input.
  2. Сформируем zip-архив sample.zip в который запакуем файлы с расширением txtdat и, например, doc.
  3. Для распаковки архива добавим процессор UnpackContent. В настройках в блоке «Automatically Terminate Relationships» включить чек бокс Original. В нашем случае настройка означает, что оригинальный zip-файл после распаковки не пойдет дальше по потоку.

    Apache NiFi UnpackContent Settings
  4. В разделе «Properties» настроек процессора UnpackContent указать, что тип архива будет zip.

    Apache NiFi UnpackContent Properties
  5. Добавим процессор RouteOnAttribute. В разделе «Properties» добавим свойства для текстового и dat-файла и укажем стратегию маршрутизации по имени атрибута. Так же добавим атрибуты datFile и textFile и условия поиска для них.

    Apache NiFi RouteOnAttribute Properties
  6. Добавим три процессора PutFile. Они будут настроены на папки /output/text/output/dat и /output/unmatched соответственно. Чтобы было понятно на схеме какой процессор к какой папке относится, назовем процессоры PutFile как For Dat-filesFor Text-files и Unmatched. Соединим процессор с процессорами PutFile и укажем отношения для каждого типа файлов. Например, для dat-файлов будет включен чек-бокс datFile.

    Apache NiFi Процессоры PutFile
  7. После соединения всех процессоров получилась завершенная схема.

    Apache NiFi DataFlow
  8. Запустим все процессоры в получившимся DataFlow и скопируем созданный на шаге 2 архив sample.zip в папку /tmp/data/input. Если поток настроен правильно, по описанным выше шагам, то все успешно отработает, в папках /output/text/output/dat и /output/unmatched появятся файлы, которые были упакованы в архив, с расширениями, соответствующими папке.

Если немного усложнить задачу добавив условие, например, что в архиве может быть упакован другой zip-архив, то потребуется внести незначительные изменения в поток.

  1. В процессор RouteOnAttribute добавить свойство zipFile и задать соответствующее условие поиска для него.

    Apache NiFi RouteOnAttribute zip
  2. Соединение zipFile подключить к процессору UnpackContent, таким образом вложенный файл тоже будет распаковываться и идти дальше по потоку данных.

    Apache NiFi UnpackContent zip

При необходимости с помощью процессора IdentifyMimeType можно «научить» DataFlow определять тип архива (zip, gzip, bzip2, 7z и другие) и, в зависимости от архива, направлять его в соответствующий распаковщик.

При копировании материалов ссылка на сайт обязательна. Вопросы, замечания, предложения и комментарии к статьям можно направлять на адрес: info@datanetworks.ru