OpenAPI connector usage example
This article describes a step-by-step scenario that shows how to use the Trino OpenAPI connector to fetch data from a REST API and write it to an Iceberg table. The high-level scenario workflow is presented in the following diagram.
Prerequisites
To run the scenario, the following prerequisites must be met:
- An ADH cluster of version 4.1.0 or higher is installed and running. The cluster includes the following services:
  - ADPG
  - Core configuration
  - HDFS
  - Hive
  - Trino
  - YARN
  - ZooKeeper
- A test REST API service with an OpenAPI schema. In this example, a simple Python application emulates a mock RESTful server with bank transactions. The sample code and the instructions on running the app are provided in the Prepare a test OpenAPI server section.
- Two Trino catalogs:
  - openapi-catalog. The catalog for fetching data from the mock server.
  - iceberg-adh. The built-in Iceberg catalog for writing filtered data to an Iceberg table.
  The steps to create or enable these catalogs are described in the scenario.
- A JDBC client (for example, DBeaver) to connect to the Trino service.
Prepare a test OpenAPI server
In this example, a Python application built with the FastAPI framework emulates a REST server that returns bank transactions over HTTP. The mock server exposes the following REST endpoints:
GET /transactions. Returns a list of all transactions, for example:
[
  {
    "txn_id": 1,
    "acc_id": 1002,
    "txn_value": 184.0,
    "txn_type": "withdrawal"
  },
  {
    "txn_id": 2,
    "acc_id": 1000,
    "txn_value": 282.95,
    "txn_type": "spb"
  },
  {
    "txn_id": 3,
    "acc_id": 1001,
    "txn_value": 100.00,
    "txn_type": "deposit"
  }
]
GET /transactions_batch. Returns a batch of transactions of a fixed size, wrapped in an object together with the number of items:
{
  "total_count": 2,
  "items": [
    {
      "txn_id": 1,
      "acc_id": 1002,
      "txn_value": 184.0,
      "txn_type": "withdrawal"
    },
    {
      "txn_id": 2,
      "acc_id": 1000,
      "txn_value": 282.95,
      "txn_type": "spb"
    }
  ]
}
GET /openapi.yaml. Returns the OpenAPI schema of the mock server in YAML format.
FastAPI generates the schema automatically, and custom x-… extension fields, such as x-unwrap, are injected into it programmatically.
For example:
openapi: 3.1.0
info:
  title: Transaction API
  description: Transaction API with x-unwrap demo
  version: 1.0.0
paths:
  /transactions:
    get:
      summary: Get Transactions
      operationId: get_transactions_transactions_get
      responses:
        '200':
          description: Successful Response
          content:
            application/json:
              schema:
                items:
                  $ref: '#/components/schemas/Transaction'
                type: array
                title: Response Get Transactions Transactions Get
  /transactions_batch:
    get:
      summary: Get Transactions Batch
      operationId: get_transactions_batch_transactions_batch_get
      responses:
        '200':
          description: Successful Response
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/TransactionBatch'
      x-unwrap:
        resultParam: $response.body#/items
        includeRoot: 'false'
# ...
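The x-unwrap extension in the schema above tells the connector where the rows are located inside the /transactions_batch response. As a rough illustration, assuming that resultParam is a runtime expression of the form $response.body#<JSON pointer> (RFC 6901) and that the connector reads table rows from the array it references, the following Python sketch resolves such an expression against a parsed response body. It is not the connector's implementation and it ignores includeRoot; the helper names resolve_json_pointer and unwrap_rows are hypothetical.

```python
import json
from typing import Any, List


def resolve_json_pointer(document: Any, pointer: str) -> Any:
    """Resolve an RFC 6901 JSON pointer (for example, "/items") against parsed JSON."""
    if pointer == "":
        return document  # an empty pointer references the whole document
    if not pointer.startswith("/"):
        raise ValueError(f"Invalid JSON pointer: {pointer!r}")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape per RFC 6901: "~1" -> "/" first, then "~0" -> "~"
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]
        else:
            current = current[token]
    return current


def unwrap_rows(body: Any, result_param: str) -> List[dict]:
    """Hypothetical helper: return the row array referenced by an x-unwrap resultParam."""
    prefix = "$response.body#"
    if not result_param.startswith(prefix):
        raise ValueError(f"Unsupported expression: {result_param!r}")
    rows = resolve_json_pointer(body, result_param[len(prefix):])
    if not isinstance(rows, list):
        raise ValueError("resultParam must reference a JSON array")
    return rows


if __name__ == "__main__":
    # Sample body shaped like the GET /transactions_batch response
    body = json.loads(
        '{"total_count": 2, "items": ['
        '{"txn_id": 1, "acc_id": 1002, "txn_value": 184.0, "txn_type": "withdrawal"},'
        '{"txn_id": 2, "acc_id": 1000, "txn_value": 282.95, "txn_type": "spb"}]}'
    )
    for row in unwrap_rows(body, "$response.body#/items"):
        print(row["txn_id"], row["txn_type"])
```

In this sketch, the wrapper field total_count is simply dropped and each element of items becomes one row, which matches the intent of pointing resultParam at the items array.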
Run the test server
To run the mock server, follow these steps:
TIP: Use the Python interpreter bundled with ADH (/opt/python3.10/).
- Create and activate a new virtual environment:
$ mkdir demo_mock_server
$ cd demo_mock_server
$ /opt/python3.10/bin/python3 -m venv venv
$ source venv/bin/activate
- Create the server.py file and paste the following code:
server.py
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from fastapi.responses import Response
from pydantic import BaseModel
from typing import List
import random
import uvicorn
import yaml


class Transaction(BaseModel):
    txn_id: int
    acc_id: int
    txn_value: float
    txn_type: str


class TransactionBatch(BaseModel):
    total_count: int
    items: List[Transaction]


app = FastAPI(title="Transaction API", version="1.0.0")


def generate_transactions(n: int = 10) -> List[Transaction]:
    acc_ids = [1000, 1001, 1002]
    txn_types = ["deposit", "withdrawal", "spb"]
    transactions = []
    for txn_id in range(1, n + 1):
        transactions.append(
            Transaction(
                txn_id=txn_id,
                acc_id=random.choice(acc_ids),
                txn_value=round(random.uniform(0, 500), 2),
                txn_type=random.choice(txn_types),
            )
        )
    return transactions


transactions = generate_transactions(10)


@app.get("/transactions", response_model=List[Transaction])
def get_transactions():
    return transactions


# solely for x-unwrap demonstration
@app.get("/transactions_batch", response_model=TransactionBatch)
def get_transactions_batch():
    # Return the first five transactions together with their count
    batch = transactions[:5]
    return {
        "total_count": len(batch),
        "items": batch,
    }


def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    openapi_schema = get_openapi(
        title=app.title,
        version=app.version,
        description="Transaction API with x-unwrap demo",
        routes=app.routes,
    )
    # Add x-unwrap for /transactions_batch
    openapi_schema["paths"]["/transactions_batch"]["get"]["x-unwrap"] = {
        "resultParam": "$response.body#/items",
        "includeRoot": "false",
    }
    app.openapi_schema = openapi_schema
    return app.openapi_schema


app.openapi = custom_openapi


@app.get("/openapi.yaml", include_in_schema=False)
def get_openapi_yaml():
    openapi_schema = custom_openapi()
    yaml_schema = yaml.dump(openapi_schema, allow_unicode=True, sort_keys=False)
    return Response(content=yaml_schema, media_type="application/yaml")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8003)
- Create the requirements.txt file with the following content and install the dependencies:
requirements.txt
annotated-types==0.7.0
anyio==4.10.0
click==8.2.1
exceptiongroup==1.3.0
fastapi==0.116.1
h11==0.16.0
idna==3.10
pydantic==2.11.7
pydantic_core==2.33.2
PyYAML==6.0.2
sniffio==1.3.1
starlette==0.47.2
typing-inspection==0.4.1
typing_extensions==4.14.1
uvicorn==0.35.0
$ pip install -r requirements.txt
- Run the application:
$ python server.py
The output:
INFO: Started server process [26147]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8003 (Press CTRL+C to quit)
...
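Before connecting Trino to the mock server, it can be useful to check that the endpoints respond and return records of the expected shape. The following Python sketch uses only the standard library; the base URL http://localhost:8003 assumes the check runs on the same host as server.py, and the helper names fetch_json and check_transactions are hypothetical. The expected fields and types are taken from the Transaction model in server.py.

```python
import json
import urllib.request
from typing import Any, List

# Assumption: the check runs on the host where server.py listens on port 8003
BASE_URL = "http://localhost:8003"

# Field names and types declared by the Transaction model in server.py;
# txn_value accepts int too, because JSON numbers such as 100 parse as int
EXPECTED_FIELDS = {"txn_id": int, "acc_id": int, "txn_value": (int, float), "txn_type": str}


def fetch_json(path: str, timeout: float = 5.0) -> Any:
    """Send GET to a mock server path and parse the JSON response body."""
    with urllib.request.urlopen(BASE_URL + path, timeout=timeout) as response:
        return json.load(response)


def check_transactions(records: List[dict]) -> int:
    """Raise ValueError on a missing or mistyped field; return the number of records."""
    for record in records:
        for field, expected_type in EXPECTED_FIELDS.items():
            if field not in record:
                raise ValueError(f"Record {record!r} has no field {field!r}")
            if not isinstance(record[field], expected_type):
                raise ValueError(f"Field {field!r} has an unexpected type in {record!r}")
    return len(records)
```

For example, with the server running, check_transactions(fetch_json("/transactions")) should return 10, since server.py generates ten transactions at startup, and check_transactions(fetch_json("/transactions_batch")["items"]) validates the items of the batch endpoint.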
Run the example
- Ensure the mock server is up and running.
- In ADCM, enable the built-in Iceberg catalog (Clusters → <clusterName> → Services → Trino → Iceberg configuration).
- (Optional) Enable dynamic catalog management (Clusters → <clusterName> → Services → Trino → Components → Trino Coordinator → Trino catalog management → catalog.management). This allows creating new catalogs with the CREATE CATALOG command, which is used later in this scenario. Restart the Trino service.
- Connect to Trino Coordinator using DBeaver. The JDBC connection string is available in ADCM (Clusters → <clusterName> → Services → Trino → Info). In the connection string, specify the name of a user with HDFS write permissions. This is required because in later steps the Iceberg catalog writes table data to HDFS.
- In DBeaver, create the openapi-catalog catalog with the following SQL:
CREATE CATALOG "openapi-catalog" USING openapi
WITH (
    "spec-location" = 'http://10.92.41.251:8003/openapi.yaml', -- (1)
    "base-uri" = 'http://10.92.41.251:8003', -- (2)
    "authentication.type" = 'none' -- (3)
);
(1) The endpoint that returns the OpenAPI schema of the mock server. In this scenario, the schema is generated by FastAPI. Replace 10.92.41.251 with the address of the host where the mock server runs.
(2) Base URL of the mock server's REST API.
(3) No authentication is used in this example for simplicity. For more information on supported authentication types, see the OpenAPI connector configuration article.
- Verify the new catalogs:
SHOW CATALOGS;
The output:
Catalog             |
--------------------+
iceberg-adh         |
system              |
openapi-catalog     |
- Create a test Iceberg table:
USE "iceberg-adh".default;
CREATE TABLE transactions_spb_only(txn_id int, acc_id int, txn_value double);
- Fetch transactions from the mock server, filter them by transaction type, and write the filtered result set to the Iceberg table with a single SQL statement:
INSERT INTO "iceberg-adh".default.transactions_spb_only (txn_id, acc_id, txn_value)
SELECT txn_id, acc_id, txn_value
FROM "openapi-catalog".default.transactions
WHERE txn_type = 'spb';
- Verify the data in the Iceberg table:
SELECT * FROM "iceberg-adh".default.transactions_spb_only;
Sample output:
txn_id|acc_id|txn_value|
------+------+---------+
     1|  1001|   275.32|
     2|  1001|    12.24|
     6|  1000|    89.72|
     7|  1000|    40.71|
     8|  1002|   470.56|
Verify the data in HDFS. The table directory name contains a unique suffix, so it differs in your environment:
$ hdfs dfs -ls /apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5
Sample output:
drwxrwxr-x - hdfs hadoop 0 2025-09-17 16:01 hdfs://adh/apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5/data
drwxrwxr-x - hdfs hadoop 0 2025-09-17 16:01 hdfs://adh/apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5/metadata
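Conceptually, the INSERT ... SELECT statement in this scenario reads every row returned by GET /transactions, keeps only the rows with txn_type = 'spb', and projects the txn_id, acc_id, and txn_value columns. The following Python sketch reproduces that logic on plain dictionaries, which can help when cross-checking the Iceberg table against the mock server's response. It illustrates the query semantics only, not how Trino executes the query; the function names filter_spb and same_rows are hypothetical.

```python
from typing import Iterable, List, Tuple

# (txn_id, acc_id, txn_value): the columns of the transactions_spb_only table
Row = Tuple[int, int, float]


def filter_spb(transactions: Iterable[dict]) -> List[Row]:
    """Mirror SELECT txn_id, acc_id, txn_value ... WHERE txn_type = 'spb'."""
    return [
        (t["txn_id"], t["acc_id"], float(t["txn_value"]))
        for t in transactions
        if t["txn_type"] == "spb"
    ]


def same_rows(expected: List[Row], actual: List[Row]) -> bool:
    """Compare result sets ignoring order: SELECT without ORDER BY has no guaranteed row order."""
    return sorted(expected) == sorted(actual)


if __name__ == "__main__":
    sample = [
        {"txn_id": 1, "acc_id": 1002, "txn_value": 184.0, "txn_type": "withdrawal"},
        {"txn_id": 2, "acc_id": 1000, "txn_value": 282.95, "txn_type": "spb"},
        {"txn_id": 3, "acc_id": 1001, "txn_value": 100.00, "txn_type": "deposit"},
    ]
    print(filter_spb(sample))  # [(2, 1000, 282.95)]
```

Comparing with same_rows rather than list equality avoids false mismatches when the Iceberg query returns the same rows in a different order.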