Пример использования OpenAPI connector

В данной статье описан пошаговый сценарий, демонстрирующий использование Trino OpenAPI-коннектора. Алгоритм сценария представлен на следующей схеме.

Пример использования коннектора Trino OpenAPI
Пример использования коннектора Trino OpenAPI
Пример использования коннектора Trino OpenAPI
Пример использования коннектора Trino OpenAPI

Требования

Для выполнения сценария необходимо следующее:

  • Кластер ADH версии 4.1.0 или выше установлен и запущен. Кластер содержит следующие сервисы:

    • ADPG

    • Core configuration

    • HDFS

    • Hive

    • Trino

    • YARN

    • ZooKeeper

  • Тестовый REST API c OpenAPI-схемой. В этом примере используется приложение Python, эмулирующее RESTful-сервер с банковскими транзакциями. Пример кода и инструкции по запуску приложения приведены в разделе Подготовка тестового OpenAPI-сервера.

  • Созданы два каталога Trino:

    • openapi-catalog. Каталог для чтения данных из тестового сервера.

    • iceberg-adh. Каталог для записи отфильтрованных данных в таблицу Iceberg.

    Этапы создания каталогов описаны в сценарии.

  • JDBC-клиент (например, DBeaver) для подключения к сервису Trino.

Подготовка тестового OpenAPI-сервера

В данном примере приложение Python на базе фреймворка FastAPI эмулирует REST-сервер, который возвращает банковские транзакции по HTTP. Тестовый сервер предоставляет следующие REST-эндпойнты:

GET [BaseURL]/transactions

 

Возвращает список всех доступных транзакций, например:

    [
        {
            "txn_id": 1,
            "acc_id": 1002,
            "txn_value": 184.0,
            "txn_type": "withdrawal"
        },
        {
            "txn_id": 2,
            "acc_id": 1000,
            "txn_value": 282.95,
            "txn_type": "spb"
        },
        {
            "txn_id": 3,
            "acc_id": 1001,
            "txn_value": 100.00,
            "txn_type": "deposit"
        }
    ]
GET [BaseURL]/transactions_batch

 

Возвращает пакет транзакций фиксированного размера:

{
    "transactions_count": 2,
    "items":
    [
        {
            "txn_id": 1,
            "acc_id": 1002,
            "txn_value": 184.0,
            "txn_type": "withdrawal"
        },
        {
            "txn_id": 2,
            "acc_id": 1000,
            "txn_value": 282.95,
            "txn_type": "spb"
        }
    ]
}
GET [BaseURL]/openapi.yaml

 

Возвращает OpenAPI-схему, описывающую возможности API сервера. Фреймворк FastAPI позволяет автоматически генерировать схему, а пользовательские поля типа x-…​ вставляются в схему программно. Например:

openapi: 3.1.0
info:
  title: Transaction API
  description: Transaction API with x-unwrap demo
  version: 1.0.0
paths:
  /transactions:
    get:
      summary: Get Transactions
      operationId: get_transactions_transactions_get
      responses:
        '200':
          description: Successful Response
          content:
            application/json:
              schema:
                items:
                  $ref: '#/components/schemas/Transaction'
                type: array
                title: Response Get Transactions Transactions Get
  /transactions_batch:
    get:
      summary: Get Transactions Batch
      operationId: get_transactions_batch_transactions_batch_get
      responses:
        '200':
          description: Successful Response
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/TransactionBatch'
      x-unwrap:
        resultParam: $response.body#/items
        includeRoot: 'false'
# ...

Запуск тестового сервера

Чтобы запустить тестовый сервер, выполните следующие действия:

РЕКОМЕНДАЦИЯ
Используйте интерпретатор Python, поставляемый с ADH (/opt/python3.10/).
  1. Создайте и активируйте новую виртуальную среду (virtual environment).

    $ mkdir demo_mock_server
    $ cd demo_mock_server
    $ /opt/python3.10/bin/python3 -m venv venv
    $ source venv/bin/activate
  2. Создайте файл server.py и вставьте в него приведенный ниже код:

    server.py

     

    from fastapi import FastAPI
    from fastapi.openapi.utils import get_openapi
    from fastapi.responses import Response
    from pydantic import BaseModel
    from typing import List
    import random
    import uvicorn
    import yaml
    
    
    class Transaction(BaseModel):
        txn_id: int
        acc_id: int
        txn_value: float
        txn_type: str
    
    
    class TransactionBatch(BaseModel):
        total_count: int
        items: List[Transaction]
    
    
    app = FastAPI(title="Transaction API", version="1.0.0")
    
    
    def generate_transactions(n: int = 10) -> List[Transaction]:
        acc_ids = [1000, 1001, 1002]
        txn_types = ["deposit", "withdrawal", "spb"]
        transactions = []
        for txn_id in range(1, n + 1):
            transactions.append(
                Transaction(
                    txn_id=txn_id,
                    acc_id=random.choice(acc_ids),
                    txn_value=round(random.uniform(0, 500), 2),
                    txn_type=random.choice(txn_types),
                )
            )
        return transactions
    
    
    transactions = generate_transactions(10)
    
    
    @app.get("/transactions", response_model=List[Transaction])
    def get_transactions():
        return transactions
    
    
    # solely for x-unwrap demonstration
    @app.get("/transactions_batch", response_model=TransactionBatch)
    def get_transactions_batch():
        return {
            "total_count": len(transactions) / 2,
            "items": transactions[:5],
        }
    
    
    def custom_openapi():
        if app.openapi_schema:
            return app.openapi_schema
    
        openapi_schema = get_openapi(
            title=app.title,
            version=app.version,
            description="Transaction API with x-unwrap demo",
            routes=app.routes,
        )
    
        # Add x-unwrap for /transactions_batch
        openapi_schema["paths"]["/transactions_batch"]["get"]["x-unwrap"] = {
            "resultParam": "$response.body#/items",
            "includeRoot": "false"
        }
    
        app.openapi_schema = openapi_schema
        return app.openapi_schema
    
    
    app.openapi = custom_openapi
    
    
    @app.get("/openapi.yaml", include_in_schema=False)
    def get_openapi_yaml():
        openapi_schema = custom_openapi()
        yaml_schema = yaml.dump(openapi_schema, allow_unicode=True,
                                sort_keys=False)
        return Response(content=yaml_schema, media_type="application/yaml")
    
    
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8003)
  3. Установите зависимости, используя requirements.txt:

    $ pip install -r requirements.txt
    requirements.txt

     

    annotated-types==0.7.0
    anyio==4.10.0
    click==8.2.1
    exceptiongroup==1.3.0
    fastapi==0.116.1
    h11==0.16.0
    idna==3.10
    pydantic==2.11.7
    pydantic_core==2.33.2
    PyYAML==6.0.2
    sniffio==1.3.1
    starlette==0.47.2
    typing-inspection==0.4.1
    typing_extensions==4.14.1
    uvicorn==0.35.0
  4. Запустите приложение:

    $ python server.py

    Вывод:

    INFO:     Started server process [26147]
    INFO:     Waiting for application startup.
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://0.0.0.0:8003 (Press CTRL+C to quit)
    ...

Выполнение сценария

  1. Убедитесь, что тестовый сервер запущен и работает.

  2. В ADCM активируйте встроенный каталог Iceberg (Clusters → <clusterName> → Services → Trino → Iceberg configuration).

  3. (Опционально) Включите динамическое управление каталогами (Clusters → <clusterName> → Services → Trino → Components → Trino Coordinator → Trino catalog management → catalog.management). Это позволит создавать каталоги с помощью команды CREATE CATALOG. Перезапустите сервис Trino.

  4. Подключитесь к Trino Coordinator с помощью DBeaver. Строка подключения JDBC доступна в ADCM (Clusters → <clusterName> → Services → Trino → Info). Для подключения используйте имя пользователя, имеющего права на запись в HDFS. Это необходимо для выполнения определенных операций записи каталогом Iceberg на последующих шагах (запись сырых данных в HDFS).

  5. В DBeaver выполните следующий SQL для создания Trino-каталога:

    CREATE CATALOG "openapi-catalog" USING openapi
    WITH (
        "spec-location" = 'http://10.92.41.251:8003/openapi.yaml', (1)
        "base-uri" = 'http://10.92.41.251:8003', (2)
        "authentication.type" = 'none' (3)
    );
    1 Эндпойнт, который возвращает OpenAPI-схему тестового сервера. В данном сценарии схема генерируется с помощью FastAPI.
    2 Базовый URL REST API сервера.
    3 В данном примере аутентификация не используется. Дополнительные сведения о поддерживаемых типах аутентификации доступны в статье Настройка OpenAPI connector.
  6. Проверьте созданные каталоги:

    SHOW CATALOGS;

    Вывод:

    Catalog             |
    --------------------+
    iceberg-adh         |
    system              |
    openapi-catalog     |
  7. Создайте тестовую таблицу Iceberg:

    USE "iceberg-adh".default;
    CREATE TABLE transactions_spb_only(txn_id int, acc_id int, txn_value double);
  8. Запросите список транзакций из тестового сервера, отфильтруйте их по типу и запишите результат в таблицу Iceberg с помощью SQL-команды:

    INSERT INTO "iceberg-adh".default.transactions_spb_only (txn_id, acc_id, txn_value)
    SELECT txn_id, acc_id, txn_value
    FROM "openapi-catalog".default.transactions
    WHERE txn_type = 'spb';
  9. Проверьте данные в таблице Iceberg:

    SELECT * FROM "iceberg-adh".default.transactions_spb_only;

    Пример вывода:

    txn_id|acc_id|txn_value|
    ------+------+---------+
         1|  1001|   275.32|
         2|  1001|    12.24|
         6|  1000|    89.72|
         7|  1000|    40.71|
         8|  1002|   470.56|

    Проверьте запись данных в HDFS:

    $ hdfs dfs -ls /apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5

    Пример вывода:

    drwxrwxr-x   - hdfs hadoop          0 2025-09-17 16:01 hdfs://adh/apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5/data
    drwxrwxr-x   - hdfs hadoop          0 2025-09-17 16:01 hdfs://adh/apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5/metadata
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней