Flink SQL Gateway
Flink SQL Gateway is a service that allows multiple clients to run Flink SQL queries in parallel. With SQL Gateway, users can submit Flink jobs, look up the metadata, and analyze the data in real time.
SQL Gateway consists of the SqlGatewayService processor, which handles SQL queries, and pluggable endpoints through which users submit the queries. As of ADH 3.1.2.b1, the supported endpoint is the REST endpoint, which clients interact with over HTTP.
At a high level, the SQL Gateway architecture is illustrated below.
Work with REST endpoint
The interaction flow through the REST endpoint is presented in the following diagram.
The key interaction steps are as follows.
- Create session. First, a client requests a new session. The SQL Gateway creates a new session object and returns the session identifier, sessionHandle, that is used for further communication. The session lives for a configurable time period, and its lifetime can be prolonged manually.
- Submit SQL. Once the session is created, the client submits SQL queries to the SQL Gateway server. For each query received, SQL Gateway creates a new Operation entity and returns an operationHandle ID that is used later to fetch the query results. The Operation has its own lifecycle and can be forcibly cancelled or closed to release resources.
- Fetch results. Using the operationHandle, the client fetches results from the previously created Operation. If the Operation’s results are ready, SQL Gateway returns a batch of result data and a URI pointing to the next batch. When all results are served, SQL Gateway returns a response with "resultType": "EOS", indicating that all the results have been fetched.
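The three steps above can be sketched as a small Python client. This is a minimal illustration rather than a reference implementation: the names http_json and run_statement, the call parameter, and the polling interval are hypothetical. It also assumes that the gateway accepts an empty JSON object when creating a session and that a "NOT_READY" result type means no data is available yet.

```python
import json
import time
import urllib.request


def http_json(method, url, payload=None):
    """Send one HTTP request and return the decoded JSON response."""
    data = None if payload is None else json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def run_statement(base_url, statement, call=http_json, poll_seconds=1.0):
    """Run one statement and return every row served for it."""
    # Step 1: create a session and keep its handle.
    session = call("POST", f"{base_url}/v1/sessions", {})["sessionHandle"]

    # Step 2: submit the statement; the gateway answers with an operation handle.
    operation = call(
        "POST",
        f"{base_url}/v1/sessions/{session}/statements",
        {"statement": statement},
    )["operationHandle"]

    # Step 3: follow nextResultUri from batch to batch until EOS is reported.
    rows = []
    next_uri = f"/v1/sessions/{session}/operations/{operation}/result/0"
    while next_uri:
        page = call("GET", base_url + next_uri)
        rows.extend((page.get("results") or {}).get("data", []))
        if page.get("resultType") == "EOS":
            break
        if page.get("resultType") == "NOT_READY":
            # Assumption: the gateway reports NOT_READY while results are pending.
            time.sleep(poll_seconds)
        next_uri = page.get("nextResultUri")
    return rows
```

Passing the transport in as call keeps the flow independent of the HTTP library, so the same loop can be exercised against a stub before pointing it at a real gateway, for example run_statement("http://<sql-gateway-host>:8083", "SELECT 1").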
The REST URL to access the SQL Gateway service is available on the Clusters → <YOUR_CLUSTER> → Services → Flink → Info page in ADCM, for example: http://ka-adh-1.ru-central1.internal:8083/v1/info. The complete reference for the SQL Gateway REST API is available in the Flink documentation.
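The URL shown in ADCM points at the info resource, while the other requests in this article use the same host and port with different paths under /v1. As a small sketch, the helpers below derive the base address from the info URL and build other endpoint URLs from it. The function names gateway_base_url and endpoint are hypothetical and are not part of any Flink or ADCM API.

```python
from urllib.parse import urlsplit, urlunsplit


def gateway_base_url(info_url):
    """Reduce an info URL such as http://host:8083/v1/info to http://host:8083."""
    parts = urlsplit(info_url)
    return urlunsplit((parts.scheme, parts.netloc, "", "", ""))


def endpoint(base_url, *segments, version="v1"):
    """Join path segments under the versioned REST root."""
    return "/".join([base_url.rstrip("/"), version, *segments])


base = gateway_base_url("http://ka-adh-1.ru-central1.internal:8083/v1/info")
sessions_url = endpoint(base, "sessions")
```

With the example address from the Info page, base is http://ka-adh-1.ru-central1.internal:8083 and sessions_url is the same address followed by /v1/sessions.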
Usage example
The example below walks you through basic SQL Gateway operations using the REST endpoint. To run the operations using curl, follow the steps below. Alternatively, you can download and import the Postman collection to run the requests.
- Check the SQL Gateway service availability.
  $ curl http://<sql-gateway-host>:8083/v1/info
  This returns basic information about the service.
  {"productName":"Apache Flink","version":"1.16.2"}
- Create a new session.
  $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions
  The server returns a sessionHandle that uniquely identifies the created user session.
  {"sessionHandle":"97fa59cf-6aa9-440b-b5cb-191f4167f0fb"}
- Using the received sessionHandle, submit the SQL query.
  $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/statements/ -d @test_query.json
  In this example, test_query.json stores a sample word count SQL query and looks as follows:
  { "statement": "SELECT word, SUM(frequency) AS `count` FROM ( VALUES ('Hello', 1), ('Ciao', 1), ('Hello', 2) ) AS WordTable(word, frequency) GROUP BY word" }
  The response contains an operationHandle that uniquely identifies the submitted query.
  {"operationHandle":"b18e38b0-8a9b-4f36-9121-7d9f2eb46b81"}
- Fetch the query results.
  $ curl http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/operations/{operationHandle}/result/0
  The response looks as follows.
  {"results":{"columns":[{"name":"word","logicalType":{"type":"VARCHAR","nullable":false,"length":5},"comment":null},{"name":"count","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[{"kind":"INSERT","fields":["Hello",1]},{"kind":"INSERT","fields":["Ciao",1]},{"kind":"UPDATE_BEFORE","fields":["Hello",1]},{"kind":"UPDATE_AFTER","fields":["Hello",3]}]},"resultType":"PAYLOAD","nextResultUri":"/v1/sessions/b9ea29d4-e373-463f-bd9a-a943d853b093/operations/c5df0878-e102-4e02-a37d-3293aa53c038/result/1"}
  Note the "resultType":"PAYLOAD" and "nextResultUri":"<nextURI>" fields. The latter carries the URI of the next batch of results. If a request to <nextURI> returns a response that contains "resultType": "EOS", all the results for the given query have been served.
- To prolong the session lifetime, send a heartbeat so that the session is not destroyed when the timeout expires.
  $ curl -X POST http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/heartbeat
  The 200 OK response indicates that the session lifetime has been extended.
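The data array in the fetch response above is a changelog rather than a final table: the Hello row is first inserted with a count of 1, then retracted with UPDATE_BEFORE and replaced with UPDATE_AFTER carrying the count 3. The sketch below shows one way a client might fold such rows into the current state. The function name apply_changelog and the choice of the first column as the key are hypothetical, and the handling of DELETE rows is an assumption, since no such row appears in the sample response.

```python
def apply_changelog(rows, key_index=0):
    """Fold changelog rows into the current table state, keyed by one column."""
    table = {}
    for row in rows:
        kind, fields = row["kind"], row["fields"]
        key = fields[key_index]
        if kind in ("INSERT", "UPDATE_AFTER"):
            # A new or updated row becomes the current value for its key.
            table[key] = fields
        elif kind in ("UPDATE_BEFORE", "DELETE"):
            # A retraction removes the previous value (DELETE is assumed here).
            table.pop(key, None)
    return table


# Rows copied from the "data" array of the sample response.
rows = [
    {"kind": "INSERT", "fields": ["Hello", 1]},
    {"kind": "INSERT", "fields": ["Ciao", 1]},
    {"kind": "UPDATE_BEFORE", "fields": ["Hello", 1]},
    {"kind": "UPDATE_AFTER", "fields": ["Hello", 3]},
]
final = apply_changelog(rows)
print(final["Hello"], final["Ciao"])  # ['Hello', 3] ['Ciao', 1]
```

Keying on a single column is enough for this word count query because the GROUP BY column identifies each row. A query without such a key would need a different way to match retractions to earlier rows.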
Apart from the requests above, the REST API supports other useful actions, such as getting the status of an operation, cancelling it, or closing it. For more details, see the Flink documentation.
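As a rough sketch of how such operation-level requests might be addressed, the helper below maps an action name to an HTTP method and URL. The function name operation_request is hypothetical. The paths and methods (GET for status, POST for cancel, DELETE for close) reflect how the Flink SQL Gateway REST API is commonly described and should be treated as assumptions to verify against the reference for your Flink version.

```python
def operation_request(base_url, session_handle, operation_handle, action):
    """Return the (HTTP method, URL) pair for an operation-level action."""
    # Assumed method per action; check the Flink REST API reference.
    methods = {"status": "GET", "cancel": "POST", "close": "DELETE"}
    if action not in methods:
        raise ValueError(f"unsupported action: {action}")
    url = (
        f"{base_url}/v1/sessions/{session_handle}"
        f"/operations/{operation_handle}/{action}"
    )
    return methods[action], url


method, url = operation_request(
    "http://<sql-gateway-host>:8083",
    "97fa59cf-6aa9-440b-b5cb-191f4167f0fb",
    "b18e38b0-8a9b-4f36-9121-7d9f2eb46b81",
    "status",
)
```

The returned pair can be passed to any HTTP client, for example curl -X followed by the method and the URL.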