*********************** Kafka to ADB (kadb-fdw) *********************** **kadb-fdw** - это расширение для ADB/GPDB, позволяющее производить транзакционную загрузку данных из кластера брокера сообщений Kafka. **Особенности**: + AVRO десереализация; + Хранение смещений Kafka вне кластера Kafka, на стороне потребителя; + Поддержка транзакций ADB/GPDB; + Поддержка Kerberos-аутентификации. Совместимость ======================== Коннектор совместим с Apache Kafka версий 1.0.0 и выше Установка с помощью ADCM ======================== Для установки коннектора с помощью **ADCM** требуется инсталлировать сервис *PXF* на все сегментные ноды кластера, а также в списке сервисов выбрать сервис *Kafka to ADB*. При этом необходимые пакеты и файлы автоматически устанавливаются на машины кластера. .. important:: Процесс установки включает в себя выполнение следующей команды: **RESET client_min_messages** SQL-интерфейс ============= Пример ------ .. code-block:: sql -- Create a SERVER DROP SERVER IF EXISTS ka_server; CREATE SERVER ka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'localhost:9092' ); -- Create a FOREIGN TABLE DROP FOREIGN TABLE IF EXISTS ka_table; CREATE FOREIGN TABLE ka_table(field1 INT, field2 TEXT) SERVER ka_server OPTIONS ( format 'avro', -- Data serialization format k_topic 'my_topic', -- Kafka topic k_consumer_group 'my_consumer_group', -- Kafka consumer group k_seg_batch '100', -- Limit on the number of Kafka messages retrieved by each GPDB segment k_timeout_ms '1000', -- Kafka response timeout k_initial_offset '42' -- Initial Kafka offset (for new or unknown partitions) ); -- Issue a SELECT query as usual SELECT * FROM ka_table; .. _offset_table: **kadb_fdw** предоставляет пользователю следующие интерфейсы взаимодействия через SQL: * FOREIGN TABLE OPTIONS. См. CREATE FOREIGN TABLE, ALTER FOREIGN TABLE документацию. Дополнительные опции **kadb_fdw** представлены в разделе "Опции внешних таблиц" * Таблица смещений * Набор kadb функций Таблица смещений ---------------- В ходе установки расширения создается служебная схема ``kadb``, содержащая таблицу ``kadb.offsets``. В данную таблицу помещаются пары соответствий партиция-смещение для любой когда-либо создававшейся в текущей базе внешней таблицы (FOREIGN TABLE). Таблицы идентифицируются по значению `OID `_, который можно узнать, выполнив следующую команду: .. code-block:: sql SELECT 'schema.table'::regclass::oid; При выполнении SELECT-запроса к внешней таблице чтение сообщений из Kafka производится, начиная со смещения, указанного для данной внешней таблицы в таблице ``kadb.offsets``. Смещения можно изменять при помощи обычного SQL-запроса к таблице ``kadb.offsets``. Для новых партиций, записей о которых нет в таблице ``kadb.offsets``, начальное смещение по умолчанию устанавливается равным 0 (значение может быть изменено в параметре :ref:`k_initial_offset`). После успешного выполнения SELECT-запроса к внешней таблице смещение обновляется в соответствии со значениями, полученными от Kafka. Например, если последнее прочитанное сообщение из некоторой партиции имело смещение 84, значение смещения для данной партиции в таблице ``kadb.offsets`` будет равным 85. Опции внешних таблиц -------------------- Для сервера (`SERVER `_) и внешней таблицы (`FOREIGN TABLE `_) возможно указание опций в виде пары ключ-значение. Опции, определенные для сервера и внешней таблицы, не отличаются друг от друга. Иными словами, не имеет значения, для какого объекта они были указаны. Однако опции, определенные для внешней таблицы, более приоритетны, чем опции сервера (в случае, если для обоих объектов была указана одинаковая опция). **Поддерживаемые опции:** librdkafka """"""""" Параметры для librdkafka можно установить через FOREIGN TABLE OPTIONS. TДля того чтоб установить конфигурационный параметр для librdkafka, добавь OPTION с добавлением к имени знака решетка (#). Пример создания SERVER: .. code-block:: sql CREATE SERVER my_kafka FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( "#bootstrap.servers" 'localhost:9092', "#client.id" 'My GPDB instance' ); Важно чтоб OPTION имена, содержащие знаки решетка (#) или точка (.) были обрамлены в двойные кавычки ("). Кофигурационные параметры для librdkafka имеют приоритет над их алиасами (см. ниже). Нижеперечисленные параметры для librdkafka запрещено устанавливать: * enable.auto.commit * enable.partition.eof * plugin.library.paths * interceptors * все параметры заканчивающиеся на ``_cb`` k_brokers """"""""" | *Обязательна.* Список брокеров Kafka, разделенный запятыми, где каждый элемент — это строка вида или :. k_topic """"""""" | *Обязательна.* Идентификатор топика Kafka. k_consumer_group """""""""""""""" | *Обязательна.* Идентификатор группы потребителей Kafka. .. _k_initial_offset: k_seg_batch """"""""""" | *Обязательна.* | *Положительное целое число.* Максимальное количество сообщений Kafka, запрашиваемое каждым сегментом кластера ADB/GPDB. При выполнении запроса с условием LIMIT сообщения в Kafka продолжают запрашиваться батчами. В результате смещения в :ref:`таблице смещений` устанавливается на основании *последнего* полученного сообщения для каждой партиции, даже если данные не содержались в результатах запроса. При достижении конца партиции Kafka отправляет специальное сообщение. Таким образом, если значение ``k_seg_batch`` меньше, чем число партиций, определенное для одного сегмента ADB/GPDB и ``k_seg_batch`` этих партиций было полностью прочитано, может возникнуть ситуация, при которой не будет прочитано никаких полезных данных. По этой причине опция ``k_seg_batch`` *должна быть больше, чем максимальное число партиций определенных для одного сегмента*. k_timeout_ms """""""""""" | *Обязательна.* | *Неотрицательное целое число.* Время ожидания выполнения запроса к Kafka в миллисекундах. За это время Kafka-клиентом будут извлечены и представлены в качестве результата SELECT-запроса только сообщения, доступные в Kafka в этот период времени. SELECT-запрос может выполняться быстрее, если в топике Kafka достаточно сообщений. Реальное время выполнения SELECT-запроса может быть значительно больше. Для оценки максимального значения используйте следующее выражение: .. code-block:: [duration] = [k_timeout_ms] * (2 + ceil( [number of partitions] / [number of segments in cluster] )) Более всего на длительность запроса влияют партиции Kafka, в которых недостаточно сообщений: коннектор ожидает появления в этих партициях новых сообщений в течение `k_timeout_ms` миллесекунд. На некоторых этапах выполнения SELECT-запроса его принудительная отмена может быть невозможна до окончания периода k_timeout_ms. format """""" | *Обязательна.* | *Одно из предопределенных значений (без учета регистра).* Формат сереализованных данных: + avro + csv + text k_latency_ms """""" | *Неотрицательное целое число. По уиолчанию 2000.* Таймаут запросов метаданных к Kafka. Значение этого параметра следует установить в максимально ожидаемое время отклика (по всем сегментам ADB) любого запроса к метаданным к Kafka. Самый длительный запрос к метаданным произодится от ``kadb.offsets_to_committed(OID)`` - эта функция может быть использована, чтоб определить разумное значение пераметра. Если таймаут превышен, запрос к метаданным упадет с ошибкой. Несколько запросов к метаданным происходят, когда выполняется SELECT; таким образом, падение может произойти, если параметр установлен в сравнительно низкое значение. k_initial_offset """""""""""""""" | *Неотрицательное целое число. Значение по умолчанию: 0.* Смещение, которое следует использовать для партиций, записей о которых нет в :ref:`таблице смещений`. k_automatic_offsets """""""""""""""" | *Булевое значение (true, false). По умолчанию true.* Позволяет **kadb_fdw** выполнять следующее: * Немедленно перед каждым SELECT из FOREIGN TABLE: * Добавить партицию, которая присутствует в Kafka, но отсутствует в таблице смещений, к набору партиций, откуда вычитывать данные; * Автоматически увеличивать стартовое смещение любой партиции к самомоу раннему смещению, доступному в Kafka. Если такое увеличение происходит, то формируется NOTICE. * Немедленно после каждого SELECT из FOREIGN TABLE: * Обновить таблицу смещений, добавляя партиции (с помощью INSERT запроса), которые присутствуют в Kafka, и отсутствуют в таблице смещений. Если выставлен в ``false``, вызывается ERROR, когда наименьшее смещение сообщения, присутствующего в Kafka больше чем смещение в таблице смещений (для любой партиции). Важно: присутствие партиций в Kafka, и их отсутствие в таблице смещений не видно пользователямесли используется CURSOR: INSERT новых записей в таблицу смещений происходит после SELECT, тогда как CURSOR постоянно в прогрессе. После успешного SELECT, смещения в таблице смещений изменяются независимо от значения этого параметра, отражая количество сообщений, прочитанных из Kafka. k_security_protocol """"""""""""""""""" | *Обязательна, если используется Kerberos-аутентификация.* Протокол безопасности, который необходимо использовать для подключения к Kafka. В данный момент доступен только протокол sasl_plaintext. .. _format: avro_schema """"""""""" AVRO-схема, которую необходимо использовать. Десериализованная AVRO схема представляет из себя JSON. Получаемые сообщения десериализуются двумя способами: + Если присутствует опция ``avro_schema``, используется указанная схема (сообщение так же должно быть представлено в `OCF `_ формате) + В противном случае, схема извлекается из полученного сообщения в `OCF `_ формате. .. important:: Пользовательская схема не может быть валидирована. Если реальная схема не соответствует указанной, десериализация завершается с ошибкой `ERROR: invalid memory alloc request size`. По этой причине опция ``avro_schema`` должна использоваться только в целях повышения производительности и только после тщательного изучения. .. _csv_quote: csv_quote """"""""" | *Одиночный символ, представимый одним байтом в текущей кодировке.* | *Значение по умолчанию: ``"``.* Символ, используемый в качестве кавычек при парсинге CSV. .. _csv_delimeter: csv_delimeter """"""""""""" | *Одиночный символ, представимый одним байтом в текущей кодировке.* | *Значение по умолчанию: ``,``.* Символ, используемый в качестве разделителя полей при парсинге CSV. csv_null """""""" Строка, используемая в качестве значения ``NULL`` в CSV. *По умолчанию пустая строка интерпретируется как NULL*. csv_ignore_header """"""""""""""""" | *Булево значение.* | *Значение по умолчанию: false.* Игнорировать первую строку каждого сообщения при парсинге. .. _csv_attribute_trim_whitespace: csv_attribute_trim_whitespace """"""""""""""""""""""""""""" | *Булево значение.* | *Значение по умолчанию: true.* Удалять пробелы в начале и конце строки для каждого атрибута (поля) записи. Набор kadb функций ------------------ Несколько функций предоставляются ``kadb_fdw`` для синхронизации смещений в Kafka с находящимися в таблицах смещений. Все функции находятся в схеме ``kadb`` Важно: * Функции не предоставляют транзакционную гарантию для Kafka. Это значит, что невозможно делать предположения об изменениях смещения в Kafka до или после того, как функция будет применена, даже если она объединена с ``SELECT`` (из ``FOREIGN TABLE``) в единой SQL транзакции; * Некоторые функции **не атомарны**. Это значит, что они не создают снапшота всех партиций в определенный момент времени; вместо этого их результат получается из каждой партиции независимо, в (слегка) разные моменты времени. kadb.commit_offsets(OID) """""""""""""""""""""""" *Параметры:* * OID внешней таблицы Применяет смещение из таблицы смещений (для указанного ``FOREIGN TABLE`` OID) к Kafka. Этот метод **атомарен**. kadb.load_partitions(OID) """"""""""""""""""""""""" *Параметры:* * OID внешней таблицы *Результат:* * ``floid``: такой же как указанный OID внешней таблицы * ``prt``: идентификатор партиции * ``off``: k_initial_offset Загружает список партиций, существующих в Kafka. Этот метод **не атомарен**. kadb.partitions_obtain(OID) """"""""""""""""""""""""""" *Параметры:* * OID внешней таблицы Добавляет партиции, возвращенные командой ``kadb.load_parittions(OID)`` к таблице смещений. Добавляются только новые партиции; существующие остаются без изменений. Этот метод **не атомарен**. kadb.partitions_clean(OID) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы Удаляет все записи из таблицы смещений (для указанного OID внешней таблицы), которые *отсутствуют* в Kafka. Этот метод **не атомарен**. kadb.partitions_reset(OID) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы Удаляет все записи из таблицы смещений (для указанного OID внешней таблицы), и добавляет записи, возвращенные командой ``kadb.load_parittions(OID)``. Этот метод **не атомарен**. kadb.load_offsets_at_timestamp(OID, BIGINT) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы * Временная метка: прошедшие миллисекунды с UNIX Epoch (UTC) *Результат:* * ``floid``: такой же как указанный OID внешней таблицы * ``ort``: идентификатор партиции * ``off``: результат Загружает самые ранние смещения, существующие в Kafka, чьи временные метки больше или равны указанной в команде (для указанного OID внешней таблицы, и только для партиций уже существующих в таблице смещений). Этот метод **атомарен**. kadb.offsets_to_timestamp(OID, BIGINT) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы * Временная метка: прошедшие миллисекунды с UNIX Epoch (UTC) Изменяет смещения в таблице смещений (для указанного OID внешней таблицы, и только для партиций уже существующих в таблице смещений) на самые ранние смещения, существующие в Kafka, чьи временные метки больше или равны указанной в команде. Этот метод **атомарен**. kadb.load_offsets_earliest(OID) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы *Результат:* * ``floid``: такой же как указанный OID внешней таблицы * ``ort``: идентификатор партиции * ``off``: результат Загружает самые ранние смещения, существующие в Kafka (для указанного OID внешней таблицы, и только для партиций уже существующих в таблице смещений). Этот метод **не атомарен**. kadb.offsets_to_earliest(OID) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы Меняет смещения в таблице смещений (для указанного OID внешней таблицы, и только для партиций уже существующих в таблице смещений) на самые ранние смещения, существующие в Kafka. Этот метод **не атомарен**. kadb.load_offsets_latest(OID) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы *Результат:* * ``floid``: такой же как указанный OID внешней таблицы * ``ort``: идентификатор партиции * ``off``: результат Загружает самые поздние смещения, существующие в Kafka (для указанного OID внешней таблицы, и только для партиций уже существующих в таблице смещений). Этот метод **не атомарен**. kadb.offsets_to_latest(OID) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы * Чтобы получить OID из названия таблицы можно использовать ``'table_schema.table_name'::regclass::oid`` Меняет смещения в таблице смещений (для указанного OID внешней таблицы, и только для партиций уже существующих в таблице смещений) на самые поздние смещения, существующие в Kafka. В результате, команды ``SELECT`` из указанной внешней таблицы возвращают только сообщения, введенные в Kafka после использования этой команды. Этот метод **не атомарен**. kadb.load_offsets_commited(OID) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы *Результат:* * ``floid``: такой же как указанный OID внешней таблицы * ``ort``: идентификатор партиции * ``off``: результат Загружает самые поздние примененные смещения, существующие в Kafka (для указанного OID внешней таблицы, и только для партиций уже существующих в таблице смещений). Этот метод **атомарен**. kadb.offsets_to_commited(OID) """"""""""""""""""""""""""""" *Параметры:* * OID внешней таблицы Меняет смещения в таблице смещений (для указанного OID внешней таблицы, и только для партиций уже существующих в таблице смещений) на самые поздние примененные смещения, существующие в Kafka. Этот метод **не атомарен**. Десериализация -------------- В настоящее время расширение *kafka-fdw* поддерживает сообщения Kafka, сериализованные при помощи одного из следующих форматов: + AVRO `OCF `_ + CSV + text Метод десериалиализации должен быть указан в опции :ref:`format`. Вне зависимости от используемого метода десериализуется только сообщение Kafka, ключ сообщения игнорируется. AVRO """" `kadb-fdw` предоставляет поддержку формата сериализации AVRO OCF с некоторыми ограничениями. Схема должна содержать только `примитивные `_ типы данных или объединения одного примитивного типа данных с типом ``NULL``. Также поддерживается тип ``fixed``, поскольку он расценивается как ``bytes``. Поддерживаются все логические типы данных, указанные в спецификации AVRO. Определение внешней таблицы ADB/GPDB должно совпадать с настоящей схемой AVRO. Поддерживаются следующие типы маппинга: +------------------------+------------------------------------------------------------------------------------------------------------------------+ | AVRO тип | PostgreSQL тип | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``string`` | ``TEXT``, ``BPCHAR``, ``VARCHAR`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``string`` | Пользовательский тип PostgreSQL (напр. ``MONEY``). Преобразование такое же, как для пользовательских текстовых вводов. | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``null`` | Любой тип PostgeSQL в столбце с ненулевыми рамками. | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``int`` | ``INTEGER`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``long`` | ``BIGINT`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``float`` | ``REAL`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``double`` | ``DOUBLE PRECISION`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``bytes``, ``fixed`` | ``BOOLEAN`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``decimal`` | ``BYTEA`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``date`` | ``NUMERIC`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``time-millis``, | ``DATE`` | | ``time-micros`` | | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``timestamp-millis`` | ``TIME`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``timestamp-micros`` | ``TIMESTAMP(N)``, где ``N`` это ``1``, ``2``, или ``3`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``boolean`` | ``TIMESTAMP``, ``TIMESTAMP(N)``, где ``N`` это ``4`` или более | +------------------------+------------------------------------------------------------------------------------------------------------------------+ | ``duration`` | ``INTERVAL`` | +------------------------+------------------------------------------------------------------------------------------------------------------------+ Также **порядок** столбцов должен совпадать с порядком полей в схеме AVRO. **Пример схемы:** .. code-block:: JSON { "name": "doc", "type": "record", "fields": [ { "name": "id", "type": "int" }, { "name": "text", "type": ["string", "null"] }, { "name": "issued_on", "type": "int", }, ] } .. code-block:: JSON { "name":"doc", "type":"record", "fields":[ { "name":"d", "type":"int", "logicalType":"date" }, { "name":"t_ms", "type":"int", "logicalType":"time-millis" }, { "name":"t_us", "type":"long", "logicalType":"time-micros" }, { "name":"ts_ms", "type":"long", "logicalType":"timestamp-millis" }, { "name":"ts_us", "type":"long", "logicalType":"timestamp-micros" }, { "name":"dur", "type":{ "name":"dur_fixed", "type":"fixed", "size":12, "logicalType":"duration" } }, { "name":"dec_1", "type":{ "name":"dec_2_fixed", "type":"fixed", "size":6, "logicalType":"decimal" } }, { "name":"dec_2", "type":{ "name":"dec_2_fixed", "type":"bytes", "logicalType":"decimal", "precision":14, "scale":4 } } ] } CSV """" Поддержка CSV реализована при помощи `libcsv `_. Таким образом, учитываются все соглашения о формате CSV, устанавливаемые этой библиотекой. Перечень соглашений представлен в `данном документе `_. Спецификация CSV определена в `RFC 4180 `_. Принимая во внимание эти рекомендации, `kadb-fdw` использует следующие правила парсинга CSV: + Поля (атрибуты) разделяются специальным символом (:ref:`разделителем`); + Строки (записи) разделяются символом новой строки; + Поле может быть заключено в специальные символы (:ref:`кавычки`); + Поля, содержащие символ разделителя, кавычки или новой строки, должны быть заключены в кавычки; + Каждый символ :ref:`кавычки` должен быть экранирован путем его дублирования (добавления идентичного символа перед ним); + Пустое поле всегда интерпретируется как значение ``NULL``; + Пустые строки игнорируются; + Пробелы в начале и конце поля, не заключенного в кавычки, удаляются, если соответствующая :ref:`опция` выставлена в значение true. Значения могут быть конвертированы в любой тип данных PostgreSQL. Используемые правила конвертации аналогичны правилам, применяемым для входных данных `psql`. text """" text - это сериализованный формат для представления данных в сыром виде в Kafka сообщении. Когда испоьзуется этот формат, `kadb_fdw` действует следующим образом: + Каждое сообщение предполагает представление одного атрибута (колонки) одной строки из ``FOREIGN TABLE`` + Данные парсятся PostgreSQL как текстовые в сводобной форме (user-provided textual data). + Такой формат требует, чтобы в ``FOREIGN TABLE`` содержался ровно один атрибут (колонку), которая может быть любого типа PostgreSQL. Kafka сообщения с пустым содержимым (или нулевой длины) парсятся как ``NULL`` значения, и также учитываются. Примеры использования ===================== Таблица для данных в формате AVRO --------------------------------- .. code-block:: sql DROP SERVER IF EXISTS ka_server CASCADE; CREATE SERVER ka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'localhost:9092' ); CREATE FOREIGN TABLE ka_table( i INT, t TEXT ) SERVER ka_server OPTIONS ( format 'avro', k_topic 'my_topic', k_consumer_group 'my_consumer_group', k_seg_batch '5', k_timeout_ms '1000' ); Таблица для данных в формате CSV -------------------------------- .. code-block:: sql DROP SERVER IF EXISTS ka_kerberized_server CASCADE; CREATE SERVER ka_kerberized_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'ke-kafka-sasl.ru-central1.internal:9092', "#security.protocol" 'sasl_plaintext', "#sasl.kerberos.keytab" '/root/adbkafka.service.keytab', "#sasl.kerberos.principal" 'adbkafka' ); CREATE FOREIGN TABLE ka_table( i INT, t TEXT ) SERVER ka_server OPTIONS ( format 'avro', k_topic 'my_topic', k_consumer_group 'my_consumer_group', k_seg_batch '5', k_timeout_ms '1000' ); Таблица с Kerberos-аутентификацией ---------------------------------- .. code-block:: sql DROP SERVER IF EXISTS ka_kerberized_server CASCADE; CREATE SERVER ka_kerberized_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'ke-kafka-sasl.ru-central1.internal:9092', "#security.protocol" 'sasl_plaintext', "#sasl.kerberos.keytab" '/root/adbkafka.service.keytab', "#sasl.kerberos.principal" 'adbkafka' ); CREATE FOREIGN TABLE ka_table( i INT, t TEXT ) SERVER ka_server OPTIONS ( format 'avro', k_topic 'my_topic', k_consumer_group 'my_consumer_group', k_seg_batch '5', k_timeout_ms '1000' ); Замечания по применению ======================= Данный раздел содержит заметки по применению ``kadb_fdw``. Он предназначен, чтобы перечислить характерные поведения и ожидаемые результаты. Одновременные ``SELECT`` ------------------------ ``kadb_fdw`` использует таблицу смещения при каждом запросе ``SELECT`` из внешней таблицы. Это одиночная (``DISTRIBUTED REPLICATED``) таблица ADB/GPDB. ``kadb_fdw`` может послать запросы ``INSERT`` и ``UPDATE`` таблице смещений. Таким образом, пределы одновременных операций влияют на ``SELECT`` из внешних таблиц ``kadb_fdw``. Обнаружение блокировок """""""""""""""""""""" Одновременные транзакции проходящие в GPDB обрабатываются детектором блокировок. Когда детектор блокировок **отключен**, каждый ``UPDATE`` требует ``ExclusiveLock``, который, по сути, блокирует всю таблицу для операции обновления. Для ``kadb_fdw`` это означает, что множественные одновременные ``SELECT``(из *разных* внешних таблиц) невозможны. Такие операции выполняются последовательно. Чтобы разрешить множественные одновременные ``SELECT``(из *разных* внешних таблиц), **включите** детектор блокировок. При работающем детекторе каждый ``SELECT`` требует только ``RowExclusiveLock``, таким образом разрешая одновременные запросы ``UPDATE`` к внешним таблицам. Одновременные ``SELECT`` из единой внешней таблицы невозможны. В некоторых обстоятельствах такие операции могут выполниться корректно, но это не гарантированно. Чтобы включить детектор блокировок, установите параметр конфигурации GPDB ``gp_enable_global_deadlock_detector`` как ``on``: .. code-block:: sql gpconfig -c gp_enable_global_deadlock_detector -v on Распределение партиций """""""""""""""""""""" Каждый ``SELECT`` рассматривает только партиции существующие в таблице смещений. Ее содержание можно изменить до операции ``SELECT`` если ``k_automatic_offsets`` настроен, либо при помощи другой функции. Партиции распределены между *сегментами GPDB* по следующим правилам: 1. Партиции распределены в равных пропорциях между всеми сегментами. Реальное количество партиций привязанных к сегменту различается не больше чем на 1 по всем сегментам. 2. Порядок партиций (возвращенный Kafka) и порядок привязанных к ним сегментов совпадают. Пример: * Кластер состоит из трех сегментов * Kafka возвращает пять партиций: ``[0, 2, 3, 4, 1]`` Партиции распределяются по сегментам следующим образом: 1. ``[0, 2]`` 2. ``[3, 4]`` 3. ``[1]`` Пользовательская отмена запроса """"""""""""""""""""""""""""""" Любой запрос, включающий функции ``librdkafka`` или внешние таблицы может быть отменен пользователем до завершения. Запрос гарантированно будет завершен меньше, чем за одну секунду после того, как все сегменты GPDB получат сигнал об отмене запроса. Заметьте, что активные подключения к Kafka не завершаются мгновенно после отмены запроса. В нормальных условиях подключения будут завершены в пределах ``k_latency_ms``. Но, когда Kafka не отвечает (например, не прошла аутентификация Kerberos), подключения остаются активными бесконечно. Чтобы гарантировать освобождение всех ресурсов, закройте сессию GPDB откуда поступил запрос. Продолжительность ``SELECT`` """""""""""""""""""""""""""" Когда запрос ``SELECT`` не остановлен, его максимальная длительность приблизительно: .. code-block:: sql [duration] = [k_latency_ms] * (1 + ceil([number of partitions] / [number of GPDB segments])) + [k_timeout_ms] ``k_timeout_ms`` и ``k_latency_ms`` значительно влияют на продолжительность ``SELECT`` и потому должны иметь правильные значения (особенно ``k_timeout_ms``).