Создание простого DataFlow в NiFi

В статье рассмотрим процесс создания простого потока данных, в терминах NiFi — Dataflow.

Словосочетание «поток данных» (DataFlow) имеет достаточно много значений, в контексте NiFi договоримся называть потоком данных цепочку, которая начинается с источника необработанных/непроанализированных данных и заканчивается получателем обработанных / проанализированных данных. Между источником и получателем в цепочке может находится набор обработчиков, называемых процессорами (процессоры являются ключевым элементом NiFi, будут детально рассмотрены в отдельной статье).

Здесь будет рассмотрен простейший пример, где источником данных будет выступать текстовый файл и через созданный DataFlow он будет просто копироваться из одной папки в другую.

  1. В файловой системе создадим две папки input и output. Из первой мы будем забирать данные, во вторую копировать. Например:
    /tmp/data/input
    /tmp/data/output
  2. В первой папке создать обычный текстовый файл, например:
    sample.txtС произвольным текстом.
  3. Далее открываем главную страницу NiFi и создаем процессор GetFile. В создаваемом DataFlow он будет забирать файл из заданной папки и передавать его следующему процессору.

    В верхней панели кликнуть на процессор и перетащить его на рабочую область.

    Добавление процессора в NiFi.

    В открывшемся окне выбора типа процессора, в строке поиска, вводим GetFile.

    Выбор процессора GetFile в NiFi.

  4. Перейти во вкладку «Properties» добавленного процессора и указать там значения для следующих свойств:
    • Папку в файловой системе, из которой будут загружаться файлы
    • Регулярное выражение для фильтрации файлов по имени (в нашем случае будут загружены все файлы)
    • Зададим удаление файла после загрузки, в противном случае он будет загружаться и копироваться в бесконечном цикле. Необходимо учесть, что после применения данной настройки исходный файл будет удален, поэтому если он понадобиться в дальнейшем, то лучше сделать резервную копию.

    Настройка процессора GetFile в NiFi.

  5. Аналогичным образом создаем процессор PutFile. В создаваемом DataFlow он будет сохранять полученный от предыдущего процессора файл в заданную папку.

    Выбор процессора PutFile в NiFi.

    Перейти во вкладку «Properties» добавленного процессора и указать там путь к папке в файловой системе в которую будут сохранены файлы.

    Настройка процессора PutFile в NiFi.

  6. Далее необходимо завершим наш DataFlow компонентом Output Port. В верхней панели кликнуть на компонент Output Port и перетащить его на рабочую область.

    Создание Output Port в NiFi.

    В появившемся окне ввести произвольное имя. В нашем случае будет FlowEnd.

    Настройка Output Port в NiFi.

    Примечание. Компонент Output Port можно не создавать, но тогда в компоненте PutFile нужно «включить» чекбоксы в блоке «Automatically Terminate Relationships». Влияние и использование таких настрек в DataFlow будет рассмотрено в отдельной статье.
  7. Соединить процессоры GetFile и PutFile. Для этого необходимо навести курсор мыши в середину процессора GetFile — появится черный кружек с белой стрелкой, направленной вниз вправо. Кликнуть на кружок и «протянуть» линию до процессора PutFile. При успешном соединении между процессорами появится черная линия.
  8. Соединить процессор PutFile и Output Port. В появившемся окне настроек в блоке настроек «For Relationships» отметить оба чекбокса (их назначение будет детально описано в других статьях).

    Создание соеднинения (Connection) между процессором и Output Port.

В результате получился DataFlow. Если есть какие то ошибки в процессорах, то на процессах будет «гореть» восклицательный знак. В нашем случае ошибок нет.

Пример простого DataFlow в NiFi.

Теперь созданный DataFlow нужно запустить. Для этого на каждом процессоре необходимо кликнуть правой кнопкой мыши и выполнить команду «Start».

Запуск DataFlow в NiFi.

Теперь любой файл, который будет копироваться в папку /tmp/data/input будет загружаться в созданный DataFlow в NiFi и копироваться в папку /tmp/data/output. После загрузки исходный файл из /tmp/data/input будет удаляться.

Понятно, что нет особого смысла делать такой поток в NiFi, так как задачу копирования файла легче реализовать используя скрипты. Описанный выше пример приведен исключительно для понимания структуры простейшего DataFlow и способов работы с ним.

При копировании материалов ссылка на сайт обязательна. Вопросы, замечания, предложения и комментарии к статьям можно направлять на адрес: info@datanetworks.ru