OpenAPI connector usage example

This article describes a step-by-step scenario that shows how to use the Trino OpenAPI connector. The high-level scenario workflow is presented in the following diagram.

Trino OpenAPI connector use case

Prerequisites

To run the scenario, the following prerequisites must be met:

  • An ADH cluster of version 4.1.0 or higher is installed and running. The cluster includes the following services:

    • ADPG

    • Core configuration

    • HDFS

    • Hive

    • Trino

    • YARN

    • ZooKeeper

  • A test REST API service with an OpenAPI schema. In this example, a simple Python application emulates a mock RESTful server with bank transactions. The sample code and the instructions on running the app are provided in the Prepare a test OpenAPI server section.

  • Two Trino catalogs are created:

    • openapi-catalog. The catalog for fetching data from the mock server.

    • iceberg-adh. The catalog for writing filtered data to an Iceberg table.

    Catalog creation steps are described in the scenario.

  • A JDBC client (for example, DBeaver) to connect to the Trino service.

Prepare a test OpenAPI server

In this example, a Python application built with the FastAPI framework emulates a REST server that returns bank transactions over HTTP. The mock server exposes the following REST endpoints:

GET [BaseURL]/transactions

Returns a list of all transactions, for example:

    [
        {
            "txn_id": 1,
            "acc_id": 1002,
            "txn_value": 184.0,
            "txn_type": "withdrawal"
        },
        {
            "txn_id": 2,
            "acc_id": 1000,
            "txn_value": 282.95,
            "txn_type": "spb"
        },
        {
            "txn_id": 3,
            "acc_id": 1001,
            "txn_value": 100.00,
            "txn_type": "deposit"
        }
    ]
GET [BaseURL]/transactions_batch

Returns a batch of transactions of a fixed size:

{
    "total_count": 2,
    "items":
    [
        {
            "txn_id": 1,
            "acc_id": 1002,
            "txn_value": 184.0,
            "txn_type": "withdrawal"
        },
        {
            "txn_id": 2,
            "acc_id": 1000,
            "txn_value": 282.95,
            "txn_type": "spb"
        }
    ]
}
GET [BaseURL]/openapi.yaml

Returns the OpenAPI schema of the mock server. FastAPI generates the schema automatically, and custom x-… fields are injected into it programmatically. For example:

openapi: 3.1.0
info:
  title: Transaction API
  description: Transaction API with x-unwrap demo
  version: 1.0.0
paths:
  /transactions:
    get:
      summary: Get Transactions
      operationId: get_transactions_transactions_get
      responses:
        '200':
          description: Successful Response
          content:
            application/json:
              schema:
                items:
                  $ref: '#/components/schemas/Transaction'
                type: array
                title: Response Get Transactions Transactions Get
  /transactions_batch:
    get:
      summary: Get Transactions Batch
      operationId: get_transactions_batch_transactions_batch_get
      responses:
        '200':
          description: Successful Response
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/TransactionBatch'
      x-unwrap:
        resultParam: $response.body#/items
        includeRoot: 'false'
# ...
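
To make the effect of x-unwrap more concrete, the following Python sketch mimics what the extension asks for: following the JSON pointer in resultParam ($response.body#/items) to reach the nested array of rows. The unwrap helper is hypothetical and is not the connector's implementation. It only resolves the pointer part after #, and it does not model includeRoot.

```python
import json


def unwrap(body, result_param):
    """Return the value addressed by the JSON pointer after '#' in result_param."""
    # "$response.body#/items" -> "/items"
    _, _, pointer = result_param.partition("#")
    node = body
    for token in pointer.split("/")[1:]:
        # Undo JSON Pointer escaping (RFC 6901): "~1" -> "/", then "~0" -> "~".
        token = token.replace("~1", "/").replace("~0", "~")
        # Lists are addressed by index, objects by key.
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node


# A response shaped like the /transactions_batch example above.
response_body = json.loads("""
{
    "total_count": 2,
    "items": [
        {"txn_id": 1, "acc_id": 1002, "txn_value": 184.0, "txn_type": "withdrawal"},
        {"txn_id": 2, "acc_id": 1000, "txn_value": 282.95, "txn_type": "spb"}
    ]
}
""")

rows = unwrap(response_body, "$response.body#/items")
for row in rows:
    print(row["txn_id"], row["txn_type"])
```

Each element of rows has the same shape as an item returned by /transactions, which is why the wrapped endpoint can be read as a flat table once the wrapper is removed.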

Run the test server

To run the mock server, follow these steps:

TIP
Use the Python interpreter bundled with ADH (/opt/python3.10/).
  1. Create and activate a new virtual environment.

    $ mkdir demo_mock_server
    $ cd demo_mock_server
    $ /opt/python3.10/bin/python3 -m venv venv
    $ source venv/bin/activate
  2. Create the server.py file and paste the following code:

    server.py

    from fastapi import FastAPI
    from fastapi.openapi.utils import get_openapi
    from fastapi.responses import Response
    from pydantic import BaseModel
    from typing import List
    import random
    import uvicorn
    import yaml
    
    
    class Transaction(BaseModel):
        txn_id: int
        acc_id: int
        txn_value: float
        txn_type: str
    
    
    class TransactionBatch(BaseModel):
        total_count: int
        items: List[Transaction]
    
    
    app = FastAPI(title="Transaction API", version="1.0.0")
    
    
    def generate_transactions(n: int = 10) -> List[Transaction]:
        acc_ids = [1000, 1001, 1002]
        txn_types = ["deposit", "withdrawal", "spb"]
        transactions = []
        for txn_id in range(1, n + 1):
            transactions.append(
                Transaction(
                    txn_id=txn_id,
                    acc_id=random.choice(acc_ids),
                    txn_value=round(random.uniform(0, 500), 2),
                    txn_type=random.choice(txn_types),
                )
            )
        return transactions
    
    
    transactions = generate_transactions(10)
    
    
    @app.get("/transactions", response_model=List[Transaction])
    def get_transactions():
        return transactions
    
    
    # solely for x-unwrap demonstration
    @app.get("/transactions_batch", response_model=TransactionBatch)
    def get_transactions_batch():
        # Keep total_count an integer that matches the number of returned items
        batch = transactions[:5]
        return {
            "total_count": len(batch),
            "items": batch,
        }
    
    
    def custom_openapi():
        if app.openapi_schema:
            return app.openapi_schema
    
        openapi_schema = get_openapi(
            title=app.title,
            version=app.version,
            description="Transaction API with x-unwrap demo",
            routes=app.routes,
        )
    
        # Add x-unwrap for /transactions_batch
        openapi_schema["paths"]["/transactions_batch"]["get"]["x-unwrap"] = {
            "resultParam": "$response.body#/items",
            "includeRoot": "false"
        }
    
        app.openapi_schema = openapi_schema
        return app.openapi_schema
    
    
    app.openapi = custom_openapi
    
    
    @app.get("/openapi.yaml", include_in_schema=False)
    def get_openapi_yaml():
        openapi_schema = custom_openapi()
        yaml_schema = yaml.dump(openapi_schema, allow_unicode=True,
                                sort_keys=False)
        return Response(content=yaml_schema, media_type="application/yaml")
    
    
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8003)
  3. Install the dependencies using the following requirements.txt file:

    $ pip install -r requirements.txt
    requirements.txt

    annotated-types==0.7.0
    anyio==4.10.0
    click==8.2.1
    exceptiongroup==1.3.0
    fastapi==0.116.1
    h11==0.16.0
    idna==3.10
    pydantic==2.11.7
    pydantic_core==2.33.2
    PyYAML==6.0.2
    sniffio==1.3.1
    starlette==0.47.2
    typing-inspection==0.4.1
    typing_extensions==4.14.1
    uvicorn==0.35.0
  4. Run the application:

    $ python server.py

    The output:

    INFO:     Started server process [26147]
    INFO:     Waiting for application startup.
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://0.0.0.0:8003 (Press CTRL+C to quit)
    ...
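
Before moving on to Trino, it can be useful to confirm that the endpoints respond as expected. The following is a minimal sketch that uses only the Python standard library. It assumes the server is reachable at http://localhost:8003 (adjust BASE_URL otherwise); the helper names fetch_json, find_malformed, and smoke_check are hypothetical and not part of the mock server.

```python
import json
from urllib.request import urlopen

# Assumption: the mock server runs locally on the port set in server.py.
BASE_URL = "http://localhost:8003"
REQUIRED_FIELDS = {"txn_id", "acc_id", "txn_value", "txn_type"}


def fetch_json(url, timeout=5.0):
    """Send a GET request and decode the JSON response body."""
    with urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def find_malformed(items):
    """Return the indexes of transactions that lack any required field."""
    return [i for i, item in enumerate(items) if not REQUIRED_FIELDS.issubset(item)]


def smoke_check(base_url=BASE_URL):
    """Query both data endpoints and return a short report as a dict."""
    transactions = fetch_json(f"{base_url}/transactions")
    batch = fetch_json(f"{base_url}/transactions_batch")
    return {
        "transactions": len(transactions),
        "batch_items": len(batch["items"]),
        "malformed": find_malformed(transactions) + find_malformed(batch["items"]),
    }
```

With the server running, calling smoke_check() from a Python shell returns the item counts for both endpoints; an empty malformed list means every item carries the four expected fields.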

Run the example

  1. Ensure the mock server is up and running.

  2. In ADCM, enable the built-in Iceberg catalog (Clusters → <clusterName> → Services → Trino → Iceberg configuration).

  3. (Optional) Enable dynamic catalog management (Clusters → <clusterName> → Services → Trino → Components → Trino Coordinator → Trino catalog management → catalog.management). This allows creating new catalogs using the CREATE CATALOG command. Restart the Trino service.

  4. Connect to Trino Coordinator using DBeaver. The JDBC connection string is available in ADCM (Clusters → <clusterName> → Services → Trino → Info). In the connection string, specify the name of a user with HDFS write permissions. This is required to allow certain write operations by the Iceberg catalog in further steps (writing raw data to HDFS).

  5. In DBeaver, create another Trino catalog with the following SQL:

    CREATE CATALOG "openapi-catalog" USING openapi
    WITH (
        "spec-location" = 'http://10.92.41.251:8003/openapi.yaml', -- (1)
        "base-uri" = 'http://10.92.41.251:8003', -- (2)
        "authentication.type" = 'none' -- (3)
    );
    1 The endpoint that returns an OpenAPI schema of the mock server. In this scenario, the schema is generated by FastAPI.
    2 Base URL of the mock server’s REST API.
    3 No authentication is used in this example for simplicity. For more information on supported authentication types, see the OpenAPI connector configuration article.
  6. Verify the new catalogs:

    SHOW CATALOGS;

    The output:

    Catalog             |
    --------------------+
    iceberg-adh         |
    system              |
    openapi-catalog     |
  7. Create a test Iceberg table:

    USE "iceberg-adh".default;
    CREATE TABLE transactions_spb_only(txn_id int, acc_id int, txn_value double);
  8. Fetch transactions from the mock server, filter them by transaction type, and write the filtered result set to the Iceberg table with a single SQL statement:

    INSERT INTO "iceberg-adh".default.transactions_spb_only (txn_id, acc_id, txn_value)
    SELECT txn_id, acc_id, txn_value
    FROM "openapi-catalog".default.transactions
    WHERE txn_type = 'spb';
  9. Verify the data in the Iceberg table:

    SELECT * FROM "iceberg-adh".default.transactions_spb_only;

    Sample output:

    txn_id|acc_id|txn_value|
    ------+------+---------+
         1|  1001|   275.32|
         2|  1001|    12.24|
         6|  1000|    89.72|
         7|  1000|    40.71|
         8|  1002|   470.56|

    Verify data in HDFS:

    $ hdfs dfs -ls /apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5

    Sample output:

    drwxrwxr-x   - hdfs hadoop          0 2025-09-17 16:01 hdfs://adh/apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5/data
    drwxrwxr-x   - hdfs hadoop          0 2025-09-17 16:01 hdfs://adh/apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5/metadata
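
For readers who prefer code to SQL, the INSERT ... SELECT statement from step 8 corresponds roughly to the filter-and-project logic below. This is only an illustration on an in-memory list that reuses the sample rows from the /transactions example; Trino executes the actual query, and the select_spb function name is hypothetical.

```python
def select_spb(transactions):
    """Mimic: SELECT txn_id, acc_id, txn_value ... WHERE txn_type = 'spb'."""
    return [
        (t["txn_id"], t["acc_id"], t["txn_value"])
        for t in transactions
        if t["txn_type"] == "spb"
    ]


# Sample rows taken from the /transactions response example.
sample = [
    {"txn_id": 1, "acc_id": 1002, "txn_value": 184.0, "txn_type": "withdrawal"},
    {"txn_id": 2, "acc_id": 1000, "txn_value": 282.95, "txn_type": "spb"},
    {"txn_id": 3, "acc_id": 1001, "txn_value": 100.00, "txn_type": "deposit"},
]

print(select_spb(sample))  # → [(2, 1000, 282.95)]
```

The txn_type column is used only for filtering and is not written to the target table, which matches the three-column definition of transactions_spb_only.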