Примеры использования PyFlink
PyFlink — это Python API для Flink, который позволяет создавать масштабируемые приложения для пакетной и потоковой обработки данных. "Под капотом" PyFlink использует библиотеку Py4J, которая подключается к Flink JVM и взаимодействует с объектами Flink.
PyFlink доступен в виде модуля pip
.
ADH поставляется с интерпретатором Python, в котором уже установлен модуль PyFlink, а также все необходимые зависимости.
Данный интерпретатор расположен в /opt/pyflink-python/ на хостах ADH с установленными компонентами Flink.
Используя данный интерпретатор, вы можете запускать приложения PyFlink как обычные Python-скрипты, например:
$ source /opt/pyflink-python/bin/activate
$ /opt/pyflink-python/bin/python3 create_table.py
PyFlink API: DataStream vs Table
PyFlink предоставляет два различных API в зависимости от необходимого уровня абстракции:
-
Table API. Позволяет выполнять реляционные запросы к данным, аналогично использованию SQL или работе с табличными данными в Python.
-
DataStream API. Предоставляет низкоуровневый доступ к функционалу Flink. Позволяет напрямую взаимодействовать с данными состояния (state data), контрольными точками, системой восстановления и так далее. Позволяет создавать более сложные задачи Flink.
В этой статье приведены примеры работы с обоими API. Дополнительные примеры использования PyFlink доступны в GitHub. Актуальная справочная информация по PyFlink API доступна в документации Flink.
Table API
PyFlink Table API — это универсальный реляционный API для пакетной и потоковой обработки. Позволяет использовать SQL-подобный язык для запросов и операций с таблицами. Запросы выполняются одинаково для ограниченных и неограниченных потоков данных.
Первым шагом в приложении является создание объекта среды выполнения (TableEnvironment). Данный объект — основа любого приложения Table API; используется для настройки и выполнения операций с таблицами. Пример использования объекта среды показан ниже:
from pyflink.table import TableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode() (1)
t_env = TableEnvironment.create(env_settings) (2)
t_env.get_config().set("parallelism.default", "1") (3)
1 | Установка режима работы (потоковый или пакетный). |
2 | Создание объекта TableEnvironment . |
3 | Установка параметров конфигурации (параллелизм, восстановление, данные состояния и так далее) на уровне приложения. |
Создание source-таблицы
Используя TableEnvironment
, можно создавать таблицы Flink.
Таблица Flink (объект Table
) описывает пайплайн преобразования данных.
Сама таблица не хранит никаких данных, а лишь указывает, откуда данные необходимо получить (source-таблица) или куда данные необходимо сохранить после обработки (sink-таблица).
Далее показаны несколько способов создания source-таблиц.
Следующий код создает таблицу из списка Python.
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
table_1 = t_env.from_elements(elements=mock_data, (1)
schema=['txn_id', 'acc_id', 'txn_value', 'txn_date'])
print("Inferred data types for table_1:")
table_1.print_schema()
from pyflink.table import DataTypes
table_2 = t_env.from_elements(elements=mock_data, (2)
schema=DataTypes.ROW([DataTypes.FIELD("txn_id", DataTypes.TINYINT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())]))
print(f"The data type of 'txn_date': {table_2.get_schema().get_field_data_type('txn_date')}")
table_2.print_schema()
table_2 = table_2.select(
col("txn_id"),
col("acc_id"),
col("txn_value"),
col("txn_date").cast(DataTypes.DATE()).alias("txn_date")
)
print(f"Data type of 'txn_date' after casting:"
f"{table_2.get_schema().get_field_data_type('txn_date')}")
print("SELECT * FROM table_2:")
table_2.execute().print() (3)
1 | Создание таблицы, схема которой формируется автоматически на основе данных коллекции. |
2 | Создание таблицы с явным указанием типов данных в схеме. |
3 | Вывод содержимого таблицы. |
Inferred data types for table_1: ( `txn_id` BIGINT, `acc_id` BIGINT, `txn_value` DOUBLE, `txn_date` STRING ) The data type of 'txn_date': VARCHAR ( `txn_id` TINYINT, `acc_id` INT, `txn_value` DOUBLE, `txn_date` STRING ) Data type of 'txn_date' after casting:DATE SELECT * FROM table_2: +----+--------+-------------+--------------------------------+------------+ | op | txn_id | acc_id | txn_value | txn_date | +----+--------+-------------+--------------------------------+------------+ | +I | 1 | 1001 | 100.0 | 2025-01-01 | | +I | 2 | 1002 | 20.0 | 2025-01-02 | | +I | 3 | 1003 | 70.0 | 2025-01-03 | +----+--------+-------------+--------------------------------+------------+ 3 rows in set
Следующий код создает таблицу, подписанную на Kafka-топик через TableDescriptor.
from pyflink.table import EnvironmentSettings, TableEnvironment, Schema, DataTypes, TableDescriptor
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/konstantin/pyflink_demo/flink-connector-kafka-3.3.0-1.19.jar") (1)
t_env.create_temporary_table( (2)
"kafka_source_table",
TableDescriptor.for_connector("kafka")
.schema(Schema.new_builder()
.column("txn_id", DataTypes.INT())
.column("acc_id", DataTypes.INT())
.column("txn_value", DataTypes.DOUBLE())
.column("txn_date", DataTypes.DATE())
.build())
.option("properties.bootstrap.servers", "<host>:<port>")
.option("topic", "transactions_test")
.option("properties.group.id", "txn_group")
.option("scan.startup.mode", "earliest-offset")
.option("format", "json")
.option("json.fail-on-missing-field", "false")
.option("json.ignore-parse-errors", "true")
.build()
)
1 | Добавление JAR Kafka-коннектора в контекст задачи PyFlink. Информация об актуальной версии коннектора доступна в документации Flink. |
2 | Создание временной таблицы, в которую попадают данные из Kafka-топика. |
Следующий код создает таблицу на основе CSV-файла, используя SQL.
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.execute_sql("""
CREATE TABLE src_table_from_csv (
txn_id INT,
acc_id INT,
txn_value INT,
txn_date STRING
) WITH (
'connector' = 'filesystem', (1)
'format' = 'csv',
'path' = 'test.csv',
'csv.field-delimiter' = ','
)
""")
table = t_env.from_path("src_table_from_csv")
table.execute().print()
1 | Использование встроенного коннектора для работы с файлами. |
+----+-------------+-------------+-------------+--------------------------------+ | op | txn_id | acc_id | txn_value | txn_date | +----+-------------+-------------+-------------+--------------------------------+ | +I | 1 | 1001 | 20 | 2025-01-02 | | +I | 2 | 1002 | 110 | 2025-01-01 | | +I | 3 | 1003 | 23 | 2025-01-01 | | +I | 4 | 1002 | 50 | 2025-01-03 | | +I | 5 | 1001 | 78 | 2025-01-02 | +----+-------------+-------------+-------------+--------------------------------+
Больше информации о других способах создания таблиц в Table API доступно в документации Flink.
Запросы и операции с таблицами
Объект Table
предоставляет множество методов для выполнения реляционных операций.
Актуальная справочная информация по API доступна в документации Flink.
Основные операции с таблицами показаны в следующем примере.
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.expressions import col, call
env_settings = EnvironmentSettings.in_batch_mode() (1)
t_env = TableEnvironment.create(env_settings)
txn_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-01"),
(3, 1002, 70.00, "2025-01-01"),
(4, 1001, 40.00, "2025-01-02"),
(5, 1003, 50.00, "2025-01-01"),
(6, 1001, 10.00, "2025-01-02"),
]
schema = DataTypes.ROW([
DataTypes.FIELD("txn_id", DataTypes.INT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())
])
t_txns = t_env.from_elements(txn_data, schema)
print("Added table column:")
t_with_net_value = t_txns.add_columns( (2)
(col("txn_value") * 0.87).alias("net_value")
)
t_with_net_value.execute().print()
t_result = t_txns \ (3)
.filter(col("txn_date") != "2025-01-02") \
.group_by(col("acc_id")) \
.select(
col("acc_id"),
call("sum", col("txn_value")).alias("total_txn_value")
)
print("Filtered, groupped, and aggregated: ")
t_result.execute().print()
invoice_data = [
(1, 1, True),
(2, 4, False),
(3, 3, True),
(4, 2, True),
(5, 6, False),
(5, 5, True),
]
schema_i = DataTypes.ROW([
DataTypes.FIELD("invoice_id", DataTypes.INT()),
DataTypes.FIELD("tx_id", DataTypes.INT()),
DataTypes.FIELD("is_confirmed", DataTypes.BOOLEAN()),
])
t_invoices = t_env.from_elements(invoice_data, schema_i)
t_joined = t_invoices.join(t_txns).where(col("tx_id") == col("txn_id")) \ (4)
.select(col("txn_id"),
col("acc_id"),
col("invoice_id"),
col("is_confirmed"))
print("JOINed tables:")
t_joined.execute().print()
1 | Установка пакетного режима работы. |
2 | Добавление столбца в таблицу. |
3 | Выполнение цепочки операторов и функций агрегации, таких как filter() , group_by() , sum() . |
4 | Использование Join. |
Added table column: +-------------+-------------+--------------------------------+--------------------------------+--------------------------------+ | txn_id | acc_id | txn_value | txn_date | net_value | +-------------+-------------+--------------------------------+--------------------------------+--------------------------------+ | 1 | 1001 | 100.0 | 2025-01-01 | 87.0 | | 2 | 1002 | 20.0 | 2025-01-01 | 17.4 | | 3 | 1002 | 70.0 | 2025-01-01 | 60.9 | | 4 | 1001 | 40.0 | 2025-01-02 | 34.8 | | 5 | 1003 | 50.0 | 2025-01-01 | 43.5 | | 6 | 1001 | 10.0 | 2025-01-02 | 8.7 | +-------------+-------------+--------------------------------+--------------------------------+--------------------------------+ 6 rows in set Filtered, groupped, and aggregated: +-------------+--------------------------------+ | acc_id | total_txn_value | +-------------+--------------------------------+ | 1003 | 50.0 | | 1002 | 90.0 | | 1001 | 100.0 | +-------------+--------------------------------+ 3 rows in set JOINed tables: +-------------+-------------+-------------+--------------+ | txn_id | acc_id | invoice_id | is_confirmed | +-------------+-------------+-------------+--------------+ | 5 | 1003 | 5 | TRUE | | 2 | 1002 | 4 | TRUE | | 3 | 1002 | 3 | TRUE | | 6 | 1001 | 5 | FALSE | | 1 | 1001 | 1 | TRUE | | 4 | 1001 | 2 | FALSE | +-------------+-------------+-------------+--------------+ 6 rows in set
Операции с таблицами можно также выполнять с помощью SQL:
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("""
CREATE TEMPORARY TABLE t_txns (
txn_id INT,
acc_id INT,
txn_value DOUBLE,
txn_date STRING
) WITH (
'connector' = 'filesystem',
'path' = 'test.csv',
'format' = 'csv'
)
""")
table_env.execute_sql("""
CREATE TABLE t_sink_print (
acc_id INT,
max_txn_value DOUBLE
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
INSERT INTO t_sink_print
SELECT acc_id, MAX(txn_value) AS max_txn_value
FROM t_txns
GROUP BY acc_id
""").wait()
15> +I[1002, 110.0] 3> +I[1001, 20.0] 2> +I[1003, 23.0] 3> -U[1001, 20.0] 3> +U[1001, 78.0]
Запись данных в sink-таблицу
В следующем примере показано создание таблиц типа sink, которые сохраняют данные во внешних системах:
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("pipeline.jars",
"file:///home/konstantin/pyflink_demo/flink-sql-connector-kafka-3.3.0-1.19.jar")
mock_data = [
(1, 1001, 50.75, "2025-01-01"),
(2, 1002, 70.00, "2025-01-02"),
(3, 1001, 20.25, "2025-01-02")
]
t_txns = t_env.from_elements(
elements=mock_data,
schema=DataTypes.ROW([
DataTypes.FIELD("txn_id", DataTypes.INT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())
])
)
t_env.create_temporary_view("txn_table", t_txns) (1)
t_env.execute_sql(""" (2)
CREATE TABLE sink_table (
txn_id INT,
acc_id INT,
txn_value DOUBLE,
txn_date STRING
) WITH (
'connector' = 'print' (3)
)
""")
t_env.execute_sql(""" (4)
INSERT INTO sink_table
SELECT * FROM txn_table
WHERE `acc_id` = 1002
""").wait()
t_env.create_temporary_table( (5)
'kafka_sink',
TableDescriptor.for_connector('kafka')
.schema(Schema.new_builder()
.column('txn_id', DataTypes.INT())
.column('acc_id', DataTypes.INT())
.column('txn_value', DataTypes.DOUBLE())
.column('txn_date', DataTypes.STRING())
.build())
.option('topic', 'transactions_test')
.option('properties.bootstrap.servers', 'localhost:9092')
.format(FormatDescriptor.for_format('json')
.build())
.build())
t_env.execute_sql("""
INSERT INTO kafka_sink
SELECT * FROM txn_table
WHERE `acc_id` = 1002
""").wait()
1 | Создание временного представления на основе таблицы. |
2 | Создание sink-таблицы. |
3 | Вывод содержимого таблицы в STDOUT. |
4 | Вызов операции INSERT , которая инициирует запись данных в приемник (sink).
Поскольку INSERT выполняется асинхронно в отдельном потоке, wait() необходим, чтобы дождаться вывода в STDOUT. |
5 | Создание sink-таблицы, которая использует коннектор Kafka для записи данных в Kafka-топик. |
DataStream API
PyFlink DataStream API предоставляет более низкоуровневый контроль над функциями Flink, такими как управление состоянием (state) и временем обработки, использование контрольных точек, восстановление и так далее.
Центральный объект API — DataStream, который можно представить как неизменяемую коллекцию, проходящую через один или несколько этапов преобразований. Большинство приложений включает следующие шаги:
-
Создание среды выполнения.
-
Чтение данных из источника и создание объектов
DataStream
. -
Обработка
DataStream
операторами преобразования. Каждый оператор получает на входеDataStream
и возвращает новыйDataStream
с измененными данными. -
Сохранение содержимого
DataStream
во внешней системе.
Создание среды выполнения
Точкой входа в приложение DataStream API является создание объекта среды выполнения (StreamExecutionEnvironment). Пример создания показан ниже.
menv = StreamExecutionEnvironment.get_execution_environment() (1)
menv.set_parallelism(1) (2)
menv.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# data ingest, transformation, and sink logic
menv.execute() (3)
1 | Создание объекта StreamExecutionEnvironment . |
2 | Установка параметров конфигурации (параллелизм, восстановление, параметры сериализации и прочие). |
3 | Запуск задачи. Также можно использовать execute_async() для асинхронного запуска задачи. |
Созданный StreamExecutionEnvironment
используется для установки свойств конфигурации, создания источников (sources), приемников (sinks), операторов (operators) и для запуска задачи.
Получение данных из источника
Используя StreamExecutionEnvironment
, можно создать источник Flink для чтения данных из внешней системы.
Ниже представлены несколько способов создания источников.
Следующий пример создает DataStream
, используя Python-список в качестве источника данных:
from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
config = Configuration()
config.set_string("execution.runtime-mode", "STREAMING")
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
mds.print()
menv.execute()
+I[1,1001,100.0,2025-01-01] +I[2,1002,20.0,2025-01-02] +I[3,1003,70.0,2025-01-03]
TIP
Элементы
|
Следующий код создает источник Kafka с помощью Kafka-коннектора:
from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
menv = StreamExecutionEnvironment.get_execution_environment()
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)
source_kafka = KafkaSource.builder() \ (2)
.set_bootstrap_servers("<host>:<port>") \
.set_topics("m_topic") \
.set_group_id("my_group") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
mds = menv.from_source(source_kafka, (3)
WatermarkStrategy.for_monotonous_timestamps(),
"Test Kafka source")
menv.execute()
1 | Добавляет JAR Kafka-коннектора в контекст задачи PyFlink. Для получения актуальной версии коннектора Kafka можно обратиться к документации Flink. |
2 | Создание источника Kafka на основе топика Kafka. |
3 | Получение DataStream от источника Kafka. |
В примере ниже Flink считывает строки CSV-файла и возвращает DataStream
.
Каждая строка является отдельной записью в потоке.
from pyflink.common import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
menv = StreamExecutionEnvironment.get_execution_environment()
file_source = FileSource.for_record_stream_format(
StreamFormat.text_line_format(), (1)
"test.csv"
).build()
wm_strategy = WatermarkStrategy.for_monotonous_timestamps()
ds = menv.from_source(source=file_source, (2)
source_name="test_file_source",
watermark_strategy=wm_strategy,
)
ds.print()
menv.execute()
1 | Использование встроенного StreamFormat, который позволяет читать файлы построчно и преобразовывать каждую строку в отдельную запись. |
2 | Использование встроенной реализации водяного знака (watermark), предполагающей, что записи DataStream всегда поступают в нужном порядке. |
10> 1,1001,20,2025-01-02 10> 2,1002,110,2025-01-01 10> 3,1003,23,2025-01-01 10> 4,1002,50,2025-01-03 10> 5,1001,78,2025-01-02
РЕКОМЕНДАЦИЯ
10> указывает на идентификатор задачи или индекс подзадачи.
|
Операторы преобразования
PyFlink предоставляет множество операторов, таких как map()
, filter()
, reduce()
и прочих.
Каждый оператор получает на входе объект DataStream
, выполняет преобразования данных и возвращает новый DataStream
.
Основные операции показаны ниже:
from pyflink.common import Types, Row
from pyflink.datastream import StreamExecutionEnvironment, MapFunction
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
ds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Original DataStream:")
ds.print()
ds_filtered = ds.filter(lambda txn: txn["txn_value"] > 50.0) (1)
print("Filtered DataStream:")
ds_filtered.print()
ds_mapped = ds.map( (2)
lambda txn: Row(
txn["txn_id"],
9999 if txn["acc_id"] == 1003 else txn["acc_id"],
txn["txn_value"],
txn["txn_date"]
), output_type=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Lambda-mapped DataStream:")
ds_mapped.print()
class UpdateAccID(MapFunction): (3)
def map(self, value):
txn_id, acc_id, txn_value, txn_date = value
if acc_id < 2000:
acc_id = acc_id * 2
return (Row(txn_id, acc_id, txn_value, txn_date))
ds_mapped_class = ds.map(UpdateAccID(), (4)
output_type=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.FLOAT(), Types.STRING()]
))
print("MapFunction-mapped DataStream:")
ds_mapped_class.print()
menv.execute()
1 | Использование filter(). |
2 | Применение map() с лямбда-функцией. |
3 | Реализация класса MapFunction для map-оператора.
Метод map() вызывается для каждого элемента DataStream . |
4 | Выполнение трансформации map() с использованием реализации MapFunction . |
Original DataStream: +I[1,1001,100.0,2025-01-01] +I[2,1002,20.0,2025-01-02] +I[3,1003,70.0,2025-01-03] Filtered DataStream: +I[1,1001,100.0,2025-01-01] +I[3,1003,70.0,2025-01-03] Lambda-mapped DataStream: +I[1,1001,100.0,2025-01-01] +I[2,1002,20.0,2025-01-02] +I[3,9999,70.0,2025-01-03] Custom MapFunction-mapped DataStream: +I[1,2002,100.0,2025-01-01] +I[2,2004,20.0,2025-01-02] +I[3,2006,70.0,2025-01-03]
Примечание
Строки вывода могут быть перемешаны, поскольку каждый оператор DataStream отправляет данные в STDOUT из отдельного потока.
|
Запись данных в приемник
Результаты вычислений Flink можно записать во внешнюю систему с помощью приемника (sink).
В следующем примере содержимое DataStream
записывается в файл с помощью коннектора FileSystem.
from pyflink.common import Types, Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy
OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink"
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
fs_sink = FileSink.for_row_format(OUTPUT_PATH, (1)
Encoder.simple_string_encoder("UTF-8")) \
.with_rolling_policy(RollingPolicy.default_rolling_policy( (2)
part_size=1024 ** 3,
rollover_interval=15 * 60 * 1000,
inactivity_interval=5 * 60 * 1000)) \
.build()
mds.sink_to(fs_sink) (3)
menv.execute()
1 | Создание приемника типа FileSink с использованием формата построчного кодирования (row-encoded format). |
2 | Установка политики ротации. |
3 | Запись содержимого DataStream в приемник. |
Данный пример создает директорию, указанную в поле OUTPUT_PATH
, например:
/home/konstantin/pyflink_demo/fs_sink └─ /2025-04-08--13 └─ part-4a8b2a20-187d-4ad6-ae7a-8f0e6dc880c5-0
Каждая запись DataStream
записывается в файл в виде новой строки:
+I[1, 1001, 100.0, 2025-01-01] +I[2, 1002, 20.0, 2025-01-02] +I[3, 1003, 70.0, 2025-01-03]
Для записи данных в формате ORC, Parquet или Avro используйте bulk-encoded форматы.
Ниже показан пример задачи, которая записывает данные в файл в формате ORC, используя FileSink
.
from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.datastream.formats.orc import OrcBulkWriters
from pyflink.table import DataTypes
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-orc-1.19.2.jar") (1)
OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink_orc"
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data,
type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
row_type = DataTypes.ROW([
DataTypes.FIELD('txn_id', DataTypes.INT()),
DataTypes.FIELD('acc_id', DataTypes.INT()),
DataTypes.FIELD('txn_value', DataTypes.DOUBLE()),
DataTypes.FIELD('txn_date', DataTypes.STRING()),
])
file_sink_orc = FileSink.for_bulk_format(
OUTPUT_PATH,
OrcBulkWriters.for_row_type( (2)
row_type=row_type,
writer_properties=Configuration(),
hadoop_config=Configuration(),
)
).build()
mds.sink_to(file_sink_orc)
menv.execute()
1 | Добавление зависимостей для работы с ORC. |
2 | Использование формата ORC с FileSink . |
Следующий код направляет содержимое DataStream
в топик Kafka, используя приемник Kafka.
Соединение с сервером устанавливается с помощью коннектора.
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///D:/dev/py/pyflink/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
kafka_sink = KafkaSink.builder() \ (2)
.set_bootstrap_servers("<host>:<port>") \
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("m_topic")
.set_value_serialization_schema(SimpleStringSchema())
.build()
) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.build()
mds.sink_to(kafka_sink) (3)
menv.execute()
1 | Добавление коннектора Kafka в контекст задачи PyFlink. Актуальная версия коннектора доступна в документации Flink. |
2 | Создание объекта приемника, который подключается к брокеру Kafka, используя значения "<host>:<port>" . |
3 | Запись содержимого DataStream в приемник Kafka. |
Управление зависимостями
В приложениях PyFlink могут использоваться сторонние зависимости, например: библиотеки Python, JAR-файлы коннекторов, фреймворки ML и прочие. Способы их импорта отличаются для Table API и DataStream API и показаны ниже.
Зависимости JAR
table_env.get_config().set("pipeline.jars", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
table_env.get_config().set("pipeline.classpaths", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_jars("file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_classpaths("file:///path/to/myJar1.jar", "file:///path/to/myJar2.jar")
$ flink run \
--python test_flink_job.py \
--jarfile path/to/myJar1.jar