Создание интеграции сервиса Apache Kafka с базами данных средствами Apache NiFi

В одной из предыдущих статей Работа с базами данных в Apache NiFi мы описали способ взаимодействия Apache NiFi с базой данных. В данной статье будет описан метод интеграции Apache Kafka c базами данных посредством Apache NiFi.

Пояснение. Apache Kafka — брокер сообщений, система для передачи сообщений между различными системами. Получил большую популярность благодаря таким свойствам как распределенность, масштабируемость, способность наращивать пропускную способность при возрастании нагрузки и ряду других особенностей. Как и в других подобных системах, клиенты делятся на подписчиков (Consumer), которые читают сообщения из очередей, и писателей (Publisher), которые записывают сообщения в очереди. Более детально с системой можно ознакомится на сайте проекта Apache Kafka.

Задача. Получить сообщение из топика (очереди) Apache Kafka и сохранить его в базу данных в поле kafka_message таблицы Check.

  1. На главную страницу Apache NiFi добавим процессор ConsumeKafka. В настройках процессора укажем брокер (сервер) Kafka и название топика. В нашем случае топик называется testkf Также нужно указать свойство Group ID, его значение можно найти в папке config Kafka в файле consumer.properties.

    Файл настроек Apache Kafka consumer.properties

    Настройки процессора ConsumeKafka:

    Apache NiFi Настройки процессора ConsumeKafka
  2. Добавим процессор ExtractText. С его помощью мы поместим сообщение, считанное из топика в добавленный атрибут kafka.message. Регулярное выражение (.*) указывает, что мы будем записывать в атрибут весь контент, то есть все полученное значение. Если нужно, то с помощью регулярных выражений можно добавить фильтрацию, тогда в атрибут будут записываться только те сообщения, которые соответствуют фильтру.

    Apache NiFi Настройки процессора ExtractText
  3. Добавим процессор ReplaceText. Его мы используем для построения SQL запроса. В целом, без него можно было бы обойтись и добавить запрос в процессоре PutSql, но для простоты понимания мы решили разделить задачи и сформировать запрос на отдельном, от его выполнения, шаге.

    Apache NiFi Настройки процессора ReplaceText
  4. Добавим процессор PutSQL. Рекомендации по его настройке приводились в упомянутой выше статье про работу с базами данными в Apache NiFi.
  5. Соединим процессоры между собой, чтобы получить завершенную схему.

    Apache NiFi DataFlow интеграции Apache Kafka и базы данных
  6. Запустим все процессоры и с помощью какого либо клиента Kafka (или тестовой утилиты Kafka) отправим в топик testkf несколько сообщений. В нашем случае это сообщения вида «Message from kafka (Topic: testkf) 000000».
  7. C помощью средств работы с БД (pgAdmin, например) откроем таблицу Check. Поле kafka_message таблицы содержит сообщения, которые были отправлены в топик testkf.

    Apache NiFi PutSQL

Для взаимодействия с брокерами сообщений, которые используют протокол AMQP (например, RabbitMQ, Apache Qpid, Apache ActiveMQ), в Apache NiFi тоже есть процессоры ConsumeAMQP и PublishAMQP для чтения и записи сообщений в очереди (топики в терминах Kafka).

При копировании материалов ссылка на сайт обязательна. Вопросы, замечания, предложения и комментарии к статьям можно направлять на адрес: info@datanetworks.ru