Обзор работы Flink

Сервис Flink — это процессинговый движок для потоковой и пакетной обработки данных. По назначению и использованию Flink похож на Spark — он также предлагает stateful/stateless-вычисления, вводит абстракцию SQL над данными, поддерживает режим высокой доступности и предлагает гибкие возможности кластеризации. Однако изначально Flink разрабатывался как фреймворк, ориентированный на работу именно с потоками данных. Flink может работать с ограниченными (bounded) и неограниченными (unbounded) потоками данных, что делает его универсальным выбором для различных приложений. Области применения Flink: аналитика в реальном времени, событийно-ориентированные приложения (event-driven applications), системы распознавания паттернов, аномалий, выявление мошеннических действий, высоконагруженные ETL-пайплайны и так далее.

Flink предоставляет следующие возможности:

  • Унифицированный API для потоковой и пакетной обработки. Хоть Flink и был разработан в первую очередь для работы с потоками данных, он также может решать задачи пакетной обработки.

  • Обработка с учетом состояния (stateful processing). Flink поддерживает обработку данных с учетом состояния, периодически сохраняя состояние приложения на разных этапах работы. Этот подход позволяет Flink обеспечивать принцип обработки "ровно один раз" (exactly once), гарантируя, что данные будут обработаны только один раз, даже в случае сбоев или перезапусков.

  • Окна (windows) и временные операции. Flink имеет встроенную поддержку различных типов окон, которые являются ключевым инструментом при работе с неограниченными потоками данных. Оконные функции позволяют группировать и агрегировать данные в определенном временном интервале. Flink также позволяет выполнять агрегации типа SUM, AVG, COUNT в контексте окна, отталкиваясь от времени события либо времени обработки.

  • Отказоустойчивость и восстановление. Flink предоставляет гибкие возможности для обеспечения отказоустойчивости с помощью контрольных точек (checkpoints). Используя контрольные точки, система периодически сохраняет свое состояние с возможностью восстановления из контрольной точки в будущем. Схожий механизм точек сохранения (savepoints) позволяет вручную создавать снепшоты Flink-задач для обновления, приостановки или перезапуска задач с сохранением состояния.

  • SQL для потоков данных. Flink вводит уровень SQL-абстракции для потоков данных, позволяя выполнять SQL-запросы над потоками, как с обычными таблицами. Flink поддерживает операции фильтрации, объединения, агрегации, а также оконные функции, используя синтаксис SQL.

Архитектура

Ниже представлена высокоуровневая архитектура Flink.

Архитектура Flink
Архитектура Flink
Архитектура Flink
Архитектура Flink

Основные архитектурные компоненты описаны ниже.

Преобразует код приложения Flink в логический граф (Logical graph, также DataFlow graph или JobGraph) и отправляет его компоненту JobManager. Не является частью среды выполнения Flink. Существует несколько вариантов, которые могут использоваться в качестве клиента в кластере Flink:

  • Flink CLI.

  • REST-эндпойнт. Flink может получить JAR c задачами в теле REST-запроса.

  • SQL-клиент. Позволяет запускать задачи Flink с помощью встроенного CLI-интерфейса для SQL.

  • Клиентские приложения на Java/Python, которые программно запускают новые задачи через Flink API.

JobManager

Ключевой компонент, координирующий жизненный цикл приложений Flink. Он управляет планированием задач, отслеживает их состояние, координирует работу контрольных точек, занимается восстановлением системы и так далее. В кластере Flink JobManager существует в одном экземпляре и управляет одним или несколькими TaskManagers.

Процесс JobManager включает следующие компоненты:

  • Resource Manager. Отвечает за распределение ресурсов в кластере Flink. Непосредственно управляет слотами задач (task slots) — единицами планирования ресурсов Flink.

  • JobMaster. Создается для каждой задачи и отвечает за выполнение одного логического графа Dataflow. В кластере Flink одновременно могут выполняться несколько задач, у каждой из которых есть свой JobMaster.

  • Dispatcher. Создает новый экземпляр JobMaster для каждой задачи. Обслуживает REST-эндпойнты для запуска приложений Flink по REST API и предоставляет веб-интерфейс Flink c информацией о выполненных задачах.

TaskManager

Процесс, который запускает одну или несколько подзадач (subtasks) в отдельных потоках. В кластере Flink содержится один или более экземпляров TaskManager, отвечающих за буферизацию данных и обмен данными друг с другом. Наименьшей единицей ресурсов для TaskManager является слот задачи (task slot).

РЕКОМЕНДАЦИЯ
Задачи Flink можно запускать в режиме YARN-кластера, который автоматически создает контейнеры для компонентов JobManager/TaskManager на хостах под управлением YARN. Подробная информация о поддерживаемых режимах и примерах использования доступна в статье Запуск Flink в YARN.

Концепции

Далее описаны основные понятия, наиболее часто использующиеся в контексте Flink. Больше информации доступно в документации Flink.

  • Ограниченные и неограниченные потоки данных

    Ограниченный (bounded) поток — это поток данных с определенным количеством записей, у которого есть начало и конец. Например, чтение записей из CSV-файла с помощью Flink DataStream API создает ограниченный поток данных. При работе с такими потоками все данные потока можно загрузить в систему (например, для предварительной сортировки) до выполнения вычислений.

    Неограниченный поток данных имеет начало, но может никогда не заканчиваться, непрерывно поставляя новые записи через произвольные промежутки времени. Примеры неограниченного потока данных: брокер Kafka, веб-сокет, отправляющий JSON-события, цифровой интерфейс датчика на производстве и так далее.

  • Данные состояния и обработка данных с учетом состояния

    Данные состояния (state data) — это данные типа ключ/значение, которые Flink периодически сохраняет на различных этапах обработки, например, между двумя операторами агрегации. Использование данных состояния предполагает дополнительные затраты на обработку и хранение этих данных. Однако именно данные состояния делают возможными такие операции, как поиск закономерностей в потоке входных данных, агрегация событий в определенном временном интервале (минута/час/дата), восстановление обработки потока без потерь и так далее.

    В кластере Flink данные состояния обрабатываются системой state backend. Выбор реализации этой системы, наиболее подходящей для конкретного Flink-приложения, может помочь минимизировать затраты на доступ к данным состояния.

  • Источники и приемники данных

    Типичное приложение Flink, работающее с потоками данных, имеет структуру, представленную ниже. Это упрощенное представление, поскольку "под капотом" в JVM Flink автоматически создает новые и при необходимости дублирует объекты источников/приемников/операторов в зависимости от многих факторов, таких как настройки параллелизма, активированные механизмы оптимизации и так далее.

    Источники и приемники данных
    Источники и приемники данных
    Источники и приемники данных
    Источники и приемники данных

    Основные компоненты диаграммы:

    • Источник (source). Стартовая точка при выполнении задачи Flink. Источник получает данные из внешней системы, например Kafka-брокера, базы данных или сокета, и создает объект DataStream, который далее обрабатывается операторами. Сервис Flink поставляется со встроенными коннекторами для Kafka, JDBC, файловых систем и так далее.

    • Операторы выполняют трансформации над DataStreams.

    • Приемник (sink). Конечная точка обработки данных в приложении Flink. Приемники получают потоки данных в виде объектов DataStream и выводят содержащиеся в них данные в файлы, сокеты, Kafka-топики и другие внешние системы.

  • Задачи и операторы

    Операторы (operators) получают на входе один или несколько DataStream, применяют к ним трансформации и на выходе отдают новый DataStream. Примеры операторов преобразования: функции map(), flatMap(), keyBy() и прочие.

    Задача (task) — это базовая единица работы Flink. Для каждого экземпляра оператора, работающего в параллельном режиме, требуется одна задача для выполнения работы. Например, у оператора с параллелизмом 3 каждый экземпляр будет выполняться как три отдельные задачи.

    Задачи и операторы
    Задачи и операторы
    Задачи и операторы
    Задачи и операторы

Сервис Flink содержит компонент Flink History Server, который отображает статистику о завершенных приложениях Flink в веб-интерфейсе.

ПРИМЕЧАНИЕ
Компонент Flink History Server доступен начиная с версии ADH 4.0.0.

Статистика о завершенных задачах Flink генерируется компонентом JobManager и сохраняется в конфигурируемой локации (по умолчанию — /apps/flink/completed-jobs в HDFS). Flink History Server периодически сканирует данную локацию и отображает информацию о завершенных задачах в веб-интерфейсе. Директории, в которых хранятся архивы, а также интервал сканирования можно настроить с помощью параметров конфигурации сервиса Flink.

History Server REST API

Помимо пользовательского интерфейса, компонент Flink History Server также предоставляет REST API для получения информации о выполненных задачах. REST-эндпойнты не требуют аутентификации, принимают только GET-запросы и доступны по следующему URL:

GET http://<history_server_host>:<port>/<endpoint_path> HTTP/1.1

где:

  • <history_server_host> — имя или IP-адрес хоста, на котором установлен компонент Flink History Server.

  • <port> — номер порта, на котором Flink ожидает REST-запросы. По умолчанию используется порт 8081. Порт можно изменить с помощью параметра historyserver.web.port сервиса Flink в ADCM.

Доступны следующие эндпойнты:

GET [BASE_URL]/config

 

Возвращает общую информацию о кластере Flink.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/config'

Ответ сервера:

{
    "refresh-interval": 10000,
    "timezone-name": "Coordinated Universal Time",
    "timezone-offset": 0,
    "flink-version": "1.20.1",
    "flink-revision": "ae77a60 @ 2025-04-28T10:41:27+02:00",
    "features": {
        "web-submit": false,
        "web-cancel": false,
        "web-rescale": false,
        "web-history": true
    }
}
GET [BASE_URL]/jobs/overview

 

Возвращает список выполненных Flink-задач.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/jobs/overview'

Ответ сервера:

{
    "jobs": [
        {
            "jid": "547cc23f3e85744bffbfcafe7c06471a",
            "name": "Flink Java Job at Sun Jun 15 23:25:44 UTC 2025",
            "start-time": 1750029945589,
            "end-time": 1750029946616,
            "duration": 1027,
            "state": "FINISHED",
            "last-modification": 1750029946616,
            "tasks": {
                "running": 0,
                "canceling": 0,
                "canceled": 0,
                "total": 3,
                "created": 0,
                "scheduled": 0,
                "deploying": 0,
                "reconciling": 0,
                "finished": 3,
                "initializing": 0,
                "failed": 0
            }
        }
    ]
}
GET [BASE_URL]/jobs/<job_id>

 

Возвращает подробную информацию о конкретной задаче Flink, включая план выполнения, вершины логического графа (vertices), подзадачи и так далее.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/jobs/547cc23f3e85744bffbfcafe7c06471a'

Ответ сервера:

{
    "jid": "547cc23f3e85744bffbfcafe7c06471a",
    "name": "Flink Java Job at Sun Jun 15 23:25:44 UTC 2025",
    "isStoppable": false,
    "state": "FINISHED",
    "job-type": "BATCH",
    "start-time": 1750029945589,
    "end-time": 1750029946616,
    "duration": 1027,
    "maxParallelism": -1,
    "now": 1750029946692,
    "timestamps": {
        "FAILED": 0,
        "SUSPENDED": 0,
        "CANCELLING": 0,
        "INITIALIZING": 1750029945589,
        "FAILING": 0,
        "CANCELED": 0,
        "RUNNING": 1750029946033,
        "FINISHED": 1750029946616,
        "CREATED": 1750029945752,
        "RESTARTING": 0,
        "RECONCILING": 0
    },
    "vertices": [
        ...
    ],
    "status-counts": {
        "INITIALIZING": 0,
        "CANCELING": 0,
        "DEPLOYING": 0,
        "FAILED": 0,
        "SCHEDULED": 0,
        "RUNNING": 0,
        "RECONCILING": 0,
        "FINISHED": 3,
        "CREATED": 0,
        "CANCELED": 0
    },
    "plan": {
        ...
    }
}
GET [BASE_URL]/jobs/<job_id>/config

 

Возвращает конфигурацию определенной задачи Flink.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/jobs/547cc23f3e85744bffbfcafe7c06471a/config'

Ответ сервера:

{
    "jid": "547cc23f3e85744bffbfcafe7c06471a",
    "name": "Flink Java Job at Sun Jun 15 23:25:44 UTC 2025",
    "execution-config": {
        "execution-mode": "PIPELINED",
        "restart-strategy": "Cluster level default restart strategy",
        "job-parallelism": 1,
        "object-reuse-mode": false,
        "user-config": {}
    }
}
GET [BASE_URL]/jobs/<job_id>/vertices/<vertex_id>

 

Возвращает информацию о конкретной вершине Flink-задачи.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/jobs/547cc23f3e85744bffbfcafe7c06471a/vertices/33b1d60e4c0d4656355755296d6794e0'

Ответ сервера:

{
    "id": "33b1d60e4c0d4656355755296d6794e0",
    "name": "CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:97)) -> Combine (SUM(1), at main(WordCount.java:100)",
    "parallelism": 1,
    "maxParallelism": 1,
    "now": 1750029946787,
    "subtasks": [
        {
            ...
        }
    ],
    "aggregated": {
        ...
    }
}
GET [BASE_URL]/jobs/<job_id>/exceptions

 

Возвращает список исключений, возникших при выполнении задачи.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/jobs/57a9afc1904b14f808c80378ee20ffeb/exceptions'

Ответ сервера:

{
    "root-exception": null,
    "timestamp": null,
    "all-exceptions": [],
    "truncated": false,
    "exceptionHistory": {
        "entries": [],
        "truncated": false
    }
}
GET [BASE_URL]/jobs/<job_id>/accumulators

 

Возвращает список объектов accumulator, задействованных для выполнения определенной задачи.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/jobs/57a9afc1904b14f808c80378ee20ffeb/accumulators'

Ответ сервера:

{
    "job-accumulators": [],
    "user-task-accumulators": [
        {
            "name": "85d49c256c7a7e0370ebba1e88e8f199",
            "type": "SerializedListAccumulator",
            "value": "[[B@7b294b0f, [B@2a90f4d2, [B@2a86b8be, ..."
        }
    ],
    "serialized-user-task-accumulators": {
        "85d49c256c7a7e0370ebba1e88e8f199": "rO0ABXNyACVvcmcuYXBhY2hlLmZsaW5rLnV..."
    }
}
GET [BASE_URL]/jobs/<job_id>/vertices/<vertex_id>/subtasktimes

 

Возвращает список подзадач Flink, созданных для выполнения задачи.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/jobs/547cc23f3e85744bffbfcafe7c06471a/vertices/33b1d60e4c0d4656355755296d6794e0/subtasktimes'

Ответ сервера:

{
    "id": "33b1d60e4c0d4656355755296d6794e0",
    "name": "CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:97)) -> Combine (SUM(1), at main(WordCount.java:100)",
    "now": 1750029946723,
    "subtasks": [
        {
            "subtask": 0,
            "host": "ka-adh-3",
            "endpoint": "ka-adh-3.ru-central1.internal:38443",
            "duration": 565,
            "timestamps": {
                "INITIALIZING": 1750029946391,
                "CANCELING": 0,
                "DEPLOYING": 1750029946222,
                "FAILED": 0,
                "SCHEDULED": 1750029946037,
                "RUNNING": 1750029946393,
                "RECONCILING": 0,
                "FINISHED": 1750029946602,
                "CREATED": 1750029945835,
                "CANCELED": 0
            }
        }
    ]
}
GET [BASE_URL]/jobs/<job_id>/plan

 

Возвращает план выполнения (execution plan) задачи Flink.

Пример запроса:

$ curl 'http://ka-adh-3.ru-central1.internal:8082/jobs/57a9afc1904b14f808c80378ee20ffeb/plan'

Ответ сервера:

{
    "plan": {
        "jid": "57a9afc1904b14f808c80378ee20ffeb",
        "name": "Flink Java Job at Sun Jun 15 23:49:15 UTC 2025",
        "type": "BATCH",
        "nodes": [
            {
                "id": "5b6d423820643b538f78784054364b75",
                "parallelism": 1,
                "operator": "Data Sink",
                "operator_strategy": "(none)",
                "description": "DataSink (collect())",
                "inputs": [
                    {
                        "num": 0,
                        "id": "fa8699328ec101fd063912c9734556b7",
                        "ship_strategy": "Forward",
                        "exchange": "pipelined"
                    }
                ],
                "optimizer_properties": {
                    ...
                }
            },
            {
                "id": "fa8699328ec101fd063912c9734556b7",
                "parallelism": 1,
                "operator": "GroupReduce",
                "operator_strategy": "Sorted Group Reduce",
                "description": "Reduce (SUM(1), at main(WordCount.java:100)",
                "inputs": [
                    {
                        "num": 0,
                        "id": "9d782237f8c637c037444a774f60e6e7",
                        "ship_strategy": "Hash Partition on [0]",
                        "local_strategy": "Sort (combining) on [0:ASC]",
                        "exchange": "pipelined"
                    }
                ],
                "optimizer_properties": {
                    ...
                }
            },
            {
                "id": "9d782237f8c637c037444a774f60e6e7",
                "parallelism": 1,
                "operator": "Data Source -> FlatMap -> GroupCombine",
                "operator_strategy": "(none)<br/> -&gt; FlatMap<br/> -&gt; Sorted Combine",
                "description": "DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat))<br/> -&gt; FlatMap (FlatMap at main(WordCount.java:97))<br/> -&gt; Combine (SUM(1), at main(WordCount.java:100)",
                "optimizer_properties": {
                    ...
                }
            }
        ]
    }
}
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней