Пример использования OpenAPI connector
В данной статье описан пошаговый сценарий, демонстрирующий использование Trino OpenAPI-коннектора. Алгоритм сценария представлен на следующей схеме.
Требования
Для выполнения сценария необходимо следующее:
- 
Кластер ADH версии 4.1.0 или выше установлен и запущен. Кластер содержит следующие сервисы: - 
ADPG 
- 
Core configuration 
- 
HDFS 
- 
Hive 
- 
Trino 
- 
YARN 
- 
ZooKeeper 
 
- 
- 
Тестовый REST API c OpenAPI-схемой. В этом примере используется приложение Python, эмулирующее RESTful-сервер с банковскими транзакциями. Пример кода и инструкции по запуску приложения приведены в разделе Подготовка тестового OpenAPI-сервера. 
- 
Созданы два каталога Trino: - 
openapi-catalog. Каталог для чтения данных из тестового сервера.
- 
iceberg-adh. Каталог для записи отфильтрованных данных в таблицу Iceberg.
 Этапы создания каталогов описаны в сценарии. 
- 
- 
JDBC-клиент (например, DBeaver) для подключения к сервису Trino. 
Подготовка тестового OpenAPI-сервера
В данном примере приложение Python на базе фреймворка FastAPI эмулирует REST-сервер, который возвращает банковские транзакции по HTTP. Тестовый сервер предоставляет следующие REST-эндпойнты:
 
Возвращает список всех доступных транзакций, например:
    [
        {
            "txn_id": 1,
            "acc_id": 1002,
            "txn_value": 184.0,
            "txn_type": "withdrawal"
        },
        {
            "txn_id": 2,
            "acc_id": 1000,
            "txn_value": 282.95,
            "txn_type": "spb"
        },
        {
            "txn_id": 3,
            "acc_id": 1001,
            "txn_value": 100.00,
            "txn_type": "deposit"
        }
    ] 
Возвращает пакет транзакций фиксированного размера:
{
    "transactions_count": 2,
    "items":
    [
        {
            "txn_id": 1,
            "acc_id": 1002,
            "txn_value": 184.0,
            "txn_type": "withdrawal"
        },
        {
            "txn_id": 2,
            "acc_id": 1000,
            "txn_value": 282.95,
            "txn_type": "spb"
        }
    ]
} 
Возвращает OpenAPI-схему, описывающую возможности API сервера.
Фреймворк FastAPI позволяет автоматически генерировать схему, а пользовательские поля типа x-… вставляются в схему программно.
Например:
openapi: 3.1.0
info:
  title: Transaction API
  description: Transaction API with x-unwrap demo
  version: 1.0.0
paths:
  /transactions:
    get:
      summary: Get Transactions
      operationId: get_transactions_transactions_get
      responses:
        '200':
          description: Successful Response
          content:
            application/json:
              schema:
                items:
                  $ref: '#/components/schemas/Transaction'
                type: array
                title: Response Get Transactions Transactions Get
  /transactions_batch:
    get:
      summary: Get Transactions Batch
      operationId: get_transactions_batch_transactions_batch_get
      responses:
        '200':
          description: Successful Response
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/TransactionBatch'
      x-unwrap:
        resultParam: $response.body#/items
        includeRoot: 'false'
# ...Запуск тестового сервера
Чтобы запустить тестовый сервер, выполните следующие действия:
| РЕКОМЕНДАЦИЯИспользуйте интерпретатор Python, поставляемый с ADH (/opt/python3.10/). | 
- 
Создайте и активируйте новую виртуальную среду (virtual environment). $ mkdir demo_mock_server $ cd demo_mock_server $ /opt/python3.10/bin/python3 -m venv venv $ source venv/bin/activate
- 
Создайте файл server.py и вставьте в него приведенный ниже код: server.pyfrom fastapi import FastAPI from fastapi.openapi.utils import get_openapi from fastapi.responses import Response from pydantic import BaseModel from typing import List import random import uvicorn import yaml class Transaction(BaseModel): txn_id: int acc_id: int txn_value: float txn_type: str class TransactionBatch(BaseModel): total_count: int items: List[Transaction] app = FastAPI(title="Transaction API", version="1.0.0") def generate_transactions(n: int = 10) -> List[Transaction]: acc_ids = [1000, 1001, 1002] txn_types = ["deposit", "withdrawal", "spb"] transactions = [] for txn_id in range(1, n + 1): transactions.append( Transaction( txn_id=txn_id, acc_id=random.choice(acc_ids), txn_value=round(random.uniform(0, 500), 2), txn_type=random.choice(txn_types), ) ) return transactions transactions = generate_transactions(10) @app.get("/transactions", response_model=List[Transaction]) def get_transactions(): return transactions # solely for x-unwrap demonstration @app.get("/transactions_batch", response_model=TransactionBatch) def get_transactions_batch(): return { "total_count": len(transactions) / 2, "items": transactions[:5], } def custom_openapi(): if app.openapi_schema: return app.openapi_schema openapi_schema = get_openapi( title=app.title, version=app.version, description="Transaction API with x-unwrap demo", routes=app.routes, ) # Add x-unwrap for /transactions_batch openapi_schema["paths"]["/transactions_batch"]["get"]["x-unwrap"] = { "resultParam": "$response.body#/items", "includeRoot": "false" } app.openapi_schema = openapi_schema return app.openapi_schema app.openapi = custom_openapi @app.get("/openapi.yaml", include_in_schema=False) def get_openapi_yaml(): openapi_schema = custom_openapi() yaml_schema = yaml.dump(openapi_schema, allow_unicode=True, sort_keys=False) return Response(content=yaml_schema, media_type="application/yaml") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8003)
- 
Установите зависимости, используя requirements.txt: $ pip install -r requirements.txtrequirements.txtannotated-types==0.7.0 anyio==4.10.0 click==8.2.1 exceptiongroup==1.3.0 fastapi==0.116.1 h11==0.16.0 idna==3.10 pydantic==2.11.7 pydantic_core==2.33.2 PyYAML==6.0.2 sniffio==1.3.1 starlette==0.47.2 typing-inspection==0.4.1 typing_extensions==4.14.1 uvicorn==0.35.0
- 
Запустите приложение: $ python server.pyВывод: INFO: Started server process [26147] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8003 (Press CTRL+C to quit) ... 
Выполнение сценария
- 
Убедитесь, что тестовый сервер запущен и работает. 
- 
В ADCM активируйте встроенный каталог Iceberg (Clusters → <clusterName> → Services → Trino → Iceberg configuration). 
- 
(Опционально) Включите динамическое управление каталогами (Clusters → <clusterName> → Services → Trino → Components → Trino Coordinator → Trino catalog management → catalog.management). Это позволит создавать каталоги с помощью команды CREATE CATALOG. Перезапустите сервис Trino.
- 
Подключитесь к Trino Coordinator с помощью DBeaver. Строка подключения JDBC доступна в ADCM (Clusters → <clusterName> → Services → Trino → Info). Для подключения используйте имя пользователя, имеющего права на запись в HDFS. Это необходимо для выполнения определенных операций записи каталогом Iceberg на последующих шагах (запись сырых данных в HDFS). 
- 
В DBeaver выполните следующий SQL для создания Trino-каталога: CREATE CATALOG "openapi-catalog" USING openapi WITH ( "spec-location" = 'http://10.92.41.251:8003/openapi.yaml', (1) "base-uri" = 'http://10.92.41.251:8003', (2) "authentication.type" = 'none' (3) );1 Эндпойнт, который возвращает OpenAPI-схему тестового сервера. В данном сценарии схема генерируется с помощью FastAPI. 2 Базовый URL REST API сервера. 3 В данном примере аутентификация не используется. Дополнительные сведения о поддерживаемых типах аутентификации доступны в статье Настройка OpenAPI connector. 
- 
Проверьте созданные каталоги: SHOW CATALOGS;Вывод: Catalog | --------------------+ iceberg-adh | system | openapi-catalog | 
- 
Создайте тестовую таблицу Iceberg: USE "iceberg-adh".default; CREATE TABLE transactions_spb_only(txn_id int, acc_id int, txn_value double);
- 
Запросите список транзакций из тестового сервера, отфильтруйте их по типу и запишите результат в таблицу Iceberg с помощью SQL-команды: INSERT INTO "iceberg-adh".default.transactions_spb_only (txn_id, acc_id, txn_value) SELECT txn_id, acc_id, txn_value FROM "openapi-catalog".default.transactions WHERE txn_type = 'spb';
- 
Проверьте данные в таблице Iceberg: SELECT * FROM "iceberg-adh".default.transactions_spb_only;Пример вывода: txn_id|acc_id|txn_value| ------+------+---------+ 1| 1001| 275.32| 2| 1001| 12.24| 6| 1000| 89.72| 7| 1000| 40.71| 8| 1002| 470.56|Проверьте запись данных в HDFS: $ hdfs dfs -ls /apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5Пример вывода: drwxrwxr-x - hdfs hadoop 0 2025-09-17 16:01 hdfs://adh/apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5/data drwxrwxr-x - hdfs hadoop 0 2025-09-17 16:01 hdfs://adh/apps/hive/warehouse/transactions_spb_only-7947dbdc9e8d4cf6b5db7558a64f16c5/metadata