Настройка Kafka to ADB Connector
Объекты для работы с коннектором
Для отправки данных из Kafka в кластер ADB через Kafka to ADB Connector необходимо предварительно создать следующие объекты на стороне ADB:
-
Server — инкапсулирует информацию о соединении с внешним источником данных.
-
Foreign table — таблица ADB, определяющая структуру внешних данных. Foreign-таблица не хранит данные в ADB, но к ней можно обращаться с помощью запросов как к обычной таблице.
Примеры использования перечисленных объектов для отправки данных из кластера ADS в ADB приведены в статье Примеры использования Kafka to ADB Connector.
ВАЖНО
|
Server
Для создания сервера предназначена команда CREATE SERVER
, базовый синтаксис которой приведен ниже:
CREATE SERVER <server_name> [ TYPE '<server_type>' ] [ VERSION '<server_version>' ]
FOREIGN DATA WRAPPER <fdw_name>
[ OPTIONS ( [ <option> '<value>' [, ... ]] ) ]
где:
-
<server_name>
— имя сервера в ADB. Должно быть уникальным в рамках текущей базы данных ADB. -
<server_type>
— необязательный параметр, определяющий тип сервера. -
<server_version>
— необязательный параметр, определяющий версию сервера. -
<fdw_name>
— имя обертки внешних данных (foreign data wrapper). Необходимо указатьkadb_fdw
— foreign data wrapper, который создается автоматически после установки коннектора (см. шаг 4 в статье Установка Kafka to ADB Connector). -
<option>
— параметры сервера, определяющие детали подключения к внешнему источнику данных. Список возможных опций для Kafka to ADB Connector приведен в таблице Опции сервера ниже. Обратите внимание, что эти опции могут быть определены как на уровне сервера, так и на уровне foreign-таблицы. При этом опции, отмеченные как обязательные, должны быть указаны на одном из уровней. -
<value>
— значения соответствующих параметров<option>
.
ПРИМЕЧАНИЕ
|
Имя | Описание | Default | Обязательность |
---|---|---|---|
k_brokers |
Разделенный запятыми список адресов брокеров Kafka, каждый из которых указан в формате Алиас свойства librdkafka |
— |
Да |
k_topic |
Идентификатор топика Kafka (см. Концепции хранения в Kafka) |
— |
Да |
k_consumer_group |
Идентификатор группы потребителей Kafka. Алиас свойства librdkafka |
— |
Да |
k_seg_batch |
Максимальное количество сообщений Kafka, извлекаемое каждым сегментом кластера ADB. Обратите внимание, что при выполнении запроса с условием Также следует учитывать, что поскольку опция Для ввода допускаются положительные целые числа |
— |
Да |
k_timeout_ms |
Время ожидания выполнения запроса на получение сообщений из Kafka в миллисекундах. Только сообщения, доступные в Kafka в период времени с начала выполнения Запрос Обратите внимание, что фактическое время выполнения запроса На некоторых этапах выполнения запроса Для ввода допускаются неотрицательные целые числа |
— |
Да |
format |
— |
Да |
|
k_latency_ms |
Тайм-аут запросов к метаданным Kafka в миллисекундах. Значение этого параметра следует установить в максимально ожидаемое время отклика (по всем сегментам ADB) любого запроса к метаданным Kafka. Самый длительный запрос к метаданным произодится со стороны функции Следует учитывать, что при выполнении Для ввода допускаются неотрицательные целые числа |
2000 |
Нет |
k_initial_offset |
Начальное смещение, которое следует использовать для партиций, записей о которых нет в таблице kadb.offsets. Эта величина используется, когда установлен флаг Для ввода допускаются неотрицательные целые числа |
0 |
Нет |
k_automatic_offsets |
Параметр, присвоение которому значения
При установке параметра в Обратите внимание, что партиции, присутствующие в Kafka, но отсутствующие в таблице kadb.offsets, не будут видны пользователю в случае использования После успешного выполнения |
true |
Нет |
avro_schema |
AVRO-схема в JSON-формате, которую необходимо использовать для десериализации сообщений. Поступающие сообщения десериализуются коннектором одним из следующих способов: Обратите внимание, что указанная пользователем схема не валидируется. Если фактическая схема не соответствует указанной, десериализация завершается с ошибкой: |
— |
Нет |
csv_quote |
Символ, используемый в качестве кавычек при парсинге CSV. Необходимо указать одиночный символ, представимый одним байтом в текущей кодировке |
" |
Нет |
csv_delimeter |
Символ, используемый в качестве разделителя полей при парсинге CSV. Необходимо указать одиночный символ, представимый одним байтом в текущей кодировке |
, |
Нет |
csv_null |
Строка, используемая в качестве значения |
Пустая строка |
Нет |
csv_ignore_header |
Флаг, указывающий на необходимость игнорирования первой строки каждого сообщения при парсинге CSV |
false |
— |
csv_attribute_trim_whitespace |
Флаг, указывающий на необходимость удаления пробелов в начале и конце каждого поля (атрибута) записи CSV |
true |
— |
Foreign table
Для создания foreign-таблицы предназначена команда CREATE FOREIGN TABLE
, базовый синтаксис которой приведен ниже:
CREATE FOREIGN TABLE [ IF NOT EXISTS ] <table_name> ( [
<column_name> <data_type> [ COLLATE <collation> ] [ <column_constraint> [ ... ] ]
[, ... ]
] )
SERVER <server_name>
[ OPTIONS ( <option> '<value>' [, ... ] ) ]
где:
-
<table_name>
— имя foreign-таблицы в ADB. -
<column_name>
— имя столбца. -
<data_type>
— тип данных столбца. -
<collation>
— используемая для столбца сортировка (collation). -
<column_constraint>
— ограничение (constraint), определенное на уровне столбца. Имя ограничения<constraint_name>
указывается опционально. Синтаксис:[ CONSTRAINT <constraint_name> ] { NOT NULL | NULL | DEFAULT <default_expr> }
Возможные ограничения:
-
NOT NULL
— указывает, что столбец не может содержать null-значений. -
NULL
— указывает, что столбец может содержать null-значения. Это ограничение используется по умолчанию (если не указаноNOT NULL
). -
DEFAULT
— определяет для столбца значение по умолчанию<default_expr>
.
-
-
<server_name>
— имя сервера. -
<option>
— параметры foreign-таблицы. Для Kafka to ADB Connector все опции, определенные на уровне сервера , могут быть переопределены на уровне foreign-таблиц (частично или полностью). При одновременном указании какой-либо опции для сервера и таблицы табличный уровень имеет больший приоритет. -
<value>
— значения соответствующих параметров<option>
.
ПРИМЕЧАНИЕ
|
Схема kadb
В ходе установки Kafka to ADB Connector в дефолтной базе данных ADB создается специальная схема kadb
, содержащая таблицу kadb.offsets и набор функций для синхронизации смещений.
Таблица смещений kadb.offsets
В таблице kadb.offsets
сохраняются соответствия вида "партиция/смещение" для каждой foreign-таблицы, когда-либо созданной в текущей базе данных. Использование таблицы kadb.offsets
обеспечивает хранение смещений вне Kafka — на стороне потребителя. Ниже приведена структура таблицы kadb.offsets
.
Столбец | Описание |
---|---|
ftoid |
Идентификатор foreign-таблицы (OID) |
prt |
Идентификатор партиции в Kafka |
off |
Смещение |
Для идентификации каждой foreign table используется OID. Чтобы определить OID, можно использовать следующий запрос:
SELECT '<schema>.<table>'::regclass::oid;
где:
-
<schema>
— название схемы, в которой создана foreign-таблица. Можно не указывать, если настроен schema search path. -
<table>
— имя foreign-таблицы.
При каждом выполнении запроса SELECT
к foreign-таблице выполняется обращение к таблице kadb.offsets
, после чего сообщения считываются из Kafka начиная с номера смещения, указанного в таблице kadb.offsets
для выбранных OID таблицы и партиции. Например, если номер смещения для некоторой партиции установлен в 15
— первым из партиции Kafka будет прочитано сообщение со смещением 15
.
Набор партиций и их смещений можно редактировать при помощи обычного SQL-запроса к таблице kadb.offsets
. Также для этой цели могут быть использованы функции.
Для новых партиций, записей о которых еще нет в таблице kadb.offsets
, начальное смещение по умолчанию устанавливается равным 0
. Эту величину можно изменить путем редактирования опции k_initial_offset
.
После успешного выполнения запроса SELECT
к foreign-таблице значения смещений обновляются в таблице kadb.offsets
на основе полученных из Kafka данных. Например, если последним из какой-либо партиции было прочитано сообщение со смещением 25
, в столбце kadb.offsets.off
для этой партиции будет сохранено значение 26
. И при следующем чтении данных из Kafka именно эта величина будет использоваться в качестве начального смещения.
ВНИМАНИЕ
На текущий момент содержимое таблицы |
Функции
Коннектор Kafka to ADB поддерживает несколько функций для синхронизации смещений в Kafka с соответствующими смещениями в таблице kadb.offsets. Функции хранятся в схеме kadb
. При их вызове учитывайте следующее:
-
Ни одна из функций не предоставляет транзакционных гарантий для Kafka — невозможно предположить изменение смещений в Kafka до или после вызова функции, даже если он произведен в рамках одной транзакции с запросом
SELECT
к foreign-таблице. -
Некоторые функции не атомарны — они не создают "снепшот" всех партиций Kafka, результат формируется для каждой партиции независимо, в немного разные моменты времени.
Функция | Параметры | Описание | Атомарность |
---|---|---|---|
kadb.commit_offsets(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID коммитит смещения из |
Да |
kadb.load_partitions(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID загружает список партиций, существующих в Kafka. Результат функции содержит следующие поля:
|
Нет |
kadb.partitions_obtain(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID добавляет партиции, полученные функцией |
Нет |
kadb.partitions_clean(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID удаляет из таблицы |
Нет |
kadb.partitions_reset(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID удаляет все записи из таблицы |
Нет |
kadb.load_offsets_at_timestamp(OID, BIGINT) |
|
Для foreign-таблицы с указанным OID загружает из Kafka наиболее ранние смещения с временными метками большими либо равными, чем метка, указанная в параметрах функции. Результат возвращается только для партиций, присутствующих в таблице Результат функции содержит следующие поля:
|
Да |
kadb.offsets_to_timestamp(OID, BIGINT) |
|
Для foreign-таблицы с указанным OID изменяет смещения в таблице |
Да |
kadb.load_offsets_earliest(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID загружает из Kafka наиболее ранние смещения. Результат возвращается только для партиций, присутствующих в таблице Результат функции содержит следующие поля:
|
Нет |
kadb.offsets_to_earliest(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID изменяет смещения в таблице |
Нет |
kadb.load_offsets_latest(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID загружает из Kafka наиболее поздние смещения. Результат возвращается только для партиций, присутствующих в таблице Результат функции содержит следующие поля:
|
Нет |
kadb.offsets_to_latest(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID изменяет смещения в таблице В результате последующие запросы |
Нет |
kadb.load_offsets_committed(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID загружает из Kafka наиболее поздние закоммиченные смещения. Результат возвращается только для партиций, присутствующих в таблице Результат функции содержит следующие поля:
|
Да |
kadb.offsets_to_committed(OID) |
OID foreign-таблицы |
Для foreign-таблицы с указанным OID изменяет смещения в таблице |
Да |
Поддерживаемые форматы
Kafka to ADB Connector поддерживает десериализацию сообщений, созданных с применением следующих форматов:
Метод десериализации необходимо явно указать при помощи опции format
.
ПРИМЕЧАНИЕ
Независимо от выбранного формата, десериализуются только значения в составе сообщений Kafka (values). Ключи сообщений (keys) игнорируются. |
AVRO OCF
Формат AVRO OCF поддерживается коннектором с рядом ограничений:
-
Схема должна содержать только примитивные типы данных. Комплексные типы не поддерживаются, однако есть два исключения:
-
Объединения (
union
) одного примитивного типа данных с типомnull
(за исключением объединения полученных объединений с типомnull
). -
Тип
fixed
, поскольку он обрабатывается аналогичноbytes
.
-
-
Логические типы разрешены к использованию.
-
Определение foreign-таблицы в ADB должно соответствовать используемой схеме AVRO. При этом важно, чтобы порядок столбцов в таблице совпадал с порядком полей в схеме.
В таблице ниже приведены возможные варианты преобразования типов данных AVRO в типы PostgreSQL при использовании коннектора Kafka to ADB.
Тип данных AVRO | Тип данных PostgreSQL |
---|---|
string |
TEXT, BPCHAR, VARCHAR |
string |
Кастомный тип PostgreSQL (например, |
null |
Любой тип данных PostgreSQL в столбце без ограничения (constraint) |
boolean |
BOOLEAN |
int |
INTEGER |
long |
BIGINT |
float |
REAL |
double |
DOUBLE PRECISION |
bytes, fixed |
BYTEA |
decimal |
NUMERIC |
date |
DATE |
time-millis, time-micros |
TIME |
timestamp-millis |
TIMESTAMP(<N>), где |
timestamp-micros |
TIMESTAMP; TIMESTAMP(<N>), где |
duration |
INTERVAL |
Ниже приведены примеры схем AVRO, являющихся валидными для коннектора Kafka to ADB.
{
"name":"doc",
"type":"record",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"text",
"type":[
"string",
"null"
]
},
{
"name":"issued_on",
"type":"int",
"logicalType":"date"
}
]
}
{
"name":"doc",
"type":"record",
"fields":[
{
"name":"d",
"type":"int",
"logicalType":"date"
},
{
"name":"t_ms",
"type":"int",
"logicalType":"time-millis"
},
{
"name":"t_us",
"type":"long",
"logicalType":"time-micros"
},
{
"name":"ts_ms",
"type":"long",
"logicalType":"timestamp-millis"
},
{
"name":"ts_us",
"type":"long",
"logicalType":"timestamp-micros"
},
{
"name":"dur",
"type":{
"name":"dur_fixed",
"type":"fixed",
"size":12,
"logicalType":"duration"
}
},
{
"name":"dec_1",
"type":{
"name":"dec_2_fixed",
"type":"fixed",
"size":6,
"logicalType":"decimal"
}
},
{
"name":"dec_2",
"type":{
"name":"dec_2_fixed",
"type":"bytes",
"logicalType":"decimal",
"precision":14,
"scale":4
}
}
]
}
CSV
Поддержка формата CSV реализована в коннекторе при помощи библиотеки lbcsv. Таким образом, учитываются все соглашения о формате CSV, устанавливаемые этой библиотекой. Спецификация CSV определена в RFC 4180.
Учитывая упомянутые выше рекомендации, Kafka to ADB Connector использует следующие правила обработки CSV:
-
Поля (атрибуты) CSV-сообщения разделяются специальным символом, определенным с помощью опции
csv_delimeter
. -
Строки (записи) CSV-сообщения разделяются символами перехода на новую строку.
-
Поля могут быть заключены в кавычки, определенные с помощью опции
csv_quote
. -
Поля, содержащие символы разделителя, кавычки или перехода на новую строку, должны быть заключены в кавычки.
-
Каждый символ кавычек, используемый в полях, должен быть экранирован путем добавления дублирующего символа кавычек перед ним.
-
Пустые поля парсятся как значения
NULL
. -
Пустые строки игнорируются.
-
Пробелы в начале и в конце полей, не заключенных в кавычки, удаляются (если установлена опция
csv_attribute_trim_whitespace
).
Поля из CSV-сообщений могут быть конвертированы в любой тип данных PostgreSQL. Правила конвертации идентичны тем, что используются для текстового ввода в psql.
Text
Текстовый формат используется для десериализации данных, представленных в Kafka-сообщениях в "сыром" (raw) виде. При выборе текстового формата сообщения обрабатываются коннектором следующим образом:
-
Каждое сообщение, прочитанное из Kafka, рассматривается как один столбец одного кортежа (строки) foreign-таблицы.
-
Данные парсятся PostgreSQL как предоставленные пользователем текстовые данные.
-
Сообщения с пустым содержимым (нулевой длины) парсятся как значения
NULL
.
ВНИМАНИЕ
Чтобы использовать формат |
Особенности использования коннектора
Ниже приведены особенности реализации коннектора Kafka to ADB, оказывающие непосредственное влияние на его использование.
Параллельные запросы SELECT и global deadloack detector
Как упоминалось выше, коннектор обращается к таблице kadb.offsets
при каждом запросе SELECT
к foreign-таблицам. Таблица смещений kadb.offsets
имеет тип DISTRIBUTED REPLICATED
в базе данных ADB. При этом со стороны коннектора к ней возможны как запросы INSERT
, так и UPDATE
. Таким образом, ограничения, касающиеся параллельных транзакций, так или иначе затрагивают запросы SELECT
к foreign-таблицам.
Способ обработки параллельных (конкурирующих) транзакций в Greenplum определяется текущим состоянием global deadlock detector.
Когда global deadlock detector отключен, каждому запросу UPDATE
требуется блокировка с типом ExclusiveLock
, что фактически означает блокировку всей обновляемой таблицы. В Kafka to ADB Connector это означает, что параллельные запросы SELECT
к разным foreign-таблицам невозможны. Такие запросы SELECT
будут выполняться последовательно, по одному.
Чтобы разрешить выполнение множества одновременных запросов SELECT
к разным foreign-таблицам, необходимо включить global deadlock detector. Когда детектор блокировок включен, каждому запросу UPDATE
требуется блокировка с типом RowExclusiveLock
, что делает возможными параллельные запросы UPDATE
к таблице kadb.offsets
, а значит, и запросы SELECT
к foreign-таблицам.
Для включения global deadlock detector, присвойте параметру GUC gp_enable_global_deadlock_detector
значение on
:
$ gpconfig -c gp_enable_global_deadlock_detector -v on
Одновременные запросы SELECT
к одной foreign-таблице невозможны. В некоторых случаях подобные запросы могут завершиться успешно, вернув корректные результаты (одинаковые для всех одновременных запросов); однако это не гарантируется.
Распределение партиций между сегментами
Каждый запрос SELECT
к foreign-таблице учитывает только те партиции, что определены в таблице kadb.offsets. Содержимое kadb.offsets
может быть изменено до выполнения SELECT
— автоматически, если установлена опция k_automatic_offsets
, либо вручную через функции.
Партиции распределяются между сегментами кластера ADB для обработки согласно следующим правилам:
-
Распределение происходит равномерно по всем сегментам. Число партиций, назначенное каждому сегменту, не отличается более, чем на единицу, по отношению к другим сегментам.
-
Порядок партиций (возвращенный Kafka) и порядок сегментов, которым они назначаются, совпадает.
Например, в кластере ADB, состоящем из 3
сегментов, партиции Kafka с номерами [0, 2, 3, 4, 1]
будут распределены следующим образом.
Сегмент | Партиции |
---|---|
1 |
[0, 2] |
2 |
[3, 4] |
3 |
[1] |
Отмена запросов пользователем
Любой запрос, включающий вызов функций librdkafka либо SELECT
к foreign-таблице, может быть прерван пользователем до его завершения. Запрос гарантированно завершится менее, чем за одну секунду после того, как все сегменты ADB получат сигнал об отмене.
Однако активные подключения к Kafka не закрываются моментально после отмены запроса. В нормальных условиях подключения будут закрыты в пределах времени, указанного в опции k_latency_ms
. Но в ситуациях, когда Kafka не отвечает (например, при сбое в Kerberos-аутентификации), соединение может оставаться активным неограниченный период времени.
Для гарантированного освобождения всех ресурсов необходимо закрыть сессию ADB, в рамках которой был отправлен запрос.
Длительность запросов SELECT
Если запрос SELECT
к foreign-таблице не прерывается, его максимальная длительность может быть оценена по следующей формуле:
Опции k_latency_ms
и k_timeout_ms
оказывают значительное влияние на продолжительность запросов SELECT
. Поэтому следует внимательно отнестись к выбору их значений.