Kafka to ADB (kadb-fdw)¶
kadb-fdw - это расширение для ADB/GPDB, позволяющее производить транзакционную загрузку данных из кластера брокера сообщений Kafka.
Особенности:
- AVRO десереализация;
- Хранение смещений Kafka вне кластера Kafka, на стороне потребителя;
- Поддержка транзакций ADB/GPDB;
- Поддержка Kerberos-аутентификации.
Установка с помощью ADCM¶
Для установки коннектора с помощью ADCM требуется инсталлировать сервис PXF на все сегментные ноды кластера, а также в списке сервисов выбрать сервис Kafka to ADB. При этом необходимые пакеты и файлы автоматически устанавливаются на машины кластера.
Important
Процесс установки включает в себя выполнение следующей команды: RESET client_min_messages
SQL-интерфейс¶
Пример¶
-- Create a SERVER
DROP SERVER IF EXISTS ka_server;
CREATE SERVER ka_server
FOREIGN DATA WRAPPER kadb_fdw
OPTIONS (
k_brokers 'localhost:9092'
);
-- Create a FOREIGN TABLE
DROP FOREIGN TABLE IF EXISTS ka_table;
CREATE FOREIGN TABLE ka_table(field1 INT, field2 TEXT)
SERVER ka_server
OPTIONS (
format 'avro',
k_topic 'test',
k_consumer_group 'my_consumer_group',
k_seg_batch '100',
k_timeout_ms '1000',
k_initial_offset '42'
);
-- Issue a SELECT query as usual
SELECT * FROM ka_table;
Таблица смещений¶
В ходе установки расширения создается служебная схема kadb
, содержащая таблицу kadb.offsets
.
В данную таблицу помещаются пары соответствий партиция-смещение для любой когда-либо создававшейся в текущей базе
внешней таблицы (FOREIGN TABLE).
Таблицы идентифицируются по значению OID, который можно узнать, выполнив следующую команду:
SELECT '<table name>'::regclass::oid;
При выполнении SELECT-запроса к внешней таблице чтение сообщений из Kafka производится, начиная со смещения, указанного
для данной внешней таблицы в таблице kadb.offsets
.
Смещения можно изменять при помощи обычного SQL-запроса к таблице kadb.offsets
.
Для новых партиций, записей о которых нет в таблице kadb.offsets
, начальное смещение по умолчанию устанавливается
равным 0 (значение может быть изменено в параметре k_initial_offset).
После успешного выполнения SELECT-запроса к внешней таблице смещение обновляется в соответствии со значениями, полученными
от Kafka. Например, если последнее прочитанное сообщение из некоторой партиции имело смещение 84, значение смещения для
данной партиции в таблице kadb.offsets
будет равным 85.
Опции внешних таблиц¶
Для сервера (SERVER) и внешней таблицы (FOREIGN TABLE) возможно указание опций в виде пары ключ-значение.
Опции, определенные для сервера и внешней таблицы, не отличаются друг от друга. Иными словами, не имеет значения, для какого объекта они были указаны. Однако опции, определенные для внешней таблицы, более приоритетны, чем опции сервера (в случае, если для обоих объектов была указана одинаковая опция).
Поддерживаемые опции:
k_brokers¶
Список брокеров Kafka, разделенный запятыми, где каждый элемент — это строка вида <host> или <host>:<port>.
k_initial_offset¶
Смещение, которое следует использовать для партиций, записей о которых нет в таблице смещений.
k_allow_offset_increase¶
Позволять коннектору автоматически увеличивать смещение для любой партиции, если он обнаруживает, что наименьшее смещение среди существующих сообщений в Kafka больше, чем смещение, хранящееся в таблице смещений, или чем значение опции k_initial_offset. В случае, если опция имеет значение false, и наименьшее смещение существующего в Kafka сообщения больше, чем желаемое начальное смещение, генерируется ошибка.
k_seg_batch¶
Максимальное количество сообщений Kafka, запрашиваемое каждым сегментом кластера ADB/GPDB.
При выполнении запроса с условием LIMIT сообщения в Kafka продолжают запрашиваться батчами. В результате смещения в таблице смещений устанавливается на основании последнего полученного сообщения для каждой партиции, даже если данные не содержались в результатах запроса.
При достижении конца партиции Kafka отправляет специальное сообщение. Таким образом, если значение k_seg_batch
меньше,
чем число партиций, определенное для одного сегмента ADB/GPDB и k_seg_batch
этих партиций было полностью прочитано,
может возникнуть ситуация, при которой не будет прочитано никаких полезных данных. По этой причине опция k_seg_batch
должна быть больше, чем максимальное число партиций определенных для одного сегмента.
k_timeout_ms¶
Время ожидания выполнения запроса к Kafka в миллисекундах. За это время Kafka-клиентом будут извлечены и представлены в качестве результата SELECT-запроса только сообщения, доступные в Kafka в этот период времени.
SELECT-запрос может выполняться быстрее, если в топике Kafka достаточно сообщений.
Реальное время выполнения SELECT-запроса может быть значительно больше. Для оценки максимального значения используйте следующее выражение:
Более всего на длительность запроса влияют партиции Kafka, в которых недостаточно сообщений: коннектор ожидает появления в этих партициях новых сообщений в течение k_timeout_ms миллесекунд.
На некоторых этапах выполнения SELECT-запроса его принудительная отмена может быть невозможна до окончания периода k_timeout_ms.
k_security_protocol¶
Протокол безопасности, который необходимо использовать для подключения к Kafka. В данный момент доступен только протокол sasl_plaintext.
kerberos_keytab¶
Путь до keytab-файла для Kerberos-аутентификации. Этот файл должен быть доступен для пользователя, от имени которого происходит запуск процесса ADB/GPDB, а также должен присутствовать на каждом сегменте кластера (по одному и тому же пути).
Установка данной опции включает Kerberos-аутентификацию.
kerberos_principal¶
Имя Kerberos-принципала клиента, который получает доступ к Kafka.
kerberos_service_name¶
Имя Kerberos-принципала, который использует Kafka (не включая /hostname@REALM
).
При указании данного параметра librdkafka передает его значение как первый аргумент (service) в вызов sasl_client_new()
kerberos_min_time_before_relogin¶
Минимальное время в миллисекундах между попытками обновления ключа. Для отключения автоматического обновления ключа неоходимо выставить опцию в значение 0.
format¶
Формат сереализованных данных:
- avro
- csv
avro_schema¶
AVRO-схема, которую необходимо использовать. Десериализованная AVRO схема представляет из себя JSON. Получаемые сообщения десериализуются двумя способами:
- Если присутствует опция
avro_schema
, используется указанная схема (сообщение так же должно быть представлено в OCF формате) - В противном случае, схема извлекается из полученного сообщения в OCF формате.
Important
Пользовательская схема не может быть валидирована. Если реальная схема не соответствует указанной, десериализация
завершается с ошибкой ERROR: invalid memory alloc request size. По этой причине опция avro_schema
должна использоваться
только в целях повышения производительности и только после тщательного изучения.
csv_quote¶
Символ, используемый в качестве кавычек при парсинге CSV.
csv_delimeter¶
Символ, используемый в качестве разделителя полей при парсинге CSV.
csv_null¶
Строка, используемая в качестве значения NULL
в CSV.
По умолчанию пустая строка интерпретируется как NULL.
csv_ignore_header¶
Игнорировать первую строку каждого сообщения при парсинге.
csv_attribute_trim_whitespace¶
Удалять пробелы в начале и конце строки для каждого атрибута (поля) записи.
Десериализация¶
В настоящее время расширение kafka-fdw поддерживает сообщения Kafka, сериализованные при помощи одного из следующих форматов:
- AVRO OCF
- CSV
Метод десериалиализации должен быть указан в опции format.
Вне зависимости от используемого метода десериализуется только сообщение Kafka, ключ сообщения игнорируется.
AVRO¶
kadb-fdw предоставляет поддержку формата сериализации AVRO OCF с некоторыми ограничениями.
Схема должна содержать только примитивные типы
данных или объединения одного примитивного типа данных с типом NULL
.
В настоящее время логические типы данных не поддерживаются. Они обрабатываются как обычные типы.
Пример схемы:
{
"name": "doc",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "text",
"type": ["string", "null"]
},
{
"name": "issued_on",
"type": "int",
},
]
}
CSV¶
Поддержка CSV реализована при помощи libcsv. Таким образом, учитываются все соглашения о формате CSV, устанавливаемые этой библиотекой. Перечень соглашений представлен в данном документе. Спецификация CSV определена в RFC 4180.
Принимая во внимание эти рекомендации, kadb-fdw использует следующие правила парсинга CSV:
- Поля (атрибуты) разделяются специальным символом (разделителем);
- Строки (записи) разделяются символом новой строки;
- Поле может быть заключено в специальные символы (кавычки);
- Поля, содержащие символ разделителя, кавычки или новой строки, должны быть заключены в кавычки;
- Каждый символ кавычки должен быть экранирован путем его дублирования (добавления идентичного символа перед ним);
- Пустое поле всегда интерпретируется как значение
NULL
; - Пустые строки игнорируются;
- Пробелы в начале и конце поля, не заключенного в кавычки, удаляются, если соответствующая опция выставлена в значение true.
Значения могут быть конвертированы в любой тип данных PostgreSQL. Используемые правила конвертации аналогичны правилам, применяемым для входных данных psql.
Примеры использования¶
Таблица для данных в формате AVRO¶
DROP SERVER IF EXISTS ka_server CASCADE;
CREATE SERVER ka_server
FOREIGN DATA WRAPPER kadb_fdw
OPTIONS (
k_brokers 'localhost:9092'
);
CREATE FOREIGN TABLE ka_avro_table(
i INT,
t TEXT
)
SERVER ka_server
OPTIONS (
format 'avro',
k_topic 'my_topic',
k_consumer_group 'my_consumer_group',
k_seg_batch '5',
k_timeout_ms '1000'
);
Таблица для данных в формате CSV¶
DROP SERVER IF EXISTS ka_server CASCADE;
CREATE SERVER ka_server
FOREIGN DATA WRAPPER kadb_fdw
OPTIONS (
k_brokers 'localhost:9092'
);
CREATE FOREIGN TABLE ka_csv_table(
i INT,
t TEXT
)
SERVER ka_server
OPTIONS (
format 'csv',
k_topic 'my_topic',
k_consumer_group 'my_consumer_group',
k_seg_batch '5',
k_timeout_ms '1000'
);
Таблица с Kerberos-аутентификацией¶
DROP SERVER IF EXISTS ka_kerberized_server CASCADE;
CREATE SERVER ka_kerberized_server
FOREIGN DATA WRAPPER kadb_fdw
OPTIONS (
k_brokers 'ke-kafka-sasl.ru-central1.internal:9092',
k_security_protocol 'sasl_plaintext',
kerberos_keytab '/root/adbkafka.service.keytab',
kerberos_principal 'adbkafka'
);
CREATE FOREIGN TABLE ka_table(
i INT,
t TEXT
)
SERVER ka_kerberized_server
OPTIONS (
format 'avro',
k_topic 'my_topic',
k_consumer_group 'my_consumer_group',
k_seg_batch '5',
k_timeout_ms '1000'
);