Примеры использования PyFlink
PyFlink — это Python API для Flink, который позволяет создавать масштабируемые приложения для пакетной и потоковой обработки данных. "Под капотом" PyFlink использует библиотеку Py4J, которая подключается к Flink JVM и взаимодействует с объектами Flink.
PyFlink доступен в виде модуля pip.
ADH поставляется с интерпретатором Python, в котором уже установлен модуль PyFlink, а также все необходимые зависимости.
Данный интерпретатор расположен в /opt/pyflink-python/ на хостах ADH с установленными компонентами Flink.
Используя данный интерпретатор, вы можете запускать приложения PyFlink как обычные Python-скрипты, например:
$ source /opt/pyflink-python/bin/activate
$ /opt/pyflink-python/bin/python3 create_table.py
PyFlink API: DataStream vs Table
PyFlink предоставляет два различных API в зависимости от необходимого уровня абстракции:
-
Table API. Позволяет выполнять реляционные запросы к данным, аналогично использованию SQL или работе с табличными данными в Python.
-
DataStream API. Предоставляет низкоуровневый доступ к функционалу Flink. Позволяет напрямую взаимодействовать с данными состояния (state data), контрольными точками, системой восстановления и так далее. Позволяет создавать более сложные задачи Flink.
В этой статье приведены примеры работы с обоими API. Дополнительные примеры использования PyFlink доступны в GitHub. Актуальная справочная информация по PyFlink API доступна в документации Flink.
Table API
PyFlink Table API — это универсальный реляционный API для пакетной и потоковой обработки. Позволяет использовать SQL-подобный язык для запросов и операций с таблицами. Запросы выполняются одинаково для ограниченных и неограниченных потоков данных.
Первым шагом в приложении является создание объекта среды выполнения (TableEnvironment). Данный объект — основа любого приложения Table API; используется для настройки и выполнения операций с таблицами. Пример использования объекта среды показан ниже:
from pyflink.table import TableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode() (1)
t_env = TableEnvironment.create(env_settings) (2)
t_env.get_config().set("parallelism.default", "1") (3)
| 1 | Установка режима работы (потоковый или пакетный). |
| 2 | Создание объекта TableEnvironment. |
| 3 | Установка параметров конфигурации (параллелизм, восстановление, данные состояния и так далее) на уровне приложения. |
Создание source-таблицы
Используя TableEnvironment, можно создавать таблицы Flink.
Таблица Flink (объект Table) описывает пайплайн преобразования данных.
Сама таблица не хранит никаких данных, а лишь указывает, откуда данные необходимо получить (source-таблица) или куда данные необходимо сохранить после обработки (sink-таблица).
Далее показаны несколько способов создания source-таблиц.
Следующий код создает таблицу из списка Python.
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
table_1 = t_env.from_elements(elements=mock_data, (1)
schema=['txn_id', 'acc_id', 'txn_value', 'txn_date'])
print("Inferred data types for table_1:")
table_1.print_schema()
from pyflink.table import DataTypes
table_2 = t_env.from_elements(elements=mock_data, (2)
schema=DataTypes.ROW([DataTypes.FIELD("txn_id", DataTypes.TINYINT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())]))
print(f"The data type of 'txn_date': {table_2.get_schema().get_field_data_type('txn_date')}")
table_2.print_schema()
table_2 = table_2.select(
col("txn_id"),
col("acc_id"),
col("txn_value"),
col("txn_date").cast(DataTypes.DATE()).alias("txn_date")
)
print(f"Data type of 'txn_date' after casting:"
f"{table_2.get_schema().get_field_data_type('txn_date')}")
print("SELECT * FROM table_2:")
table_2.execute().print() (3)
| 1 | Создание таблицы, схема которой формируется автоматически на основе данных коллекции. |
| 2 | Создание таблицы с явным указанием типов данных в схеме. |
| 3 | Вывод содержимого таблицы. |
Inferred data types for table_1: ( `txn_id` BIGINT, `acc_id` BIGINT, `txn_value` DOUBLE, `txn_date` STRING ) The data type of 'txn_date': VARCHAR ( `txn_id` TINYINT, `acc_id` INT, `txn_value` DOUBLE, `txn_date` STRING ) Data type of 'txn_date' after casting:DATE SELECT * FROM table_2: +----+--------+-------------+--------------------------------+------------+ | op | txn_id | acc_id | txn_value | txn_date | +----+--------+-------------+--------------------------------+------------+ | +I | 1 | 1001 | 100.0 | 2025-01-01 | | +I | 2 | 1002 | 20.0 | 2025-01-02 | | +I | 3 | 1003 | 70.0 | 2025-01-03 | +----+--------+-------------+--------------------------------+------------+ 3 rows in set
Следующий код создает таблицу, подписанную на Kafka-топик через TableDescriptor.
from pyflink.table import EnvironmentSettings, TableEnvironment, Schema, DataTypes, TableDescriptor
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/konstantin/pyflink_demo/flink-connector-kafka-3.3.0-1.19.jar") (1)
t_env.create_temporary_table( (2)
"kafka_source_table",
TableDescriptor.for_connector("kafka")
.schema(Schema.new_builder()
.column("txn_id", DataTypes.INT())
.column("acc_id", DataTypes.INT())
.column("txn_value", DataTypes.DOUBLE())
.column("txn_date", DataTypes.DATE())
.build())
.option("properties.bootstrap.servers", "<host>:<port>")
.option("topic", "transactions_test")
.option("properties.group.id", "txn_group")
.option("scan.startup.mode", "earliest-offset")
.option("format", "json")
.option("json.fail-on-missing-field", "false")
.option("json.ignore-parse-errors", "true")
.build()
)
| 1 | Добавление JAR Kafka-коннектора в контекст задачи PyFlink. Информация об актуальной версии коннектора доступна в документации Flink. |
| 2 | Создание временной таблицы, в которую попадают данные из Kafka-топика. |
Следующий код создает таблицу на основе CSV-файла, используя SQL.
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.execute_sql("""
CREATE TABLE src_table_from_csv (
txn_id INT,
acc_id INT,
txn_value INT,
txn_date STRING
) WITH (
'connector' = 'filesystem', (1)
'format' = 'csv',
'path' = 'test.csv',
'csv.field-delimiter' = ','
)
""")
table = t_env.from_path("src_table_from_csv")
table.execute().print()
| 1 | Использование встроенного коннектора для работы с файлами. |
+----+-------------+-------------+-------------+--------------------------------+ | op | txn_id | acc_id | txn_value | txn_date | +----+-------------+-------------+-------------+--------------------------------+ | +I | 1 | 1001 | 20 | 2025-01-02 | | +I | 2 | 1002 | 110 | 2025-01-01 | | +I | 3 | 1003 | 23 | 2025-01-01 | | +I | 4 | 1002 | 50 | 2025-01-03 | | +I | 5 | 1001 | 78 | 2025-01-02 | +----+-------------+-------------+-------------+--------------------------------+
Больше информации о других способах создания таблиц в Table API доступно в документации Flink.
Запросы и операции с таблицами
Объект Table предоставляет множество методов для выполнения реляционных операций.
Актуальная справочная информация по API доступна в документации Flink.
Основные операции с таблицами показаны в следующем примере.
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.expressions import col, call
env_settings = EnvironmentSettings.in_batch_mode() (1)
t_env = TableEnvironment.create(env_settings)
txn_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-01"),
(3, 1002, 70.00, "2025-01-01"),
(4, 1001, 40.00, "2025-01-02"),
(5, 1003, 50.00, "2025-01-01"),
(6, 1001, 10.00, "2025-01-02"),
]
schema = DataTypes.ROW([
DataTypes.FIELD("txn_id", DataTypes.INT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())
])
t_txns = t_env.from_elements(txn_data, schema)
print("Added table column:")
t_with_net_value = t_txns.add_columns( (2)
(col("txn_value") * 0.87).alias("net_value")
)
t_with_net_value.execute().print()
t_result = t_txns \ (3)
.filter(col("txn_date") != "2025-01-02") \
.group_by(col("acc_id")) \
.select(
col("acc_id"),
call("sum", col("txn_value")).alias("total_txn_value")
)
print("Filtered, groupped, and aggregated: ")
t_result.execute().print()
invoice_data = [
(1, 1, True),
(2, 4, False),
(3, 3, True),
(4, 2, True),
(5, 6, False),
(5, 5, True),
]
schema_i = DataTypes.ROW([
DataTypes.FIELD("invoice_id", DataTypes.INT()),
DataTypes.FIELD("tx_id", DataTypes.INT()),
DataTypes.FIELD("is_confirmed", DataTypes.BOOLEAN()),
])
t_invoices = t_env.from_elements(invoice_data, schema_i)
t_joined = t_invoices.join(t_txns).where(col("tx_id") == col("txn_id")) \ (4)
.select(col("txn_id"),
col("acc_id"),
col("invoice_id"),
col("is_confirmed"))
print("JOINed tables:")
t_joined.execute().print()
| 1 | Установка пакетного режима работы. |
| 2 | Добавление столбца в таблицу. |
| 3 | Выполнение цепочки операторов и функций агрегации, таких как filter(), group_by(), sum(). |
| 4 | Использование Join. |
Added table column: +-------------+-------------+--------------------------------+--------------------------------+--------------------------------+ | txn_id | acc_id | txn_value | txn_date | net_value | +-------------+-------------+--------------------------------+--------------------------------+--------------------------------+ | 1 | 1001 | 100.0 | 2025-01-01 | 87.0 | | 2 | 1002 | 20.0 | 2025-01-01 | 17.4 | | 3 | 1002 | 70.0 | 2025-01-01 | 60.9 | | 4 | 1001 | 40.0 | 2025-01-02 | 34.8 | | 5 | 1003 | 50.0 | 2025-01-01 | 43.5 | | 6 | 1001 | 10.0 | 2025-01-02 | 8.7 | +-------------+-------------+--------------------------------+--------------------------------+--------------------------------+ 6 rows in set Filtered, groupped, and aggregated: +-------------+--------------------------------+ | acc_id | total_txn_value | +-------------+--------------------------------+ | 1003 | 50.0 | | 1002 | 90.0 | | 1001 | 100.0 | +-------------+--------------------------------+ 3 rows in set JOINed tables: +-------------+-------------+-------------+--------------+ | txn_id | acc_id | invoice_id | is_confirmed | +-------------+-------------+-------------+--------------+ | 5 | 1003 | 5 | TRUE | | 2 | 1002 | 4 | TRUE | | 3 | 1002 | 3 | TRUE | | 6 | 1001 | 5 | FALSE | | 1 | 1001 | 1 | TRUE | | 4 | 1001 | 2 | FALSE | +-------------+-------------+-------------+--------------+ 6 rows in set
Операции с таблицами можно также выполнять с помощью SQL:
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("""
CREATE TEMPORARY TABLE t_txns (
txn_id INT,
acc_id INT,
txn_value DOUBLE,
txn_date STRING
) WITH (
'connector' = 'filesystem',
'path' = 'test.csv',
'format' = 'csv'
)
""")
table_env.execute_sql("""
CREATE TABLE t_sink_print (
acc_id INT,
max_txn_value DOUBLE
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
INSERT INTO t_sink_print
SELECT acc_id, MAX(txn_value) AS max_txn_value
FROM t_txns
GROUP BY acc_id
""").wait()
15> +I[1002, 110.0] 3> +I[1001, 20.0] 2> +I[1003, 23.0] 3> -U[1001, 20.0] 3> +U[1001, 78.0]
Запись данных в sink-таблицу
В следующем примере показано создание таблиц типа sink, которые сохраняют данные во внешних системах:
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("pipeline.jars",
"file:///home/konstantin/pyflink_demo/flink-sql-connector-kafka-3.3.0-1.19.jar")
mock_data = [
(1, 1001, 50.75, "2025-01-01"),
(2, 1002, 70.00, "2025-01-02"),
(3, 1001, 20.25, "2025-01-02")
]
t_txns = t_env.from_elements(
elements=mock_data,
schema=DataTypes.ROW([
DataTypes.FIELD("txn_id", DataTypes.INT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())
])
)
t_env.create_temporary_view("txn_table", t_txns) (1)
t_env.execute_sql(""" (2)
CREATE TABLE sink_table (
txn_id INT,
acc_id INT,
txn_value DOUBLE,
txn_date STRING
) WITH (
'connector' = 'print' (3)
)
""")
t_env.execute_sql(""" (4)
INSERT INTO sink_table
SELECT * FROM txn_table
WHERE `acc_id` = 1002
""").wait()
t_env.create_temporary_table( (5)
'kafka_sink',
TableDescriptor.for_connector('kafka')
.schema(Schema.new_builder()
.column('txn_id', DataTypes.INT())
.column('acc_id', DataTypes.INT())
.column('txn_value', DataTypes.DOUBLE())
.column('txn_date', DataTypes.STRING())
.build())
.option('topic', 'transactions_test')
.option('properties.bootstrap.servers', 'localhost:9092')
.format(FormatDescriptor.for_format('json')
.build())
.build())
t_env.execute_sql("""
INSERT INTO kafka_sink
SELECT * FROM txn_table
WHERE `acc_id` = 1002
""").wait()
| 1 | Создание временного представления на основе таблицы. |
| 2 | Создание sink-таблицы. |
| 3 | Вывод содержимого таблицы в STDOUT. |
| 4 | Вызов операции INSERT, которая инициирует запись данных в приемник (sink).
Поскольку INSERT выполняется асинхронно в отдельном потоке, wait() необходим, чтобы дождаться вывода в STDOUT. |
| 5 | Создание sink-таблицы, которая использует коннектор Kafka для записи данных в Kafka-топик. |
DataStream API
PyFlink DataStream API предоставляет более низкоуровневый контроль над функциями Flink, такими как управление состоянием (state) и временем обработки, использование контрольных точек, восстановление и так далее.
Центральный объект API — DataStream, который можно представить как неизменяемую коллекцию, проходящую через один или несколько этапов преобразований. Большинство приложений включает следующие шаги:
-
Создание среды выполнения.
-
Чтение данных из источника и создание объектов
DataStream. -
Обработка
DataStreamоператорами преобразования. Каждый оператор получает на входеDataStreamи возвращает новыйDataStreamс измененными данными. -
Сохранение содержимого
DataStreamво внешней системе.
Создание среды выполнения
Точкой входа в приложение DataStream API является создание объекта среды выполнения (StreamExecutionEnvironment). Пример создания показан ниже.
menv = StreamExecutionEnvironment.get_execution_environment() (1)
menv.set_parallelism(1) (2)
menv.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# data ingest, transformation, and sink logic
menv.execute() (3)
| 1 | Создание объекта StreamExecutionEnvironment. |
| 2 | Установка параметров конфигурации (параллелизм, восстановление, параметры сериализации и прочие). |
| 3 | Запуск задачи. Также можно использовать execute_async() для асинхронного запуска задачи. |
Созданный StreamExecutionEnvironment используется для установки свойств конфигурации, создания источников (sources), приемников (sinks), операторов (operators) и для запуска задачи.
Получение данных из источника
Используя StreamExecutionEnvironment, можно создать источник Flink для чтения данных из внешней системы.
Ниже представлены несколько способов создания источников.
Следующий пример создает DataStream, используя Python-список в качестве источника данных:
from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
config = Configuration()
config.set_string("execution.runtime-mode", "STREAMING")
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
mds.print()
menv.execute()
+I[1,1001,100.0,2025-01-01] +I[2,1002,20.0,2025-01-02] +I[3,1003,70.0,2025-01-03]
|
TIP
Элементы
|
Следующий код создает источник Kafka с помощью Kafka-коннектора:
from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
menv = StreamExecutionEnvironment.get_execution_environment()
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)
source_kafka = KafkaSource.builder() \ (2)
.set_bootstrap_servers("<host>:<port>") \
.set_topics("m_topic") \
.set_group_id("my_group") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
mds = menv.from_source(source_kafka, (3)
WatermarkStrategy.for_monotonous_timestamps(),
"Test Kafka source")
menv.execute()
| 1 | Добавляет JAR Kafka-коннектора в контекст задачи PyFlink. Для получения актуальной версии коннектора Kafka можно обратиться к документации Flink. |
| 2 | Создание источника Kafka на основе топика Kafka. |
| 3 | Получение DataStream от источника Kafka. |
В примере ниже Flink считывает строки CSV-файла и возвращает DataStream.
Каждая строка является отдельной записью в потоке.
from pyflink.common import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
menv = StreamExecutionEnvironment.get_execution_environment()
file_source = FileSource.for_record_stream_format(
StreamFormat.text_line_format(), (1)
"test.csv"
).build()
wm_strategy = WatermarkStrategy.for_monotonous_timestamps()
ds = menv.from_source(source=file_source, (2)
source_name="test_file_source",
watermark_strategy=wm_strategy,
)
ds.print()
menv.execute()
| 1 | Использование встроенного StreamFormat, который позволяет читать файлы построчно и преобразовывать каждую строку в отдельную запись. |
| 2 | Использование встроенной реализации водяного знака (watermark), предполагающей, что записи DataStream всегда поступают в нужном порядке. |
10> 1,1001,20,2025-01-02 10> 2,1002,110,2025-01-01 10> 3,1003,23,2025-01-01 10> 4,1002,50,2025-01-03 10> 5,1001,78,2025-01-02
|
РЕКОМЕНДАЦИЯ
10> указывает на идентификатор задачи или индекс подзадачи.
|
Операторы преобразования
PyFlink предоставляет множество операторов, таких как map(), filter(), reduce() и прочих.
Каждый оператор получает на входе объект DataStream, выполняет преобразования данных и возвращает новый DataStream.
Основные операции показаны ниже:
from pyflink.common import Types, Row
from pyflink.datastream import StreamExecutionEnvironment, MapFunction
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
ds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Original DataStream:")
ds.print()
ds_filtered = ds.filter(lambda txn: txn["txn_value"] > 50.0) (1)
print("Filtered DataStream:")
ds_filtered.print()
ds_mapped = ds.map( (2)
lambda txn: Row(
txn["txn_id"],
9999 if txn["acc_id"] == 1003 else txn["acc_id"],
txn["txn_value"],
txn["txn_date"]
), output_type=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Lambda-mapped DataStream:")
ds_mapped.print()
class UpdateAccID(MapFunction): (3)
def map(self, value):
txn_id, acc_id, txn_value, txn_date = value
if acc_id < 2000:
acc_id = acc_id * 2
return (Row(txn_id, acc_id, txn_value, txn_date))
ds_mapped_class = ds.map(UpdateAccID(), (4)
output_type=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.FLOAT(), Types.STRING()]
))
print("MapFunction-mapped DataStream:")
ds_mapped_class.print()
menv.execute()
| 1 | Использование filter(). |
| 2 | Применение map() с лямбда-функцией. |
| 3 | Реализация класса MapFunction для map-оператора.
Метод map() вызывается для каждого элемента DataStream. |
| 4 | Выполнение трансформации map() с использованием реализации MapFunction. |
Original DataStream: +I[1,1001,100.0,2025-01-01] +I[2,1002,20.0,2025-01-02] +I[3,1003,70.0,2025-01-03] Filtered DataStream: +I[1,1001,100.0,2025-01-01] +I[3,1003,70.0,2025-01-03] Lambda-mapped DataStream: +I[1,1001,100.0,2025-01-01] +I[2,1002,20.0,2025-01-02] +I[3,9999,70.0,2025-01-03] Custom MapFunction-mapped DataStream: +I[1,2002,100.0,2025-01-01] +I[2,2004,20.0,2025-01-02] +I[3,2006,70.0,2025-01-03]
|
Примечание
Строки вывода могут быть перемешаны, поскольку каждый оператор DataStream отправляет данные в STDOUT из отдельного потока.
|
Запись данных в приемник
Результаты вычислений Flink можно записать во внешнюю систему с помощью приемника (sink).
В следующем примере содержимое DataStream записывается в файл с помощью коннектора FileSystem.
from pyflink.common import Types, Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy
OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink"
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
fs_sink = FileSink.for_row_format(OUTPUT_PATH, (1)
Encoder.simple_string_encoder("UTF-8")) \
.with_rolling_policy(RollingPolicy.default_rolling_policy( (2)
part_size=1024 ** 3,
rollover_interval=15 * 60 * 1000,
inactivity_interval=5 * 60 * 1000)) \
.build()
mds.sink_to(fs_sink) (3)
menv.execute()
| 1 | Создание приемника типа FileSink с использованием формата построчного кодирования (row-encoded format). |
| 2 | Установка политики ротации. |
| 3 | Запись содержимого DataStream в приемник. |
Данный пример создает директорию, указанную в поле OUTPUT_PATH, например:
/home/konstantin/pyflink_demo/fs_sink
└─ /2025-04-08--13
└─ part-4a8b2a20-187d-4ad6-ae7a-8f0e6dc880c5-0
Каждая запись DataStream записывается в файл в виде новой строки:
+I[1, 1001, 100.0, 2025-01-01] +I[2, 1002, 20.0, 2025-01-02] +I[3, 1003, 70.0, 2025-01-03]
Для записи данных в формате ORC, Parquet или Avro используйте bulk-encoded форматы.
Ниже показан пример задачи, которая записывает данные в файл в формате ORC, используя FileSink.
from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.datastream.formats.orc import OrcBulkWriters
from pyflink.table import DataTypes
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-orc-1.19.2.jar") (1)
OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink_orc"
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data,
type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
row_type = DataTypes.ROW([
DataTypes.FIELD('txn_id', DataTypes.INT()),
DataTypes.FIELD('acc_id', DataTypes.INT()),
DataTypes.FIELD('txn_value', DataTypes.DOUBLE()),
DataTypes.FIELD('txn_date', DataTypes.STRING()),
])
file_sink_orc = FileSink.for_bulk_format(
OUTPUT_PATH,
OrcBulkWriters.for_row_type( (2)
row_type=row_type,
writer_properties=Configuration(),
hadoop_config=Configuration(),
)
).build()
mds.sink_to(file_sink_orc)
menv.execute()
| 1 | Добавление зависимостей для работы с ORC. |
| 2 | Использование формата ORC с FileSink. |
Следующий код направляет содержимое DataStream в топик Kafka, используя приемник Kafka.
Соединение с сервером устанавливается с помощью коннектора.
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///D:/dev/py/pyflink/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
kafka_sink = KafkaSink.builder() \ (2)
.set_bootstrap_servers("<host>:<port>") \
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("m_topic")
.set_value_serialization_schema(SimpleStringSchema())
.build()
) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.build()
mds.sink_to(kafka_sink) (3)
menv.execute()
| 1 | Добавление коннектора Kafka в контекст задачи PyFlink. Актуальная версия коннектора доступна в документации Flink. |
| 2 | Создание объекта приемника, который подключается к брокеру Kafka, используя значения "<host>:<port>". |
| 3 | Запись содержимого DataStream в приемник Kafka. |
Управление зависимостями
В приложениях PyFlink могут использоваться сторонние зависимости, например: библиотеки Python, JAR-файлы коннекторов, фреймворки ML и прочие. Способы их импорта отличаются для Table API и DataStream API и показаны ниже.
Зависимости JAR
table_env.get_config().set("pipeline.jars", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
table_env.get_config().set("pipeline.classpaths", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_jars("file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_classpaths("file:///path/to/myJar1.jar", "file:///path/to/myJar2.jar")
$ flink run \
--python test_flink_job.py \
--jarfile path/to/myJar1.jar