Работа с базами данных в Apache NiFi

Взаимодействие с различными СУБД — один из главных кейсов использования Apache NiFi. В статье традиционно будет рассмотрен простейший пример работы с БД.

Предполагается, что имеется работающий сервер СУБД PostgreSQL с базой mydb и созданной таблицей filesystem. В таблице есть два поля: filenamefilepath.

Задача. Получить информацию о файлах (имя файла и его путь), находящихся в заданной папке, и сохранить ее в базу данных в соответствующие поля таблицы.

  1. Создадим структуру папок и файлов и настроим на нее процессор GetFile по аналогии с тем как это было сделано в предыдущих статьях (например,Создание простого DataFlow в NiFi).
  2. Добавим процессор ReplaceText. В общем случае можно обойтись и без него, но мы хотим заменить на SQL-запрос весь контент, который приходит из GetFile. В нем определим запрос:INSERT INTO filesystem(filename, filepath) VALUES (‘${filename}’, ‘${path}’)Переменные filename и path это атрибуты, которые передает процессор GetFile.

    Apache NiFi ReplaceText
  3. Добавим процессор PutSQL. В его настройках нужно указать пул подключений к БД, но, так как у нас его еще нет, то создадим новый. Для этого выберем пункт «Create new service…». В окне подтверждения нажать кнопку «Create».

    Apache NiFi Create new JDBC Connection Pool

    Apache NiFi Add controller service
  4. В открывшемся окне указать URL к серверу БД, имя класса драйвера БД, пусть к драйвер для указанной БД (jar-файл), а также логин и пароль для подключения.

    Apache NiFi Controller service details

    В нашем случае используется PostgreSQL, но можно использовать и другие популярные СУБД. По некоторым из них в таблице ниже есть информация.

    DBMS Database Driver Class Name Database Driver
    MySql com.mysql.jdbc.Driver mysql-connector-java-*.*.*.jar
    Oracle oracle.jdbc.driver.OracleDriver ojdbc*.jar
    MS SQL Server com.microsoft.sqlserver.jdbc.SQLServerDriver sqljdbc*.jar

    Файлы драйверов можно легко найти и скачать в интернете.
  5. Включим созданное подключение нажав на значек с «молнией».

    Apache NiFi Connection Pool Parameters
  6. Подтвердим включение в окне подтверждения нажав кнопку «Enable». В этом же окне можно посмотреть в каких процессорах используется подключение.

    Apache NiFi Connection Pool Enable
  7. Запустим созданное подключение и вернуться в настройки процессора PutSQL. В блоке «Automatically Terminate Relationships» включить все три чекбокса.

    Apache NiFi PutSQL Settings
  8. После соединения всех процессоров получилась завершенная схема.

    Apache NiFi DataFlow PutSQL
  9. Запустим созданный DataFlow и с помощью средств работы с БД (pgAdmin, например) откроем таблицу filesystem. В таблице присутствует информация о файлах из папки, на которую был настроен процессор GetFile. В нашем случае файлы повторяются, так как одни и те же файлы были скопированы в разные папки.

    Apache NiFi DataFlow PutSQL

Стоит отметить, что в процессоре PutSQL можно задать значение свойства SQL Statement. Тогда будет исполняться только тот запрос, который указан в этом поле. Но исполнится он столько раз, сколько транзакций придет из предыдущего процессора (если GetFile считает 8 файлов, то он исполнится 8 раз).

Apache NiFi PutSQL SQL Satatement

При копировании материалов ссылка на сайт обязательна. Вопросы, замечания, предложения и комментарии к статьям можно направлять на адрес: info@datanetworks.ru