Настройка Kafka to ADB Connector

Объекты для работы с коннектором

Для отправки данных из Kafka в кластер ADB через Kafka to ADB Connector необходимо предварительно создать следующие объекты на стороне ADB:

  1. Server — инкапсулирует информацию о соединении с внешним источником данных.

  2. Foreign table — таблица ADB, определяющая структуру внешних данных. Foreign-таблица не хранит данные в ADB, но к ней можно обращаться с помощью запросов как к обычной таблице.

Примеры использования перечисленных объектов для отправки данных из кластера ADS в ADB приведены в статье Примеры использования Kafka to ADB Connector.

ВАЖНО
  • Для каждого кластера Kafka достаточно создать один server на стороне ADB. Foreign-таблиц может быть несколько, если чтение данных запланировано, например, из различных топиков Kafka.

  • Опции, приведенные ниже, могут быть заполнены как при объявлении server, так и при создании foreign table. При одновременном указании в двух местах наибольший приоритет у опций, определенных на уровне foreign table.

  • Значения всех опций заполняются в строковом виде (в одинарных кавычках ').

  • Названия опций, содержащие знаки # или ., необходимо помещать в двойные кавычки ". В иных случаях кавычки для имен опций не требуются.

Server

Для создания сервера предназначена команда CREATE SERVER, базовый синтаксис которой приведен ниже:

CREATE SERVER <server_name> [ TYPE '<server_type>' ] [ VERSION '<server_version>' ]
    FOREIGN DATA WRAPPER <fdw_name>
    [ OPTIONS ( [ <option> '<value>' [, ... ]] ) ]

где:

  • <server_name> — имя сервера в ADB. Должно быть уникальным в рамках текущей базы данных ADB.

  • <server_type> — необязательный параметр, определяющий тип сервера.

  • <server_version> — необязательный параметр, определяющий версию сервера.

  • <fdw_name> — имя обертки внешних данных (foreign data wrapper). Необходимо указать kadb_fdw — foreign data wrapper, который создается автоматически после установки коннектора (см. шаг 4 в статье Установка Kafka to ADB Connector).

  • <option> — параметры сервера, определяющие детали подключения к внешнему источнику данных. Список возможных опций для Kafka to ADB Connector приведен в таблице Опции сервера ниже. Обратите внимание, что эти опции могут быть определены как на уровне сервера, так и на уровне foreign-таблицы. При этом опции, отмеченные как обязательные, должны быть указаны на одном из уровней.

  • <value> — значения соответствующих параметров <option>.

ПРИМЕЧАНИЕ
  • Чтобы создать сервер, пользователю необходима привилегия USAGE для обертки внешних данных kadb_fdw. Пользователь, создавший сервер, становится его владельцем.

  • Полную версию синтаксиса команды CREATE SERVER можно посмотреть в документации Greenplum.

  • Для редактирования параметров сервера предназначена команда ALTER SERVER, для удаления — DROP SERVER.

Опции сервера
Имя Описание Default Обязательность

k_brokers

Разделенный запятыми список адресов брокеров Kafka, каждый из которых указан в формате <имя хоста>:<порт> или <IP-адрес>:<порт>.

Алиас свойства librdkafka bootstrap.servers

 — 

Да

k_topic

Идентификатор топика Kafka (см. Концепции хранения в Kafka)

 — 

Да

k_consumer_group

Идентификатор группы потребителей Kafka.

Алиас свойства librdkafka group.id

 — 

Да

k_seg_batch

Максимальное количество сообщений Kafka, извлекаемое каждым сегментом кластера ADB.

Обратите внимание, что при выполнении запроса с условием LIMIT сообщения в Kafka также запрашиваются батчами. В результате смещения в таблице kadb.offsets устанавливаются равными смещениям последних полученных сообщений для каждой партиции, даже если данные этих сообщений не будут содержаться в результатах запроса.

Также следует учитывать, что поскольку опция k_seg_batch ограничивает число сообщений для чтения, при выполнении некоторых запросов SELECT могут появиться партиции, из которых не будет прочитано никаких полезных данных.

Для ввода допускаются положительные целые числа

 — 

Да

k_timeout_ms

Время ожидания выполнения запроса на получение сообщений из Kafka в миллисекундах. Только сообщения, доступные в Kafka в период времени с начала выполнения SELECT до истечения k_timeout_ms, будут извлечены и представлены как результат запроса SELECT. Каждый сегмент потребляет сообщения одновременно из всех партиций, назначенных ему.

Запрос SELECT может выполниться быстрее, если в партициях Kafka достаточно сообщений (если k_seg_batch сообщений прочитано каждым сегментом). Более всего на длительность запроса влияют партиции Kafka, в которых недостаточно сообщений: коннектор ожидает появления в этих партициях новых сообщений в течение k_timeout_ms миллесекунд.

Обратите внимание, что фактическое время выполнения запроса SELECT может быть больше, чем величина k_timeout_ms.

На некоторых этапах выполнения запроса SELECT его принудительная отмена может быть невозможна до окончания периода k_timeout_ms.

Для ввода допускаются неотрицательные целые числа

 — 

Да

format

Формат сереализации данных. Возможные значения (без учета регистра):

 — 

Да

k_latency_ms

Тайм-аут запросов к метаданным Kafka в миллисекундах.

Значение этого параметра следует установить в максимально ожидаемое время отклика (по всем сегментам ADB) любого запроса к метаданным Kafka. Самый длительный запрос к метаданным произодится со стороны функции kadb.offsets_to_committed(<OID>) — ее можно использовать для выбора минимального значения параметра k_latency_ms.

Следует учитывать, что при выполнении SELECT происходит несколько запросов к метаданным. Если тайм-аут превышен, запрос завершается с ошибкой.

Для ввода допускаются неотрицательные целые числа

2000

Нет

k_initial_offset

Начальное смещение, которое следует использовать для партиций, записей о которых нет в таблице kadb.offsets. Эта величина используется, когда установлен флаг k_automatic_offsets, а также при вызове функций по управлению смещениями.

Для ввода допускаются неотрицательные целые числа

0

Нет

k_automatic_offsets

Параметр, присвоение которому значения true приводит к следующему:

  • Перед каждым SELECT к foreign table:

    • Добавление партиций, присутствующих в Kafka и отсутствующих в таблице kadb.offsets, к набору партиций, откуда производится чтение данных.

    • Автоматическое увеличение стартового смещения любой партиции до наиболее раннего смещения в Kafka. Если такое увеличение происходит, формируется уведомление (NOTICE).

  • После каждого SELECT к foreign table:

    • Обновление таблицы kadb.offsets путем добавления партиций, присутствующих в Kafka, но отсутствующих в ADB (с помошью INSERT).

При установке параметра в false формируется ошибка ERROR, если наименьшее смещение сообщения, присутствующего в Kafka, больше смещения в таблице kadb.offsets (для любой партиции).

Обратите внимание, что партиции, присутствующие в Kafka, но отсутствующие в таблице kadb.offsets, не будут видны пользователю в случае использования CURSOR: как сказано выше, вставка данных в таблицу kadb.offsets производится после выполнения SELECT, тогда как запросы с CURSOR непрерывно выполняются.

После успешного выполнения SELECT смещения в таблице kadb.offsets изменяются независимо от значения параметра k_automatic_offsets, отражая число сообщений, прочитанных из Kafka

true

Нет

avro_schema

AVRO-схема в JSON-формате, которую необходимо использовать для десериализации сообщений.

Поступающие сообщения десериализуются коннектором одним из следующих способов:

  • Если заполнена опция avro_schema, для десериализации используется представленная в ней схема (входящие сообщения при этом должны быть по-прежнему в формате AVRO OCF).

  • В противном случае схема определяется непосредственно на основе входящего сообщения в формате AVRO OCF.

Обратите внимание, что указанная пользователем схема не валидируется. Если фактическая схема не соответствует указанной, десериализация завершается с ошибкой: ERROR: invalid memory alloc request size. По этой причине опция avro_schema должна использоваться только в целях повышения производительности и после тщательного анализа

 — 

Нет

csv_quote

Символ, используемый в качестве кавычек при парсинге CSV.

Необходимо указать одиночный символ, представимый одним байтом в текущей кодировке

"

Нет

csv_delimeter

Символ, используемый в качестве разделителя полей при парсинге CSV.

Необходимо указать одиночный символ, представимый одним байтом в текущей кодировке

,

Нет

csv_null

Строка, используемая в качестве значения NULL в CSV

Пустая строка

Нет

csv_ignore_header

Флаг, указывающий на необходимость игнорирования первой строки каждого сообщения при парсинге CSV

false

 — 

csv_attribute_trim_whitespace

Флаг, указывающий на необходимость удаления пробелов в начале и конце каждого поля (атрибута) записи CSV

true

 — 

Параметры librdkafka

 
Наряду с перечисленными выше опциями, при объявлении server и foreign table можно указать конфигурационные параметры librdkafka.

Имя каждого параметра librdkafka необходимо начинать со знака # и обрамлять двойными кавычками ". Например: "#bootstrap.servers", "#security.protocol" и так далее.

Параметры с именем из документации librdkafka имеют приоритет над своими алиасами, приведенными выше.

Перечисленные ниже параметры librdkafka устанавливать запрещено:

  • enable.auto.commit;

  • enable.partition.eof;

  • plugin.library.paths;

  • interceptors;

  • все параметры, имена которых заканчиваются на _cb.

Foreign table

Для создания foreign-таблицы предназначена команда CREATE FOREIGN TABLE, базовый синтаксис которой приведен ниже:

CREATE FOREIGN TABLE [ IF NOT EXISTS ] <table_name> ( [
    <column_name> <data_type> [ COLLATE <collation> ] [ <column_constraint> [ ... ] ]
      [, ... ]
] )
    SERVER <server_name>
  [ OPTIONS ( <option> '<value>' [, ... ] ) ]

где:

  • <table_name> — имя foreign-таблицы в ADB.

  • <column_name> — имя столбца.

  • <data_type> — тип данных столбца.

  • <collation> — используемая для столбца сортировка (collation).

  • <column_constraint> — ограничение (constraint), определенное на уровне столбца. Имя ограничения <constraint_name> указывается опционально. Синтаксис:

    [ CONSTRAINT <constraint_name> ]
    { NOT NULL |
      NULL |
      DEFAULT <default_expr> }

    Возможные ограничения:

    • NOT NULL — указывает, что столбец не может содержать null-значений.

    • NULL — указывает, что столбец может содержать null-значения. Это ограничение используется по умолчанию (если не указано NOT NULL).

    • DEFAULT — определяет для столбца значение по умолчанию <default_expr>.

  • <server_name> — имя сервера.

  • <option> — параметры foreign-таблицы. Для Kafka to ADB Connector все опции, определенные на уровне сервера , могут быть переопределены на уровне foreign-таблиц (частично или полностью). При одновременном указании какой-либо опции для сервера и таблицы табличный уровень имеет больший приоритет.

  • <value> — значения соответствующих параметров <option>.

ПРИМЕЧАНИЕ
  • Чтобы создать foreign-таблицу, пользователю необходима привилегия USAGE для соответствующего сервера, а также для всех типов данных, используемых в таблице. Пользователь, создавший foreign-таблицу, становится ее владельцем.

  • Полную версию синтаксиса команды CREATE FOREIGN TABLE можно посмотреть в документации Greenplum.

  • Для редактирования параметров foreign-таблицы предназначена команда ALTER FOREIGN TABLE, для удаления — DROP FOREIGN TABLE.

Схема kadb

В ходе установки Kafka to ADB Connector в дефолтной базе данных ADB создается специальная схема kadb, содержащая таблицу kadb.offsets и набор функций для синхронизации смещений.

Таблица смещений kadb.offsets

В таблице kadb.offsets сохраняются соответствия вида "партиция/смещение" для каждой foreign-таблицы, когда-либо созданной в текущей базе данных. Использование таблицы kadb.offsets обеспечивает хранение смещений вне Kafka — на стороне потребителя. Ниже приведена структура таблицы kadb.offsets.

Столбец Описание

ftoid

Идентификатор foreign-таблицы (OID)

prt

Идентификатор партиции в Kafka

off

Смещение

Для идентификации каждой foreign table используется OID. Чтобы определить OID, можно использовать следующий запрос:

SELECT '<schema>.<table>'::regclass::oid;

где:

  • <schema> — название схемы, в которой создана foreign-таблица. Можно не указывать, если настроен schema search path.

  • <table> — имя foreign-таблицы.

При каждом выполнении запроса SELECT к foreign-таблице выполняется обращение к таблице kadb.offsets, после чего сообщения считываются из Kafka начиная с номера смещения, указанного в таблице kadb.offsets для выбранных OID таблицы и партиции. Например, если номер смещения для некоторой партиции установлен в 15 — первым из партиции Kafka будет прочитано сообщение со смещением 15.

Набор партиций и их смещений можно редактировать при помощи обычного SQL-запроса к таблице kadb.offsets. Также для этой цели могут быть использованы функции.

Для новых партиций, записей о которых еще нет в таблице kadb.offsets, начальное смещение по умолчанию устанавливается равным 0. Эту величину можно изменить путем редактирования опции k_initial_offset.

После успешного выполнения запроса SELECT к foreign-таблице значения смещений обновляются в таблице kadb.offsets на основе полученных из Kafka данных. Например, если последним из какой-либо партиции было прочитано сообщение со смещением 25, в столбце kadb.offsets.off для этой партиции будет сохранено значение 26. И при следующем чтении данных из Kafka именно эта величина будет использоваться в качестве начального смещения.

ВНИМАНИЕ

На текущий момент содержимое таблицы kadb.offsets не бэкапируется утилитой gpbackup, поскольку таблица не рассматривается как конфигурационная. Поэтому при использовании Kafka to ADB Connector резервные копии kadb.offsets необходимо создавать самостоятельно.

Функции

Коннектор Kafka to ADB поддерживает несколько функций для синхронизации смещений в Kafka с соответствующими смещениями в таблице kadb.offsets. Функции хранятся в схеме kadb. При их вызове учитывайте следующее:

  • Ни одна из функций не предоставляет транзакционных гарантий для Kafka — невозможно предположить изменение смещений в Kafka до или после вызова функции, даже если он произведен в рамках одной транзакции с запросом SELECT к foreign-таблице.

  • Некоторые функции не атомарны — они не создают "снепшот" всех партиций Kafka, результат формируется для каждой партиции независимо, в немного разные моменты времени.

Доступные функции для работы со смещениями
Функция Параметры Описание Атомарность

kadb.commit_offsets(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID коммитит смещения из kadb.offsets в Kafka

Да

kadb.load_partitions(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID загружает список партиций, существующих в Kafka.

Результат функции содержит следующие поля:

  • ftoid — OID foreign-таблицы;

  • prt — идентификатор партиции;

  • off — значение опции k_initial_offset.

Нет

kadb.partitions_obtain(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID добавляет партиции, полученные функцией kadb.load_partitions(OID), в kadb.offsets. При этом добавляются только новые партиции, уже существующие остаются без изменений

Нет

kadb.partitions_clean(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID удаляет из таблицы kadb.offsets записи обо всех партициях, отсутствующих в Kafka

Нет

kadb.partitions_reset(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID удаляет все записи из таблицы kadb.offsets, добавляя вместо этого записи, возвращенные функцией kadb.load_partitions(OID)

Нет

kadb.load_offsets_at_timestamp(OID, BIGINT)

  • OID foreign-таблицы.

  • Временная метка — количество миллисекунд после начала UNIX Epoch (UTC).

Для foreign-таблицы с указанным OID загружает из Kafka наиболее ранние смещения с временными метками большими либо равными, чем метка, указанная в параметрах функции. Результат возвращается только для партиций, присутствующих в таблице kadb.offsets для выбранной foreign-таблицы.

Результат функции содержит следующие поля:

  • ftoid — OID foreign-таблицы;

  • prt — идентификатор партиции;

  • off — найденное значение смещения в Kafka.

Да

kadb.offsets_to_timestamp(OID, BIGINT)

  • OID foreign-таблицы.

  • Временная метка — количество миллисекунд после начала UNIX Epoch (UTC).

Для foreign-таблицы с указанным OID изменяет смещения в таблице kadb.offsets на наиболее ранние смещения, найденные в Kafka — с временными метками большими либо равными, чем метка, указанная в параметрах функции. Смещения обновляются только для партиций, присутствующих в таблице kadb.offsets для выбранной foreign-таблицы

Да

kadb.load_offsets_earliest(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID загружает из Kafka наиболее ранние смещения. Результат возвращается только для партиций, присутствующих в таблице kadb.offsets для выбранной foreign-таблицы.

Результат функции содержит следующие поля:

  • ftoid — OID foreign-таблицы;

  • prt — идентификатор партиции;

  • off — найденное значение смещения в Kafka.

Нет

kadb.offsets_to_earliest(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID изменяет смещения в таблице kadb.offsets на наиболее ранние смещения, найденные в Kafka. Смещения обновляются только для партиций, присутствующих в таблице kadb.offsets для выбранной foreign-таблицы

Нет

kadb.load_offsets_latest(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID загружает из Kafka наиболее поздние смещения. Результат возвращается только для партиций, присутствующих в таблице kadb.offsets для выбранной foreign-таблицы.

Результат функции содержит следующие поля:

  • ftoid — OID foreign-таблицы;

  • prt — идентификатор партиции;

  • off — найденное значение смещения в Kafka.

Нет

kadb.offsets_to_latest(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID изменяет смещения в таблице kadb.offsets на наиболее поздние смещения, найденные в Kafka. Смещения обновляются только для партиций, присутствующих в таблице kadb.offsets для выбранной foreign-таблицы.

В результате последующие запросы SELECT к выбранной foreign-таблице будут возвращать сообщения, добавленные в соответствующие партиции Kafka после вызова функции kadb.offsets_to_latest(OID)

Нет

kadb.load_offsets_committed(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID загружает из Kafka наиболее поздние закоммиченные смещения. Результат возвращается только для партиций, присутствующих в таблице kadb.offsets для выбранной foreign-таблицы.

Результат функции содержит следующие поля:

  • ftoid — OID foreign-таблицы;

  • prt — идентификатор партиции;

  • off — найденное значение смещения в Kafka.

Да

kadb.offsets_to_committed(OID)

OID foreign-таблицы

Для foreign-таблицы с указанным OID изменяет смещения в таблице kadb.offsets на наиболее поздние закоммиченные смещения, найденные в Kafka. Смещения обновляются только для партиций, присутствующих в таблице kadb.offsets для выбранной foreign-таблицы

Да

Поддерживаемые форматы

Kafka to ADB Connector поддерживает десериализацию сообщений, созданных с применением следующих форматов:

Метод десериализации необходимо явно указать при помощи опции format.

ПРИМЕЧАНИЕ

Независимо от выбранного формата, десериализуются только значения в составе сообщений Kafka (values). Ключи сообщений (keys) игнорируются.

AVRO OCF

Формат AVRO OCF поддерживается коннектором с рядом ограничений:

  • Схема должна содержать только примитивные типы данных. Комплексные типы не поддерживаются, однако есть два исключения:

    • Объединения (union) одного примитивного типа данных с типом null (за исключением объединения полученных объединений с типом null).

    • Тип fixed, поскольку он обрабатывается аналогично bytes.

  • Логические типы разрешены к использованию.

  • Определение foreign-таблицы в ADB должно соответствовать используемой схеме AVRO. При этом важно, чтобы порядок столбцов в таблице совпадал с порядком полей в схеме.

В таблице ниже приведены возможные варианты преобразования типов данных AVRO в типы PostgreSQL при использовании коннектора Kafka to ADB.

Поддерживаемые варианты маппинга типов данных
Тип данных AVRO Тип данных PostgreSQL

string

TEXT, BPCHAR, VARCHAR

string

Кастомный тип PostgreSQL (например, MONEY). Преобразование происходит так же, как при вводе текста пользователем

null

Любой тип данных PostgreSQL в столбце без ограничения (constraint) NOT NULL

boolean

BOOLEAN

int

INTEGER

long

BIGINT

float

REAL

double

DOUBLE PRECISION

bytes, fixed

BYTEA

decimal

NUMERIC

date

DATE

time-millis, time-micros

TIME

timestamp-millis

TIMESTAMP(<N>), где <N> равно 1, 2 или 3

timestamp-micros

TIMESTAMP; TIMESTAMP(<N>), где <N> >= 4

duration

INTERVAL

Ниже приведены примеры схем AVRO, являющихся валидными для коннектора Kafka to ADB.

Примеры валидных AVRO-схем
{
  "name":"doc",
  "type":"record",
  "fields":[
    {
      "name":"id",
      "type":"int"
    },
    {
      "name":"text",
      "type":[
        "string",
        "null"
      ]
    },
    {
      "name":"issued_on",
      "type":"int",
      "logicalType":"date"
    }
  ]
}
{
  "name":"doc",
  "type":"record",
  "fields":[
    {
      "name":"d",
      "type":"int",
      "logicalType":"date"
    },
    {
      "name":"t_ms",
      "type":"int",
      "logicalType":"time-millis"
    },
    {
      "name":"t_us",
      "type":"long",
      "logicalType":"time-micros"
    },
    {
      "name":"ts_ms",
      "type":"long",
      "logicalType":"timestamp-millis"
    },
    {
      "name":"ts_us",
      "type":"long",
      "logicalType":"timestamp-micros"
    },
    {
      "name":"dur",
      "type":{
        "name":"dur_fixed",
        "type":"fixed",
        "size":12,
        "logicalType":"duration"
      }
    },
    {
      "name":"dec_1",
      "type":{
        "name":"dec_2_fixed",
        "type":"fixed",
        "size":6,
        "logicalType":"decimal"
      }
    },
    {
      "name":"dec_2",
      "type":{
        "name":"dec_2_fixed",
        "type":"bytes",
        "logicalType":"decimal",
        "precision":14,
        "scale":4
      }
    }
  ]
}

CSV

Поддержка формата CSV реализована в коннекторе при помощи библиотеки lbcsv. Таким образом, учитываются все соглашения о формате CSV, устанавливаемые этой библиотекой. Спецификация CSV определена в RFC 4180.

Учитывая упомянутые выше рекомендации, Kafka to ADB Connector использует следующие правила обработки CSV:

  • Поля (атрибуты) CSV-сообщения разделяются специальным символом, определенным с помощью опции csv_delimeter.

  • Строки (записи) CSV-сообщения разделяются символами перехода на новую строку.

  • Поля могут быть заключены в кавычки, определенные с помощью опции csv_quote.

  • Поля, содержащие символы разделителя, кавычки или перехода на новую строку, должны быть заключены в кавычки.

  • Каждый символ кавычек, используемый в полях, должен быть экранирован путем добавления дублирующего символа кавычек перед ним.

  • Пустые поля парсятся как значения NULL.

  • Пустые строки игнорируются.

  • Пробелы в начале и в конце полей, не заключенных в кавычки, удаляются (если установлена опция csv_attribute_trim_whitespace).

Поля из CSV-сообщений могут быть конвертированы в любой тип данных PostgreSQL. Правила конвертации идентичны тем, что используются для текстового ввода в psql.

Text

Текстовый формат используется для десериализации данных, представленных в Kafka-сообщениях в "сыром" (raw) виде. При выборе текстового формата сообщения обрабатываются коннектором следующим образом:

  • Каждое сообщение, прочитанное из Kafka, рассматривается как один столбец одного кортежа (строки) foreign-таблицы.

  • Данные парсятся PostgreSQL как предоставленные пользователем текстовые данные.

  • Сообщения с пустым содержимым (нулевой длины) парсятся как значения NULL.

ВНИМАНИЕ

Чтобы использовать формат text для десериализации сообщений, в foreign-таблице необходимо определить ровно один столбец (с любым типом данных PostgreSQL).

Особенности использования коннектора

Ниже приведены особенности реализации коннектора Kafka to ADB, оказывающие непосредственное влияние на его использование.

Параллельные запросы SELECT и global deadloack detector

Как упоминалось выше, коннектор обращается к таблице kadb.offsets при каждом запросе SELECT к foreign-таблицам. Таблица смещений kadb.offsets имеет тип DISTRIBUTED REPLICATED в базе данных ADB. При этом со стороны коннектора к ней возможны как запросы INSERT, так и UPDATE. Таким образом, ограничения, касающиеся параллельных транзакций, так или иначе затрагивают запросы SELECT к foreign-таблицам.

Способ обработки параллельных (конкурирующих) транзакций в Greenplum определяется текущим состоянием global deadlock detector.

Когда global deadlock detector отключен, каждому запросу UPDATE требуется блокировка с типом ExclusiveLock, что фактически означает блокировку всей обновляемой таблицы. В Kafka to ADB Connector это означает, что параллельные запросы SELECT к разным foreign-таблицам невозможны. Такие запросы SELECT будут выполняться последовательно, по одному.

Чтобы разрешить выполнение множества одновременных запросов SELECT к разным foreign-таблицам, необходимо включить global deadlock detector. Когда детектор блокировок включен, каждому запросу UPDATE требуется блокировка с типом RowExclusiveLock, что делает возможными параллельные запросы UPDATE к таблице kadb.offsets, а значит, и запросы SELECT к foreign-таблицам.

Для включения global deadlock detector, присвойте параметру GUC gp_enable_global_deadlock_detector значение on:

$ gpconfig -c gp_enable_global_deadlock_detector -v on

Одновременные запросы SELECT к одной foreign-таблице невозможны. В некоторых случаях подобные запросы могут завершиться успешно, вернув корректные результаты (одинаковые для всех одновременных запросов); однако это не гарантируется.

Распределение партиций между сегментами

Каждый запрос SELECT к foreign-таблице учитывает только те партиции, что определены в таблице kadb.offsets. Содержимое kadb.offsets может быть изменено до выполнения SELECT — автоматически, если установлена опция k_automatic_offsets, либо вручную через функции.

Партиции распределяются между сегментами кластера ADB для обработки согласно следующим правилам:

  • Распределение происходит равномерно по всем сегментам. Число партиций, назначенное каждому сегменту, не отличается более, чем на единицу, по отношению к другим сегментам.

  • Порядок партиций (возвращенный Kafka) и порядок сегментов, которым они назначаются, совпадает.

Например, в кластере ADB, состоящем из 3 сегментов, партиции Kafka с номерами [0, 2, 3, 4, 1] будут распределены следующим образом.

Сегмент Партиции

1

[0, 2]

2

[3, 4]

3

[1]

Отмена запросов пользователем

Любой запрос, включающий вызов функций librdkafka либо SELECT к foreign-таблице, может быть прерван пользователем до его завершения. Запрос гарантированно завершится менее, чем за одну секунду после того, как все сегменты ADB получат сигнал об отмене.

Однако активные подключения к Kafka не закрываются моментально после отмены запроса. В нормальных условиях подключения будут закрыты в пределах времени, указанного в опции k_latency_ms. Но в ситуациях, когда Kafka не отвечает (например, при сбое в Kerberos-аутентификации), соединение может оставаться активным неограниченный период времени.

Для гарантированного освобождения всех ресурсов необходимо закрыть сессию ADB, в рамках которой был отправлен запрос.

Длительность запросов SELECT

Если запрос SELECT к foreign-таблице не прерывается, его максимальная длительность может быть оценена по следующей формуле:

Опции k_latency_ms и k_timeout_ms оказывают значительное влияние на продолжительность запросов SELECT. Поэтому следует внимательно отнестись к выбору их значений.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней