В Apache NiFi есть возможность объединять потоки данных из разных источников в один поток. В данной статье будет рассмотрен пример объединения двух файлов посредством процессора MergeContent.
Задача. Есть два процессора GetFile, настроенные на папки /tmp/data/input1 и /tmp/data/input2 соответственно. Файлы, которые в них копируются, нужно объединить в один файл. Например, есть файл text1.txt, который содержит строки:
A
B
C
Также есть файл text2.txt, который содержит строки:
1
2
3
В результате в папке /tmp/data/output должен появиться файл, который содержит следующие строки:
A
B
C
1
2
3
- На главную страницу Apache NiFi добавим процессор MergeContent. В настройках укажем, что минимальное количество входных файлов (записей) должно быть равно «2». Это нужно для того, чтобы, если из одного процессора GetFile придет файл, то процессор MergeContent ожидал сначала файл из второго процессора GetFile, а затем уже обрабатывал (объединял) их и передавал дальше. Если оставить значение по умолчанию «1», то процессор MergeContent просто передаст полученный файл дальше, без какой либо отработки.
- Далее укажем в настройках, что передавать дальше нужно только результат объединения полученного контента.
- Соединим процессоры между собой, чтобы получить завершенную схему.
- Запустим все процессоры и скопируем файл text1.txt в папку /tmp/data/input1 и файл text2.txt в папку /tmp/data/input2. Если настройки были выполнены корректно, то в папке /tmp/data/output появится файл со сгенерированным именем и со следующим содержанием:
A
B
C
1
2
3
Следует отметить, что если мы скопируем файл только в одну из папок, то, благодаря настройке, сделанной в пункте 1, он будет «висеть» в очереди, дожидаясь пока на вход в MergeContent поступит файл из второй папки.
Если входящих соединений очень много, то можно воспользоваться специальным компонентом Apache NiFi, называемом воронкой (Funnel).
Можно направить все соединения на воронку, а воронку уже подключить к целевому процессору.
Это позволит достаточно быстро поменять целевой процессор, если возникнет такая необходимость. По сути данный компонент просто передает данные дальше по цепочке не выполняя каких либо функций.
При копировании материалов ссылка на сайт обязательна. Вопросы, замечания, предложения и комментарии к статьям можно направлять на адрес: info@datanetworks.ru