Объединение потоков данных в Apache NiFi

В Apache NiFi есть возможность объединять потоки данных из разных источников в один поток. В данной статье будет рассмотрен пример объединения двух файлов посредством процессора MergeContent.

Задача. Есть два процессора GetFile, настроенные на папки /tmp/data/input1 и /tmp/data/input2 соответственно. Файлы, которые в них копируются, нужно объединить в один файл. Например, есть файл text1.txt, который содержит строки:

A
B
C

Также есть файл text2.txt, который содержит строки:

1
2
3

В результате в папке /tmp/data/output должен появиться файл, который содержит следующие строки:

A
B
C
1
2
3

  1. На главную страницу Apache NiFi добавим процессор MergeContent. В настройках укажем, что минимальное количество входных файлов (записей) должно быть равно «2». Это нужно для того, чтобы, если из одного процессора GetFile придет файл, то процессор MergeContent ожидал сначала файл из второго процессора GetFile, а затем уже обрабатывал (объединял) их и передавал дальше. Если оставить значение по умолчанию «1», то процессор MergeContent просто передаст полученный файл дальше, без какой либо отработки.

    Apache NiFi MergeContent Properties
  2. Далее укажем в настройках, что передавать дальше нужно только результат объединения полученного контента.

    Apache NiFi MergeContent Settings
  3. Соединим процессоры между собой, чтобы получить завершенную схему.

    Apache NiFi DataFlow MergeContent
  4. Запустим все процессоры и скопируем файл text1.txt в папку /tmp/data/input1 и файл text2.txt в папку /tmp/data/input2. Если настройки были выполнены корректно, то в папке /tmp/data/output появится файл со сгенерированным именем и со следующим содержанием:

    A
    B
    C
    1
    2
    3

Следует отметить, что если мы скопируем файл только в одну из папок, то, благодаря настройке, сделанной в пункте 1, он будет «висеть» в очереди, дожидаясь пока на вход в MergeContent поступит файл из второй папки.

Apache NiFi MergeContent Очередь

Если входящих соединений очень много, то можно воспользоваться специальным компонентом Apache NiFi, называемом воронкой (Funnel).

Apache NiFi Funnel

Можно направить все соединения на воронку, а воронку уже подключить к целевому процессору.

Apache NiFi использование воронки Funnel

Это позволит достаточно быстро поменять целевой процессор, если возникнет такая необходимость. По сути данный компонент просто передает данные дальше по цепочке не выполняя каких либо функций.

При копировании материалов ссылка на сайт обязательна. Вопросы, замечания, предложения и комментарии к статьям можно направлять на адрес: info@datanetworks.ru