*********************** Kafka to ADB (kadb-fdw) *********************** **kadb-fdw** - это расширение для ADB/GPDB, позволяющее производить транзакционную загрузку данных из кластера брокера сообщений Kafka. **Особенности**: + AVRO десереализация; + Хранение смещений Kafka вне кластера Kafka, на стороне потребителя; + Поддержка транзакций ADB/GPDB; + Поддержка Kerberos-аутентификации. Установка с помощью ADCM ======================== Для установки коннектора с помощью **ADCM** требуется инсталлировать сервис *PXF* на все сегментные ноды кластера, а также в списке сервисов выбрать сервис *Kafka to ADB*. При этом необходимые пакеты и файлы автоматически устанавливаются на машины кластера. .. important:: Процесс установки включает в себя выполнение следующей команды: **RESET client_min_messages** SQL-интерфейс ============= Пример ------ .. code-block:: sql -- Create a SERVER DROP SERVER IF EXISTS ka_server; CREATE SERVER ka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'localhost:9092' ); -- Create a FOREIGN TABLE DROP FOREIGN TABLE IF EXISTS ka_table; CREATE FOREIGN TABLE ka_table(field1 INT, field2 TEXT) SERVER ka_server OPTIONS ( format 'avro', k_topic 'test', k_consumer_group 'my_consumer_group', k_seg_batch '100', k_timeout_ms '1000', k_initial_offset '42' ); -- Issue a SELECT query as usual SELECT * FROM ka_table; .. _offset_table: Таблица смещений ---------------- В ходе установки расширения создается служебная схема ``kadb``, содержащая таблицу ``kadb.offsets``. В данную таблицу помещаются пары соответствий партиция-смещение для любой когда-либо создававшейся в текущей базе внешней таблицы (FOREIGN TABLE). Таблицы идентифицируются по значению `OID `_, который можно узнать, выполнив следующую команду: .. code-block:: sql SELECT ''::regclass::oid; При выполнении SELECT-запроса к внешней таблице чтение сообщений из Kafka производится, начиная со смещения, указанного для данной внешней таблицы в таблице ``kadb.offsets``. Смещения можно изменять при помощи обычного SQL-запроса к таблице ``kadb.offsets``. Для новых партиций, записей о которых нет в таблице ``kadb.offsets``, начальное смещение по умолчанию устанавливается равным 0 (значение может быть изменено в параметре :ref:`k_initial_offset`). После успешного выполнения SELECT-запроса к внешней таблице смещение обновляется в соответствии со значениями, полученными от Kafka. Например, если последнее прочитанное сообщение из некоторой партиции имело смещение 84, значение смещения для данной партиции в таблице ``kadb.offsets`` будет равным 85. Опции внешних таблиц -------------------- Для сервера (`SERVER `_) и внешней таблицы (`FOREIGN TABLE `_) возможно указание опций в виде пары ключ-значение. Опции, определенные для сервера и внешней таблицы, не отличаются друг от друга. Иными словами, не имеет значения, для какого объекта они были указаны. Однако опции, определенные для внешней таблицы, более приоритетны, чем опции сервера (в случае, если для обоих объектов была указана одинаковая опция). **Поддерживаемые опции:** k_brokers """"""""" | *Обязательна.* Список брокеров Kafka, разделенный запятыми, где каждый элемент — это строка вида или :. k_topic """"""""" | *Обязательна.* Идентификатор топика Kafka. k_consumer_group """""""""""""""" | *Обязательна.* Идентификатор группы потребителей Kafka. .. _k_initial_offset: k_initial_offset """""""""""""""" | *Неотрицательное целое число. Значение по умолчанию: 0.* Смещение, которое следует использовать для партиций, записей о которых нет в :ref:`таблице смещений`. k_allow_offset_increase """"""""""""""""""""""" | *Булево значение.* | *Значение по умолчанию: true.* Позволять коннектору автоматически увеличивать смещение для любой партиции, если он обнаруживает, что наименьшее смещение среди существующих сообщений в Kafka больше, чем смещение, хранящееся в :ref:`таблице смещений`, или чем значение опции :ref:`k_initial_offset`. В случае, если опция имеет значение false, и наименьшее смещение существующего в Kafka сообщения больше, чем желаемое начальное смещение, генерируется ошибка. k_seg_batch """"""""""" | *Обязательна.* | *Положительное целое число.* Максимальное количество сообщений Kafka, запрашиваемое каждым сегментом кластера ADB/GPDB. При выполнении запроса с условием LIMIT сообщения в Kafka продолжают запрашиваться батчами. В результате смещения в :ref:`таблице смещений` устанавливается на основании *последнего* полученного сообщения для каждой партиции, даже если данные не содержались в результатах запроса. При достижении конца партиции Kafka отправляет специальное сообщение. Таким образом, если значение ``k_seg_batch`` меньше, чем число партиций, определенное для одного сегмента ADB/GPDB и ``k_seg_batch`` этих партиций было полностью прочитано, может возникнуть ситуация, при которой не будет прочитано никаких полезных данных. По этой причине опция ``k_seg_batch`` *должна быть больше, чем максимальное число партиций определенных для одного сегмента*. k_timeout_ms """""""""""" | *Обязательна.* | *Неотрицательное целое число.* Время ожидания выполнения запроса к Kafka в миллисекундах. За это время Kafka-клиентом будут извлечены и представлены в качестве результата SELECT-запроса только сообщения, доступные в Kafka в этот период времени. SELECT-запрос может выполняться быстрее, если в топике Kafka достаточно сообщений. Реальное время выполнения SELECT-запроса может быть значительно больше. Для оценки максимального значения используйте следующее выражение: | *[duration] = [k_timeout_ms] * (2 + ceil( [number of partitions] / [number of segments in cluster] ))* Более всего на длительность запроса влияют партиции Kafka, в которых недостаточно сообщений: коннектор ожидает появления в этих партициях новых сообщений в течение `k_timeout_ms` миллесекунд. На некоторых этапах выполнения SELECT-запроса его принудительная отмена может быть невозможна до окончания периода k_timeout_ms. k_security_protocol """"""""""""""""""" | *Обязательна, если используется Kerberos-аутентификация.* Протокол безопасности, который необходимо использовать для подключения к Kafka. В данный момент доступен только протокол sasl_plaintext. kerberos_keytab """"""""""""""" | *Обязательна, если используется Kerberos-аутентификация.* Путь до keytab-файла для Kerberos-аутентификации. Этот файл должен быть доступен для пользователя, от имени которого происходит запуск процесса ADB/GPDB, а также должен присутствовать на каждом сегменте кластера (по одному и тому же пути). Установка данной опции включает Kerberos-аутентификацию. kerberos_principal """""""""""""""""" | *Значение по умолчанию: kafkaclient.* Имя Kerberos-принципала клиента, который получает доступ к Kafka. kerberos_service_name """"""""""""""""""""" | *Значение по умолчанию: kafka.* Имя Kerberos-принципала, который использует Kafka (не включая ``/hostname@REALM``). При указании данного параметра librdkafka передает его значение как первый аргумент (service) в вызов `sasl_client_new() `_ kerberos_min_time_before_relogin """""""""""""""""""""""""""""""" | *Неотрицательное целое число.* | *Значение по умолчанию: 6000.* Минимальное время в миллисекундах между попытками обновления ключа. Для отключения автоматического обновления ключа неоходимо выставить опцию в значение 0. .. _format: format """""" | *Обязательна.* | *Одно из предопределенных значений (без учета регистра).* Формат сереализованных данных: + avro + csv avro_schema """"""""""" AVRO-схема, которую необходимо использовать. Десериализованная AVRO схема представляет из себя JSON. Получаемые сообщения десериализуются двумя способами: + Если присутствует опция ``avro_schema``, используется указанная схема (сообщение так же должно быть представлено в `OCF `_ формате) + В противном случае, схема извлекается из полученного сообщения в `OCF `_ формате. .. important:: Пользовательская схема не может быть валидирована. Если реальная схема не соответствует указанной, десериализация завершается с ошибкой `ERROR: invalid memory alloc request size`. По этой причине опция ``avro_schema`` должна использоваться только в целях повышения производительности и только после тщательного изучения. .. _csv_quote: csv_quote """"""""" | *Одиночный символ, представимый одним байтом в текущей кодировке.* | *Значение по умолчанию: ``"``.* Символ, используемый в качестве кавычек при парсинге CSV. .. _csv_delimeter: csv_delimeter """"""""""""" | *Одиночный символ, представимый одним байтом в текущей кодировке.* | *Значение по умолчанию: ``,``.* Символ, используемый в качестве разделителя полей при парсинге CSV. csv_null """""""" Строка, используемая в качестве значения ``NULL`` в CSV. *По умолчанию пустая строка интерпретируется как NULL*. csv_ignore_header """"""""""""""""" | *Булево значение.* | *Значение по умолчанию: false.* Игнорировать первую строку каждого сообщения при парсинге. .. _csv_attribute_trim_whitespace: csv_attribute_trim_whitespace """"""""""""""""""""""""""""" | *Булево значение.* | *Значение по умолчанию: true.* Удалять пробелы в начале и конце строки для каждого атрибута (поля) записи. Десериализация -------------- В настоящее время расширение *kafka-fdw* поддерживает сообщения Kafka, сериализованные при помощи одного из следующих форматов: + AVRO `OCF `_ + CSV Метод десериалиализации должен быть указан в опции :ref:`format`. Вне зависимости от используемого метода десериализуется только сообщение Kafka, ключ сообщения игнорируется. AVRO """" `kadb-fdw` предоставляет поддержку формата сериализации AVRO OCF с некоторыми ограничениями. Схема должна содержать только `примитивные `_ типы данных или объединения одного примитивного типа данных с типом ``NULL``. В настоящее время логические типы данных не поддерживаются. Они обрабатываются как обычные типы. **Пример схемы:** .. code-block:: JSON { "name": "doc", "type": "record", "fields": [ { "name": "id", "type": "int" }, { "name": "text", "type": ["string", "null"] }, { "name": "issued_on", "type": "int", }, ] } CSV """ Поддержка CSV реализована при помощи `libcsv `_. Таким образом, учитываются все соглашения о формате CSV, устанавливаемые этой библиотекой. Перечень соглашений представлен в `данном документе `_. Спецификация CSV определена в `RFC 4180 `_. Принимая во внимание эти рекомендации, `kadb-fdw` использует следующие правила парсинга CSV: + Поля (атрибуты) разделяются специальным символом (:ref:`разделителем`); + Строки (записи) разделяются символом новой строки; + Поле может быть заключено в специальные символы (:ref:`кавычки`); + Поля, содержащие символ разделителя, кавычки или новой строки, должны быть заключены в кавычки; + Каждый символ :ref:`кавычки` должен быть экранирован путем его дублирования (добавления идентичного символа перед ним); + Пустое поле всегда интерпретируется как значение ``NULL``; + Пустые строки игнорируются; + Пробелы в начале и конце поля, не заключенного в кавычки, удаляются, если соответствующая :ref:`опция` выставлена в значение true. Значения могут быть конвертированы в любой тип данных PostgreSQL. Используемые правила конвертации аналогичны правилам, применяемым для входных данных `psql`. Примеры использования ===================== Таблица для данных в формате AVRO --------------------------------- .. code-block:: sql DROP SERVER IF EXISTS ka_server CASCADE; CREATE SERVER ka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'localhost:9092' ); CREATE FOREIGN TABLE ka_avro_table( i INT, t TEXT ) SERVER ka_server OPTIONS ( format 'avro', k_topic 'my_topic', k_consumer_group 'my_consumer_group', k_seg_batch '5', k_timeout_ms '1000' ); Таблица для данных в формате CSV -------------------------------- .. code-block:: sql DROP SERVER IF EXISTS ka_server CASCADE; CREATE SERVER ka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'localhost:9092' ); CREATE FOREIGN TABLE ka_csv_table( i INT, t TEXT ) SERVER ka_server OPTIONS ( format 'csv', k_topic 'my_topic', k_consumer_group 'my_consumer_group', k_seg_batch '5', k_timeout_ms '1000' ); Таблица с Kerberos-аутентификацией ---------------------------------- .. code-block:: sql DROP SERVER IF EXISTS ka_kerberized_server CASCADE; CREATE SERVER ka_kerberized_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'ke-kafka-sasl.ru-central1.internal:9092', k_security_protocol 'sasl_plaintext', kerberos_keytab '/root/adbkafka.service.keytab', kerberos_principal 'adbkafka' ); CREATE FOREIGN TABLE ka_table( i INT, t TEXT ) SERVER ka_kerberized_server OPTIONS ( format 'avro', k_topic 'my_topic', k_consumer_group 'my_consumer_group', k_seg_batch '5', k_timeout_ms '1000' );