Flink SQL Gateway

Flink SQL Gateway is a service that allows multiple clients to run Flink SQL queries in parallel. With SQL Gateway, users can submit Flink jobs, look up the metadata, and analyze the data in real time.

SQL Gateway consists of the SqlGatewayService processor that handles SQL queries and pluggable endpoints through which users can submit the queries. As of ADH 3.1.2.b1, the supported endpoint is REST endpoint which assumes interaction with the service via HTTP.

At a high level, the SQL Gateway architecture is illustrated below.

flink sql arch light
Flink SQL Gateway architecture
flink sql arch dark
Flink SQL Gateway architecture

Work with REST endpoint

The interaction flow through the REST endpoint is presented in the following diagram.

flink rest flow light
Interaction via REST endpoint
flink rest flow dark
Interaction via REST endpoint

The key interaction steps are as follows.

  1. Create session. First, a client requests to create a new session. The SQL Gateway creates a new session object and returns the session identifier — sessionHandle that is used for further communication. The session lives for a configurable time period and its lifetime can be manually prolonged.

  2. Submit SQL. When the session is created, the client submits SQL queries to the SQL Gateway server. For each query received, SQL Gateway creates a new Operation entity and returns an operationHandle ID that is used later to fetch query results. The Operation has its lifecycle and can be forcibly cancelled/closed to release the resources.

  3. Fetch results. Using the operationHandle, the client fetches results from the previously created Operation. If the Operation’s computations are ready, SQL Gateway returns a batch of result data and a URI pointing to the next batch. When all results are served, SQL Gateway returns a response with "resultType": "EOS", indicating that all the results have been fetched.

The REST URL to access the SQL Gateway service is available on the Clusters<YOUR_CLUSTER>ServicesFlinkInfo page in ADCM, for example: http://ka-adh-1.ru-central1.internal:8083/v1/info. The complete reference on the SQL Gateway REST API is available in Flink documentation.

Usage example

The example below walks you through basic SQL Gateway operations using the REST endpoint. To run the operations using curl, follow the steps below. Alternatively, you can download and import the Postman collection to run the requests.

  1. Check the SQL Gateway service availability.

    $ curl http://<sql-gateway-host>:8083/v1/info

    This returns basic information about the service.

    {"productName":"Apache Flink","version":"1.16.2"}
  2. Create a new session.

    $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions

    The server returns a sessionHandle that uniquely identifies the created user session.

    {"sessionHandle":"97fa59cf-6aa9-440b-b5cb-191f4167f0fb"}
  3. Using the received sessionHandle, submit the SQL query.

    $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/statements/ -d @test_query.json

    In this example, test_query.json stores a sample word count SQL query and looks as follows:

    {
        "statement": "SELECT word, SUM(frequency) AS `count` FROM (   VALUES ('Hello', 1), ('Ciao', 1), ('Hello', 2) ) AS WordTable(word, frequency) GROUP BY word"
    }

    The response contains an operationHandle that uniquely identifies the submitted query.

    {"operationHandle":"b18e38b0-8a9b-4f36-9121-7d9f2eb46b81"}
  4. Fetch the query results.

    $ curl http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/operations/{operationHandle}/result/0

    The response looks as follows.

    {"results":{"columns":[{"name":"word","logicalType":{"type":"VARCHAR","nullable":false,"length":5},"comment":null},{"name":"count","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[{"kind":"INSERT","fields":["Hello",1]},{"kind":"INSERT","fields":["Ciao",1]},{"kind":"UPDATE_BEFORE","fields":["Hello",1]},{"kind":"UPDATE_AFTER","fields":["Hello",3]}]},"resultType":"PAYLOAD","nextResultUri":"/v1/sessions/b9ea29d4-e373-463f-bd9a-a943d853b093/operations/c5df0878-e102-4e02-a37d-3293aa53c038/result/1"}

    Notice "resultType":"PAYLOAD" and "nextResultUri":"<nextURI>" — the latter field carries a URI to get the next batch with results. If calling <nextURI> returns a response that contains "resultType": "EOS", this indicates that all the results have been served for the given query.

  5. To prolong the session lifetime, submit a heartbeat so the session doesn’t get destroyed after the timeout.

    $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/heartbeat

    The 200 OK response indicates that the session lifetime has been extended.

Apart from the requests above there are other useful actions like get status/cancel/stop operation, etc. For more details on these, see Flink documentation.

Found a mistake? Seleсt text and press Ctrl+Enter to report it