Примеры использования PyFlink

PyFlink — это Python API для Flink, который позволяет создавать масштабируемые приложения для пакетной и потоковой обработки данных. "Под капотом" PyFlink использует библиотеку Py4J, которая подключается к Flink JVM и взаимодействует с объектами Flink.

PyFlink доступен в виде модуля pip. ADH поставляется с интерпретатором Python, в котором уже установлен модуль PyFlink, а также все необходимые зависимости. Данный интерпретатор расположен в /opt/pyflink-python/ на хостах ADH с установленными компонентами Flink. Используя данный интерпретатор, вы можете запускать приложения PyFlink как обычные Python-скрипты, например:

$ source /opt/pyflink-python/bin/activate
$ /opt/pyflink-python/bin/python3 create_table.py

PyFlink предоставляет два различных API в зависимости от необходимого уровня абстракции:

  • Table API. Позволяет выполнять реляционные запросы к данным, аналогично использованию SQL или работе с табличными данными в Python.

  • DataStream API. Предоставляет низкоуровневый доступ к функционалу Flink. Позволяет напрямую взаимодействовать с данными состояния (state data), контрольными точками, системой восстановления и так далее. Позволяет создавать более сложные задачи Flink.

В этой статье приведены примеры работы с обоими API. Дополнительные примеры использования PyFlink доступны в GitHub. Актуальная справочная информация по PyFlink API доступна в документации Flink.

Table API

PyFlink Table API — это универсальный реляционный API для пакетной и потоковой обработки. Позволяет использовать SQL-подобный язык для запросов и операций с таблицами. Запросы выполняются одинаково для ограниченных и неограниченных потоков данных.

Первым шагом в приложении является создание объекта среды выполнения (TableEnvironment). Данный объект — основа любого приложения Table API; используется для настройки и выполнения операций с таблицами. Пример использования объекта среды показан ниже:

from pyflink.table import TableEnvironment, EnvironmentSettings

env_settings = EnvironmentSettings.in_streaming_mode() (1)
t_env = TableEnvironment.create(env_settings) (2)
t_env.get_config().set("parallelism.default", "1") (3)
1 Установка режима работы (потоковый или пакетный).
2 Создание объекта TableEnvironment.
3 Установка параметров конфигурации (параллелизм, восстановление, данные состояния и так далее) на уровне приложения.

Создание source-таблицы

Используя TableEnvironment, можно создавать таблицы Flink. Таблица Flink (объект Table) описывает пайплайн преобразования данных. Сама таблица не хранит никаких данных, а лишь указывает, откуда данные необходимо получить (source-таблица) или куда данные необходимо сохранить после обработки (sink-таблица).

Далее показаны несколько способов создания source-таблиц.

  • Из коллекции

  • Из топика Kafka

  • Из файла, используя DDL

Следующий код создает таблицу из списка Python.

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")

mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
table_1 = t_env.from_elements(elements=mock_data, (1)
                              schema=['txn_id', 'acc_id', 'txn_value', 'txn_date'])
print("Inferred data types for table_1:")
table_1.print_schema()

from pyflink.table import DataTypes
table_2 = t_env.from_elements(elements=mock_data, (2)
                              schema=DataTypes.ROW([DataTypes.FIELD("txn_id", DataTypes.TINYINT()),
                                               DataTypes.FIELD("acc_id", DataTypes.INT()),
                                               DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
                                               DataTypes.FIELD("txn_date", DataTypes.STRING())]))
print(f"The data type of 'txn_date': {table_2.get_schema().get_field_data_type('txn_date')}")
table_2.print_schema()
table_2 = table_2.select(
    col("txn_id"),
    col("acc_id"),
    col("txn_value"),
    col("txn_date").cast(DataTypes.DATE()).alias("txn_date")
)
print(f"Data type of 'txn_date' after casting:"
      f"{table_2.get_schema().get_field_data_type('txn_date')}")
print("SELECT * FROM table_2:")
table_2.execute().print() (3)
1 Создание таблицы, схема которой формируется автоматически на основе данных коллекции.
2 Создание таблицы с явным указанием типов данных в схеме.
3 Вывод содержимого таблицы.
Пример вывода
Inferred data types for table_1:
(
  `txn_id` BIGINT,
  `acc_id` BIGINT,
  `txn_value` DOUBLE,
  `txn_date` STRING
)
The data type of 'txn_date': VARCHAR
(
  `txn_id` TINYINT,
  `acc_id` INT,
  `txn_value` DOUBLE,
  `txn_date` STRING
)
Data type of 'txn_date' after casting:DATE
SELECT * FROM table_2:
+----+--------+-------------+--------------------------------+------------+
| op | txn_id |      acc_id |                      txn_value |   txn_date |
+----+--------+-------------+--------------------------------+------------+
| +I |      1 |        1001 |                          100.0 | 2025-01-01 |
| +I |      2 |        1002 |                           20.0 | 2025-01-02 |
| +I |      3 |        1003 |                           70.0 | 2025-01-03 |
+----+--------+-------------+--------------------------------+------------+
3 rows in set

Следующий код создает таблицу, подписанную на Kafka-топик через TableDescriptor.

from pyflink.table import EnvironmentSettings, TableEnvironment, Schema, DataTypes, TableDescriptor

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/konstantin/pyflink_demo/flink-connector-kafka-3.3.0-1.19.jar") (1)

t_env.create_temporary_table( (2)
    "kafka_source_table",
    TableDescriptor.for_connector("kafka")
    .schema(Schema.new_builder()
            .column("txn_id", DataTypes.INT())
            .column("acc_id", DataTypes.INT())
            .column("txn_value", DataTypes.DOUBLE())
            .column("txn_date", DataTypes.DATE())
            .build())
    .option("properties.bootstrap.servers", "<host>:<port>")
    .option("topic", "transactions_test")
    .option("properties.group.id", "txn_group")
    .option("scan.startup.mode", "earliest-offset")
    .option("format", "json")
    .option("json.fail-on-missing-field", "false")
    .option("json.ignore-parse-errors", "true")
    .build()
)
1 Добавление JAR Kafka-коннектора в контекст задачи PyFlink. Информация об актуальной версии коннектора доступна в документации Flink.
2 Создание временной таблицы, в которую попадают данные из Kafka-топика.

Следующий код создает таблицу на основе CSV-файла, используя SQL.

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

t_env.execute_sql("""
CREATE TABLE src_table_from_csv (
    txn_id INT,
    acc_id INT,
    txn_value INT,
    txn_date STRING
) WITH (
    'connector' = 'filesystem', (1)
    'format' = 'csv',
    'path' = 'test.csv',
    'csv.field-delimiter' = ','
)
""")

table = t_env.from_path("src_table_from_csv")
table.execute().print()
1 Использование встроенного коннектора для работы с файлами.
Пример вывода
+----+-------------+-------------+-------------+--------------------------------+
| op |      txn_id |      acc_id |   txn_value |                       txn_date |
+----+-------------+-------------+-------------+--------------------------------+
| +I |           1 |        1001 |          20 |                     2025-01-02 |
| +I |           2 |        1002 |         110 |                     2025-01-01 |
| +I |           3 |        1003 |          23 |                     2025-01-01 |
| +I |           4 |        1002 |          50 |                     2025-01-03 |
| +I |           5 |        1001 |          78 |                     2025-01-02 |
+----+-------------+-------------+-------------+--------------------------------+

Больше информации о других способах создания таблиц в Table API доступно в документации Flink.

Запросы и операции с таблицами

Объект Table предоставляет множество методов для выполнения реляционных операций. Актуальная справочная информация по API доступна в документации Flink. Основные операции с таблицами показаны в следующем примере.

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.expressions import col, call

env_settings = EnvironmentSettings.in_batch_mode() (1)
t_env = TableEnvironment.create(env_settings)

txn_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-01"),
    (3, 1002, 70.00, "2025-01-01"),
    (4, 1001, 40.00, "2025-01-02"),
    (5, 1003, 50.00, "2025-01-01"),
    (6, 1001, 10.00, "2025-01-02"),
]

schema = DataTypes.ROW([
    DataTypes.FIELD("txn_id", DataTypes.INT()),
    DataTypes.FIELD("acc_id", DataTypes.INT()),
    DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
    DataTypes.FIELD("txn_date", DataTypes.STRING())
])

t_txns = t_env.from_elements(txn_data, schema)
print("Added table column:")
t_with_net_value = t_txns.add_columns( (2)
    (col("txn_value") * 0.87).alias("net_value")
)
t_with_net_value.execute().print()

t_result = t_txns \ (3)
    .filter(col("txn_date") != "2025-01-02") \
    .group_by(col("acc_id")) \
    .select(
    col("acc_id"),
    call("sum", col("txn_value")).alias("total_txn_value")
)
print("Filtered, groupped, and aggregated: ")
t_result.execute().print()

invoice_data = [
    (1, 1, True),
    (2, 4, False),
    (3, 3, True),
    (4, 2, True),
    (5, 6, False),
    (5, 5, True),
]
schema_i = DataTypes.ROW([
    DataTypes.FIELD("invoice_id", DataTypes.INT()),
    DataTypes.FIELD("tx_id", DataTypes.INT()),
    DataTypes.FIELD("is_confirmed", DataTypes.BOOLEAN()),
])

t_invoices = t_env.from_elements(invoice_data, schema_i)
t_joined = t_invoices.join(t_txns).where(col("tx_id") == col("txn_id")) \ (4)
    .select(col("txn_id"),
            col("acc_id"),
            col("invoice_id"),
            col("is_confirmed"))
print("JOINed tables:")
t_joined.execute().print()
1 Установка пакетного режима работы.
2 Добавление столбца в таблицу.
3 Выполнение цепочки операторов и функций агрегации, таких как filter(), group_by(), sum().
4 Использование Join.
Пример вывода
Added table column:
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
|      txn_id |      acc_id |                      txn_value |                       txn_date |                      net_value |
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
|           1 |        1001 |                          100.0 |                     2025-01-01 |                           87.0 |
|           2 |        1002 |                           20.0 |                     2025-01-01 |                           17.4 |
|           3 |        1002 |                           70.0 |                     2025-01-01 |                           60.9 |
|           4 |        1001 |                           40.0 |                     2025-01-02 |                           34.8 |
|           5 |        1003 |                           50.0 |                     2025-01-01 |                           43.5 |
|           6 |        1001 |                           10.0 |                     2025-01-02 |                            8.7 |
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
6 rows in set
Filtered, groupped, and aggregated:
+-------------+--------------------------------+
|      acc_id |                total_txn_value |
+-------------+--------------------------------+
|        1003 |                           50.0 |
|        1002 |                           90.0 |
|        1001 |                          100.0 |
+-------------+--------------------------------+
3 rows in set
JOINed tables:
+-------------+-------------+-------------+--------------+
|      txn_id |      acc_id |  invoice_id | is_confirmed |
+-------------+-------------+-------------+--------------+
|           5 |        1003 |           5 |         TRUE |
|           2 |        1002 |           4 |         TRUE |
|           3 |        1002 |           3 |         TRUE |
|           6 |        1001 |           5 |        FALSE |
|           1 |        1001 |           1 |         TRUE |
|           4 |        1001 |           2 |        FALSE |
+-------------+-------------+-------------+--------------+
6 rows in set

Операции с таблицами можно также выполнять с помощью SQL:

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_env.execute_sql("""
    CREATE TEMPORARY TABLE t_txns (
        txn_id INT,
        acc_id INT,
        txn_value DOUBLE,
        txn_date STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'test.csv',
        'format' = 'csv'
    )
""")

table_env.execute_sql("""
    CREATE TABLE t_sink_print (
        acc_id INT,
        max_txn_value DOUBLE
    ) WITH (
        'connector' = 'print'
    )
""")

table_env.execute_sql("""
    INSERT INTO t_sink_print
    SELECT acc_id, MAX(txn_value) AS max_txn_value
    FROM t_txns
    GROUP BY acc_id
""").wait()
Пример вывода
15> +I[1002, 110.0]
3> +I[1001, 20.0]
2> +I[1003, 23.0]
3> -U[1001, 20.0]
3> +U[1001, 78.0]

Запись данных в sink-таблицу

В следующем примере показано создание таблиц типа sink, которые сохраняют данные во внешних системах:

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("pipeline.jars",
                       "file:///home/konstantin/pyflink_demo/flink-sql-connector-kafka-3.3.0-1.19.jar")
mock_data = [
    (1, 1001, 50.75, "2025-01-01"),
    (2, 1002, 70.00, "2025-01-02"),
    (3, 1001, 20.25, "2025-01-02")
]
t_txns = t_env.from_elements(
    elements=mock_data,
    schema=DataTypes.ROW([
        DataTypes.FIELD("txn_id", DataTypes.INT()),
        DataTypes.FIELD("acc_id", DataTypes.INT()),
        DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
        DataTypes.FIELD("txn_date", DataTypes.STRING())
    ])
)
t_env.create_temporary_view("txn_table", t_txns) (1)

t_env.execute_sql(""" (2)
    CREATE TABLE sink_table (
        txn_id INT,
        acc_id INT,
        txn_value DOUBLE,
        txn_date STRING
    ) WITH (
        'connector' = 'print' (3)
    )
""")

t_env.execute_sql(""" (4)
    INSERT INTO sink_table
    SELECT * FROM txn_table
    WHERE `acc_id` = 1002
""").wait()

t_env.create_temporary_table( (5)
    'kafka_sink',
    TableDescriptor.for_connector('kafka')
    .schema(Schema.new_builder()
            .column('txn_id', DataTypes.INT())
            .column('acc_id', DataTypes.INT())
            .column('txn_value', DataTypes.DOUBLE())
            .column('txn_date', DataTypes.STRING())
            .build())
    .option('topic', 'transactions_test')
    .option('properties.bootstrap.servers', 'localhost:9092')
    .format(FormatDescriptor.for_format('json')
            .build())
    .build())

t_env.execute_sql("""
    INSERT INTO kafka_sink
    SELECT * FROM txn_table
    WHERE `acc_id` = 1002
""").wait()
1 Создание временного представления на основе таблицы.
2 Создание sink-таблицы.
3 Вывод содержимого таблицы в STDOUT.
4 Вызов операции INSERT, которая инициирует запись данных в приемник (sink). Поскольку INSERT выполняется асинхронно в отдельном потоке, wait() необходим, чтобы дождаться вывода в STDOUT.
5 Создание sink-таблицы, которая использует коннектор Kafka для записи данных в Kafka-топик.

DataStream API

PyFlink DataStream API предоставляет более низкоуровневый контроль над функциями Flink, такими как управление состоянием (state) и временем обработки, использование контрольных точек, восстановление и так далее.

Центральный объект API — DataStream, который можно представить как неизменяемую коллекцию, проходящую через один или несколько этапов преобразований. Большинство приложений включает следующие шаги:

  1. Создание среды выполнения.

  2. Чтение данных из источника и создание объектов DataStream.

  3. Обработка DataStream операторами преобразования. Каждый оператор получает на входе DataStream и возвращает новый DataStream с измененными данными.

  4. Сохранение содержимого DataStream во внешней системе.

Создание среды выполнения

Точкой входа в приложение DataStream API является создание объекта среды выполнения (StreamExecutionEnvironment). Пример создания показан ниже.

menv = StreamExecutionEnvironment.get_execution_environment() (1)
menv.set_parallelism(1) (2)
menv.set_runtime_mode(RuntimeExecutionMode.STREAMING)

# data ingest, transformation, and sink logic

menv.execute() (3)
1 Создание объекта StreamExecutionEnvironment.
2 Установка параметров конфигурации (параллелизм, восстановление, параметры сериализации и прочие).
3 Запуск задачи. Также можно использовать execute_async() для асинхронного запуска задачи.

Созданный StreamExecutionEnvironment используется для установки свойств конфигурации, создания источников (sources), приемников (sinks), операторов (operators) и для запуска задачи.

Получение данных из источника

Используя StreamExecutionEnvironment, можно создать источник Flink для чтения данных из внешней системы. Ниже представлены несколько способов создания источников.

  • Из коллекции

  • Из Kafka-топика

  • Из файла

Следующий пример создает DataStream, используя Python-список в качестве источника данных:

from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
config.set_string("execution.runtime-mode", "STREAMING")

menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)

mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
       ["txn_id", "acc_id", "txn_value", "txn_date"],
       [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
mds.print()

menv.execute()
Пример вывода
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,1003,70.0,2025-01-03]
TIP

Элементы +I, +U, -U и -D означают операции над данными, а именно:

  • +I — добавление новой строки.

  • +U — предыдущая версия строки (удаляемая).

  • -U — новая версия строки (вставляемая).

  • -D — строка была удалена.

Следующий код создает источник Kafka с помощью Kafka-коннектора:

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

menv = StreamExecutionEnvironment.get_execution_environment()
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)

source_kafka = KafkaSource.builder() \ (2)
    .set_bootstrap_servers("<host>:<port>") \
    .set_topics("m_topic") \
    .set_group_id("my_group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
mds = menv.from_source(source_kafka, (3)
                      WatermarkStrategy.for_monotonous_timestamps(),
                      "Test Kafka source")

menv.execute()
1 Добавляет JAR Kafka-коннектора в контекст задачи PyFlink. Для получения актуальной версии коннектора Kafka можно обратиться к документации Flink.
2 Создание источника Kafka на основе топика Kafka.
3 Получение DataStream от источника Kafka.

В примере ниже Flink считывает строки CSV-файла и возвращает DataStream. Каждая строка является отдельной записью в потоке.

from pyflink.common import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat

menv = StreamExecutionEnvironment.get_execution_environment()
file_source = FileSource.for_record_stream_format(
    StreamFormat.text_line_format(), (1)
    "test.csv"
).build()
wm_strategy = WatermarkStrategy.for_monotonous_timestamps()

ds = menv.from_source(source=file_source, (2)
                    source_name="test_file_source",
                    watermark_strategy=wm_strategy,

)
ds.print()
menv.execute()
1 Использование встроенного StreamFormat, который позволяет читать файлы построчно и преобразовывать каждую строку в отдельную запись.
2 Использование встроенной реализации водяного знака (watermark), предполагающей, что записи DataStream всегда поступают в нужном порядке.
Пример вывода

 

10> 1,1001,20,2025-01-02
10> 2,1002,110,2025-01-01
10> 3,1003,23,2025-01-01
10> 4,1002,50,2025-01-03
10> 5,1001,78,2025-01-02
РЕКОМЕНДАЦИЯ
10> указывает на идентификатор задачи или индекс подзадачи.

Операторы преобразования

PyFlink предоставляет множество операторов, таких как map(), filter(), reduce() и прочих. Каждый оператор получает на входе объект DataStream, выполняет преобразования данных и возвращает новый DataStream. Основные операции показаны ниже:

from pyflink.common import Types, Row
from pyflink.datastream import StreamExecutionEnvironment, MapFunction

menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
ds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
    ["txn_id", "acc_id", "txn_value", "txn_date"],
    [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Original DataStream:")
ds.print()

ds_filtered = ds.filter(lambda txn: txn["txn_value"] > 50.0) (1)
print("Filtered DataStream:")
ds_filtered.print()

ds_mapped = ds.map( (2)
    lambda txn: Row(
        txn["txn_id"],
        9999 if txn["acc_id"] == 1003 else txn["acc_id"],
        txn["txn_value"],
        txn["txn_date"]
    ), output_type=Types.ROW_NAMED(
        ["txn_id", "acc_id", "txn_value", "txn_date"],
        [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Lambda-mapped DataStream:")
ds_mapped.print()

class UpdateAccID(MapFunction): (3)
    def map(self, value):
        txn_id, acc_id, txn_value, txn_date = value
        if acc_id < 2000:
            acc_id = acc_id * 2
        return (Row(txn_id, acc_id, txn_value, txn_date))

ds_mapped_class = ds.map(UpdateAccID(), (4)
                         output_type=Types.ROW_NAMED(
                             ["txn_id", "acc_id", "txn_value", "txn_date"],
                             [Types.INT(), Types.INT(), Types.FLOAT(), Types.STRING()]
                         ))
print("MapFunction-mapped DataStream:")
ds_mapped_class.print()

menv.execute()
1 Использование filter().
2 Применение map() с лямбда-функцией.
3 Реализация класса MapFunction для map-оператора. Метод map() вызывается для каждого элемента DataStream.
4 Выполнение трансформации map() с использованием реализации MapFunction.
Пример вывода
Original DataStream:
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,1003,70.0,2025-01-03]

Filtered DataStream:
+I[1,1001,100.0,2025-01-01]
+I[3,1003,70.0,2025-01-03]

Lambda-mapped DataStream:
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,9999,70.0,2025-01-03]

Custom MapFunction-mapped DataStream:
+I[1,2002,100.0,2025-01-01]
+I[2,2004,20.0,2025-01-02]
+I[3,2006,70.0,2025-01-03]
Примечание
Строки вывода могут быть перемешаны, поскольку каждый оператор DataStream отправляет данные в STDOUT из отдельного потока.

Запись данных в приемник

Результаты вычислений Flink можно записать во внешнюю систему с помощью приемника (sink).

  • Запись в файл

  • Запись в файл ORC

  • Запись в Kafka-топик

В следующем примере содержимое DataStream записывается в файл с помощью коннектора FileSystem.

from pyflink.common import Types, Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy

OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink"
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
    ["txn_id", "acc_id", "txn_value", "txn_date"],
    [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
fs_sink = FileSink.for_row_format(OUTPUT_PATH, (1)
                                  Encoder.simple_string_encoder("UTF-8")) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy( (2)
    part_size=1024 ** 3,
    rollover_interval=15 * 60 * 1000,
    inactivity_interval=5 * 60 * 1000)) \
    .build()

mds.sink_to(fs_sink) (3)
menv.execute()
1 Создание приемника типа FileSink с использованием формата построчного кодирования (row-encoded format).
2 Установка политики ротации.
3 Запись содержимого DataStream в приемник.

Данный пример создает директорию, указанную в поле OUTPUT_PATH, например:

/home/konstantin/pyflink_demo/fs_sink
└─ /2025-04-08--13
    └─ part-4a8b2a20-187d-4ad6-ae7a-8f0e6dc880c5-0

Каждая запись DataStream записывается в файл в виде новой строки:

+I[1, 1001, 100.0, 2025-01-01]
+I[2, 1002, 20.0, 2025-01-02]
+I[3, 1003, 70.0, 2025-01-03]

Для записи данных в формате ORC, Parquet или Avro используйте bulk-encoded форматы. Ниже показан пример задачи, которая записывает данные в файл в формате ORC, используя FileSink.

from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.datastream.formats.orc import OrcBulkWriters
from pyflink.table import DataTypes

menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-orc-1.19.2.jar") (1)
OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink_orc"
mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data,
                           type_info=Types.ROW_NAMED(
                               ["txn_id", "acc_id", "txn_value", "txn_date"],
                               [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))

row_type = DataTypes.ROW([
    DataTypes.FIELD('txn_id', DataTypes.INT()),
    DataTypes.FIELD('acc_id', DataTypes.INT()),
    DataTypes.FIELD('txn_value', DataTypes.DOUBLE()),
    DataTypes.FIELD('txn_date', DataTypes.STRING()),
])

file_sink_orc = FileSink.for_bulk_format(
    OUTPUT_PATH,
    OrcBulkWriters.for_row_type( (2)
        row_type=row_type,
        writer_properties=Configuration(),
        hadoop_config=Configuration(),
    )
).build()

mds.sink_to(file_sink_orc)
menv.execute()
1 Добавление зависимостей для работы с ORC.
2 Использование формата ORC с FileSink.

Следующий код направляет содержимое DataStream в топик Kafka, используя приемник Kafka. Соединение с сервером устанавливается с помощью коннектора.

menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///D:/dev/py/pyflink/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)
mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
       ["txn_id", "acc_id", "txn_value", "txn_date"],
       [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))

kafka_sink = KafkaSink.builder() \ (2)
    .set_bootstrap_servers("<host>:<port>") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("m_topic")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

mds.sink_to(kafka_sink) (3)
menv.execute()
1 Добавление коннектора Kafka в контекст задачи PyFlink. Актуальная версия коннектора доступна в документации Flink.
2 Создание объекта приемника, который подключается к брокеру Kafka, используя значения "<host>:<port>".
3 Запись содержимого DataStream в приемник Kafka.

Управление зависимостями

В приложениях PyFlink могут использоваться сторонние зависимости, например: библиотеки Python, JAR-файлы коннекторов, фреймворки ML и прочие. Способы их импорта отличаются для Table API и DataStream API и показаны ниже.

Зависимости JAR

  • Table API

  • DataStream API

  • Параметр CLI

table_env.get_config().set("pipeline.jars", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
table_env.get_config().set("pipeline.classpaths", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_jars("file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_classpaths("file:///path/to/myJar1.jar", "file:///path/to/myJar2.jar")
$ flink run \
      --python test_flink_job.py \
      --jarfile path/to/myJar1.jar

Зависимости Python

  • Table API

  • DataStream API

  • Параметр CLI

table_env.add_python_file("path/to/myScript.py")
stream_execution_environment.add_python_file("path/to/myScript.py")
$ flink run \
      --python test_flink_job.py \
      --pyFiles file:///path/to/test.py
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней