Flink SQL Gateway

Flink SQL Gateway — это сервис, позволяющий нескольким клиентам параллельно выполнять запросы Flink SQL. Используя SQL Gateway, можно запускать задачи Flink, выполнять поиск по метаданным и анализировать данные в реальном времени.

SQL SQL Gateway состоит из следующих компонентов:

  • SqlGatewayService, основной задачей которого является обработка SQL-запросов.

  • Подключаемые endpoints, через которые пользователи могут отправлять запросы на сервер.

На момент релиза ADH 3.1.2.b1, основной поддерживаемый endpoint —  REST endpoint, подразумевающий взаимодействие с сервисом по HTTP.

Ниже представлена высокоуровневая архитектура Flink SQL Gateway.

flink sql arch light
Архитектура Flink SQL Gateway
flink sql arch dark
Архитектура Flink SQL Gateway

Работа с REST endpoint

Схема взаимодействия с использованием REST endpoint представлена ниже.

flink rest flow light
Взаимодействие через REST endpoint
flink rest flow dark
Взаимодействие через REST endpoint

Основные этапы схемы:

  1. Создание сессии. Клиент отправляет POST-запрос на создание сессии. SQL Gateway создает новую сессию и возвращает ее идентификатор — sessionHandle, который нужен для дальнейшего общения с сервером. Сессия существует в течение настраиваемого периода времени, а срок ее жизни может быть продлен вручную.

  2. Отправка запроса. После создания сессии клиент отправляет SQL-запрос на сервер SQL Gateway. Для каждого запроса SQL Gateway создает новую сущность Operation и возвращает ее идентификатор (operationHandle), необходимый в будущем для получения результатов запроса. Экземпляр Operation может быть принудительно отменен/закрыт для освобождения ресурсов.

  3. Получение результатов. Используя operationHandle, клиент получает результаты из ранее созданного экземпляра Operation. Если вычисления по запросу готовы, SQL Gateway возвращает пакет результатов и URI, указывающий на следующий пакет. Если SQL Gateway вернул все результаты для данного запроса, возвращается специальный ответ с полем "resultType": "EOS".

REST URL сервиса SQL Gateway доступен на странице Clusters<YOUR_CLUSTER>ServicesFlinkInfo в ADCM, например: http://ka-adh-1.ru-central1.internal:8083/v1/info. Полная справочная информация о SQL Gateway REST API доступна в документации Flink.

Пример использования

Ниже представлен пример взаимодействия с SQL Gateway, демонстрирующий основные операции с использованием REST endpoint. Вы можете выполнить следующие шаги, используя curl, или можете скачать Postman-коллекцию с готовыми запросами.

  1. Проверьте доступность сервиса SQL Gateway.

    $ curl http://<sql-gateway-host>:8083/v1/info

    В ответе содержится базовая информация о сервисе.

    {"productName":"Apache Flink","version":"1.16.2"}
  2. Создайте новую сессию.

    $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions

    Сервер возвращает sessionHandle, который уникально идентифицирует созданную сессию.

    {"sessionHandle":"97fa59cf-6aa9-440b-b5cb-191f4167f0fb"}
  3. Используя полученный sessionHandle, отправьте SQL-запрос.

    $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/statements/ -d @test_query.json

    В этом примере файл test_query.json содержит тестовый SQL-запрос, выполняющий подсчет количества слов, и выглядит следующим образом:

    {
        "statement": "SELECT word, SUM(frequency) AS `count` FROM (   VALUES ('Hello', 1), ('Ciao', 1), ('Hello', 2) ) AS WordTable(word, frequency) GROUP BY word"
    }

    В ответе содержится operationHandle, который идентифицирует отправленный запрос.

    {"operationHandle":"b18e38b0-8a9b-4f36-9121-7d9f2eb46b81"}
  4. Получите результаты запроса.

    $ curl http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/operations/{operationHandle}/result/0

    Ответ от сервера имеет следующий вид.

    {"results":{"columns":[{"name":"word","logicalType":{"type":"VARCHAR","nullable":false,"length":5},"comment":null},{"name":"count","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[{"kind":"INSERT","fields":["Hello",1]},{"kind":"INSERT","fields":["Ciao",1]},{"kind":"UPDATE_BEFORE","fields":["Hello",1]},{"kind":"UPDATE_AFTER","fields":["Hello",3]}]},"resultType":"PAYLOAD","nextResultUri":"/v1/sessions/b9ea29d4-e373-463f-bd9a-a943d853b093/operations/c5df0878-e102-4e02-a37d-3293aa53c038/result/1"}

    Обратите внимание на поля "resultType":"PAYLOAD" и "nextResultUri":"<nextURI>". В последнем содержится URI, указывающий на следующий пакет с данными. Если при обращении к <nextURI> ответ содержит "resultType": "EOS", это означает, что SQL Gateway вернул все результаты для данного запроса.

  5. Чтобы продлить срок действия сессии, отправьте heartbeat-запрос.

    $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/heartbeat

    Ответ 200 OK указывает, что таймер жизни сессии был сброшен.

Кроме вышеупомянутых запросов существуют и другие полезные операции, например, отмена/остановка/проверка статуса операции и так далее. Подробная информация о REST API доступна в документации Flink.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней