PyFlink usage examples
PyFlink is a Python API for Flink that allows building scalable batch and streaming pipelines using Python. Under the hood, PyFlink uses Py4J to connect to the target Flink JVM and interact with Flink objects.
PyFlink is distributed as a pip package (apache-flink).
ADH comes with a Python interpreter that already has PyFlink installed, as well as all the required dependencies.
This interpreter is located at /opt/pyflink-python/ on ADH hosts with Flink components installed.
Using this interpreter, you can run PyFlink applications as regular Python scripts, for example:
$ source /opt/pyflink-python/bin/activate
$ /opt/pyflink-python/bin/python3 create_table.py
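For reference, create_table.py can be any PyFlink script. A minimal sketch of such a script (the table contents below are purely illustrative) could look like this:
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a batch Table API environment
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Build a small in-memory table and print its content
table = t_env.from_elements(
    [(1, "alice"), (2, "bob")],
    schema=["id", "name"])
table.execute().print()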
PyFlink APIs: DataStream vs Table
PyFlink provides two different APIs, depending on the abstraction level you need:
- Table API. Allows running relational queries, similarly to using SQL or working with tabular data in Python.
- DataStream API. Provides lower-level control over Flink functions, like state, checkpoints, processing time, etc. Allows building more complex stream processing logic.
This section provides examples of working with both APIs. Additional PyFlink usage examples can be found on the Flink GitHub page. The detailed PyFlink API reference is available in the Flink documentation.
Table API
PyFlink Table API is a unified relational API for batch and stream processing. It allows using an SQL-like domain-specific language for defining table queries and operations. The queries are executed with the same semantics on bounded batch data sets and unbounded streams.
A typical application starts with creating an execution environment object (TableEnvironment). This is the core component of any Table API application and is used to configure and execute table-based operations. Here’s an example of instantiating the environment object:
from pyflink.table import TableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode() (1)
t_env = TableEnvironment.create(env_settings) (2)
t_env.get_config().set("parallelism.default", "1") (3)
1 | Sets the execution mode. Can be either batch or streaming. |
2 | Creates a TableEnvironment object. |
3 | Sets Flink configuration properties (parallelism, recovery, state, etc.) on the application level. |
Create a source table
Once TableEnvironment is instantiated, you can use it to create Flink tables.
A Table object describes a pipeline of data transformations. It does not store any data itself, but describes where to get data (source table) or where to store the data after processing (sink table).
Below are several examples of creating Flink source tables.
The following snippet creates source tables from an in-memory collection.
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
table_1 = t_env.from_elements(elements=mock_data, (1)
schema=['txn_id', 'acc_id', 'txn_value', 'txn_date'])
print("Inferred data types for table_1:")
table_1.print_schema()
from pyflink.table import DataTypes
table_2 = t_env.from_elements(elements=mock_data, (2)
schema=DataTypes.ROW([DataTypes.FIELD("txn_id", DataTypes.TINYINT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())]))
print(f"The data type of 'txn_date': {table_2.get_schema().get_field_data_type('txn_date')}")
table_2.print_schema()
table_2 = table_2.select(
col("txn_id"),
col("acc_id"),
col("txn_value"),
col("txn_date").cast(DataTypes.DATE()).alias("txn_date")
)
print(f"Data type of 'txn_date' after casting:"
f"{table_2.get_schema().get_field_data_type('txn_date')}")
print("SELECT * FROM table_2:")
table_2.execute().print() (3)
1 | Creates a table whose column data types are inferred from the collection. |
2 | Creates a table with explicit data types in the schema. |
3 | Prints the table’s content. |
Inferred data types for table_1:
(
  `txn_id` BIGINT,
  `acc_id` BIGINT,
  `txn_value` DOUBLE,
  `txn_date` STRING
)
The data type of 'txn_date': VARCHAR
(
  `txn_id` TINYINT,
  `acc_id` INT,
  `txn_value` DOUBLE,
  `txn_date` STRING
)
Data type of 'txn_date' after casting:DATE
SELECT * FROM table_2:
+----+--------+-------------+--------------------------------+------------+
| op | txn_id |      acc_id |                      txn_value |   txn_date |
+----+--------+-------------+--------------------------------+------------+
| +I |      1 |        1001 |                          100.0 | 2025-01-01 |
| +I |      2 |        1002 |                           20.0 | 2025-01-02 |
| +I |      3 |        1003 |                           70.0 | 2025-01-03 |
+----+--------+-------------+--------------------------------+------------+
3 rows in set
The following snippet creates a source table subscribed to a Kafka topic via the TableDescriptor.
from pyflink.table import EnvironmentSettings, TableEnvironment, Schema, DataTypes, TableDescriptor
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/konstantin/pyflink_demo/flink-connector-kafka-3.3.0-1.19.jar") (1)
t_env.create_temporary_table( (2)
"kafka_source_table",
TableDescriptor.for_connector("kafka")
.schema(Schema.new_builder()
.column("txn_id", DataTypes.INT())
.column("acc_id", DataTypes.INT())
.column("txn_value", DataTypes.DOUBLE())
.column("txn_date", DataTypes.DATE())
.build())
.option("properties.bootstrap.servers", "<host>:<port>")
.option("topic", "transactions_test")
.option("properties.group.id", "txn_group")
.option("scan.startup.mode", "earliest-offset")
.option("format", "json")
.option("json.fail-on-missing-field", "false")
.option("json.ignore-parse-errors", "true")
.build()
)
1 | Adds the Kafka connector JAR to the PyFlink job context. To get an up-to-date Kafka connector version, see Flink documentation. |
2 | Creates a temporary table that ingests data from a Kafka topic. |
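Once the temporary table is registered, it can be queried like any other Flink table. For example, a quick way to check the incoming records (continuing the snippet above) might be:
# Continuing the example above: read the Kafka-backed table and print incoming rows.
# The query keeps running until the streaming job is cancelled.
t_env.from_path("kafka_source_table").execute().print()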
The following snippet uses SQL expressions to create a table from a CSV file.
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.execute_sql("""
CREATE TABLE src_table_from_csv (
txn_id INT,
acc_id INT,
txn_value INT,
txn_date STRING
) WITH (
'connector' = 'filesystem', (1)
'format' = 'csv',
'path' = 'test.csv',
'csv.field-delimiter' = ','
)
""")
table = t_env.from_path("src_table_from_csv")
table.execute().print()
1 | Uses the built-in connector for working with files. |
+----+-------------+-------------+-------------+--------------------------------+
| op |      txn_id |      acc_id |   txn_value |                       txn_date |
+----+-------------+-------------+-------------+--------------------------------+
| +I |           1 |        1001 |          20 |                     2025-01-02 |
| +I |           2 |        1002 |         110 |                     2025-01-01 |
| +I |           3 |        1003 |          23 |                     2025-01-01 |
| +I |           4 |        1002 |          50 |                     2025-01-03 |
| +I |           5 |        1001 |          78 |                     2025-01-02 |
+----+-------------+-------------+-------------+--------------------------------+
For information on other ways of creating tables using Table API, see Flink documentation.
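One more option worth mentioning: if pandas is available in the PyFlink interpreter, a table can be created directly from a pandas DataFrame. A minimal sketch (the DataFrame content is illustrative):
import pandas as pd
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Column names of the DataFrame become the field names of the Flink table
pdf = pd.DataFrame({"txn_id": [1, 2], "txn_value": [100.0, 20.0]})
table = t_env.from_pandas(pdf)
table.execute().print()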
Queries and table operations
The Table object provides numerous methods for executing relational operations. The up-to-date API reference is available in the Flink documentation.
The following snippet covers the major operations.
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.expressions import col, call
env_settings = EnvironmentSettings.in_batch_mode() (1)
t_env = TableEnvironment.create(env_settings)
txn_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-01"),
(3, 1002, 70.00, "2025-01-01"),
(4, 1001, 40.00, "2025-01-02"),
(5, 1003, 50.00, "2025-01-01"),
(6, 1001, 10.00, "2025-01-02"),
]
schema = DataTypes.ROW([
DataTypes.FIELD("txn_id", DataTypes.INT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())
])
t_txns = t_env.from_elements(txn_data, schema)
print("Added table column:")
t_with_net_value = t_txns.add_columns( (2)
(col("txn_value") * 0.87).alias("net_value")
)
t_with_net_value.execute().print()
t_result = t_txns \ (3)
.filter(col("txn_date") != "2025-01-02") \
.group_by(col("acc_id")) \
.select(
col("acc_id"),
call("sum", col("txn_value")).alias("total_txn_value")
)
print("Filtered, groupped, and aggregated: ")
t_result.execute().print()
invoice_data = [
(1, 1, True),
(2, 4, False),
(3, 3, True),
(4, 2, True),
(5, 6, False),
(5, 5, True),
]
schema_i = DataTypes.ROW([
DataTypes.FIELD("invoice_id", DataTypes.INT()),
DataTypes.FIELD("tx_id", DataTypes.INT()),
DataTypes.FIELD("is_confirmed", DataTypes.BOOLEAN()),
])
t_invoices = t_env.from_elements(invoice_data, schema_i)
t_joined = t_invoices.join(t_txns).where(col("tx_id") == col("txn_id")) \ (4)
.select(col("txn_id"),
col("acc_id"),
col("invoice_id"),
col("is_confirmed"))
print("JOINed tables:")
t_joined.execute().print()
1 | Sets the execution mode to batch. |
2 | Adds a column to the table. |
3 | Applies a chain of operators and aggregation functions, like filter(), group_by(), and sum(). |
4 | Applies a Join operation. |
Added table column:
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
|      txn_id |      acc_id |                      txn_value |                       txn_date |                      net_value |
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
|           1 |        1001 |                          100.0 |                     2025-01-01 |                           87.0 |
|           2 |        1002 |                           20.0 |                     2025-01-01 |                           17.4 |
|           3 |        1002 |                           70.0 |                     2025-01-01 |                           60.9 |
|           4 |        1001 |                           40.0 |                     2025-01-02 |                           34.8 |
|           5 |        1003 |                           50.0 |                     2025-01-01 |                           43.5 |
|           6 |        1001 |                           10.0 |                     2025-01-02 |                            8.7 |
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
6 rows in set
Filtered, grouped, and aggregated:
+-------------+--------------------------------+
|      acc_id |                total_txn_value |
+-------------+--------------------------------+
|        1003 |                           50.0 |
|        1002 |                           90.0 |
|        1001 |                          100.0 |
+-------------+--------------------------------+
3 rows in set
JOINed tables:
+-------------+-------------+-------------+--------------+
|      txn_id |      acc_id |  invoice_id | is_confirmed |
+-------------+-------------+-------------+--------------+
|           5 |        1003 |           5 |         TRUE |
|           2 |        1002 |           4 |         TRUE |
|           3 |        1002 |           3 |         TRUE |
|           6 |        1001 |           5 |        FALSE |
|           1 |        1001 |           1 |         TRUE |
|           4 |        1001 |           2 |        FALSE |
+-------------+-------------+-------------+--------------+
6 rows in set
Alternatively, you can run table operations using SQL, for example:
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("""
CREATE TEMPORARY TABLE t_txns (
txn_id INT,
acc_id INT,
txn_value DOUBLE,
txn_date STRING
) WITH (
'connector' = 'filesystem',
'path' = 'test.csv',
'format' = 'csv'
)
""")
table_env.execute_sql("""
CREATE TABLE t_sink_print (
acc_id INT,
max_txn_value DOUBLE
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
INSERT INTO t_sink_print
SELECT acc_id, MAX(txn_value) AS max_txn_value
FROM t_txns
GROUP BY acc_id
""").wait()
15> +I[1002, 110.0]
3> +I[1001, 20.0]
2> +I[1003, 23.0]
3> -U[1001, 20.0]
3> +U[1001, 78.0]
Write data to a sink table
The following example creates sink tables that store data in external locations:
from pyflink.table import EnvironmentSettings, TableEnvironment, Schema, TableDescriptor, FormatDescriptor
from pyflink.table import DataTypes
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("pipeline.jars",
"file:///home/konstantin/flink-sql-connector-kafka-3.3.0-1.19.jar")
mock_data = [
(1, 1001, 50.75, "2025-01-01"),
(2, 1002, 70.00, "2025-01-02"),
(3, 1001, 20.25, "2025-01-02")
]
t_txns = t_env.from_elements(
elements=mock_data,
schema=DataTypes.ROW([
DataTypes.FIELD("txn_id", DataTypes.INT()),
DataTypes.FIELD("acc_id", DataTypes.INT()),
DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
DataTypes.FIELD("txn_date", DataTypes.STRING())
])
)
t_env.create_temporary_view("txn_table", t_txns) (1)
t_env.execute_sql(""" (2)
CREATE TABLE sink_table (
txn_id INT,
acc_id INT,
txn_value DOUBLE,
txn_date STRING
) WITH (
'connector' = 'print' (3)
)
""")
t_env.execute_sql(""" (4)
INSERT INTO sink_table
SELECT * FROM txn_table
WHERE `acc_id` = 1002
""").wait()
t_env.create_temporary_table( (5)
'kafka_sink',
TableDescriptor.for_connector('kafka')
.schema(Schema.new_builder()
.column('txn_id', DataTypes.INT())
.column('acc_id', DataTypes.INT())
.column('txn_value', DataTypes.DOUBLE())
.column('txn_date', DataTypes.STRING())
.build())
.option('topic', 'transactions_test')
.option('properties.bootstrap.servers', 'localhost:9092')
.format(FormatDescriptor.for_format('json')
.build())
.build())
t_env.execute_sql("""
INSERT INTO kafka_sink
SELECT * FROM txn_table
WHERE `acc_id` = 1002
""").wait()
1 | Creates a temporary view from a table. |
2 | Creates a sink table. |
3 | Forwards all table data to STDOUT. |
4 | Invokes the INSERT query that triggers writing the data to the sink location.
Since the INSERT operation runs asynchronously in a separate thread, wait() is used to wait for the results forwarded to STDOUT. |
5 | Creates another sink table, which uses a Kafka connector to write table data to the Kafka topic. |
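The same insert can also be expressed without SQL by calling execute_insert() on a Table object. A brief sketch, continuing the example above (it assumes the sink_table registered earlier):
from pyflink.table.expressions import col

# Continuing the example above: filter with the Table API
# and write the result into the registered "sink_table"
t_txns.filter(col("acc_id") == 1002) \
    .execute_insert("sink_table") \
    .wait()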
DataStream API
PyFlink DataStream API provides lower-level control over Flink features, like state management, processing time, checkpoints, recovery, etc.
The central API object is a DataStream, which can be thought of as an immutable collection that travels through transformation stages. A typical application comprises the following steps:
- Create an execution environment.
- Read data from a source and produce one or more DataStream objects.
- Direct the DataStream(s) through one or more transformation operators. Each operator consumes an input DataStream and produces a new one with modified data.
- Save the DataStream content to an external location using sinks.
Create execution environment
The entry point of a DataStream API application is the execution environment object (StreamExecutionEnvironment). The following example shows how to instantiate one.
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

menv = StreamExecutionEnvironment.get_execution_environment() (1)
menv.set_parallelism(1) (2)
menv.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# data ingest, transformation, and sink logic
menv.execute() (3)
1 | Creates a StreamExecutionEnvironment object. |
2 | Sets configuration properties like parallelism, retries, serializer options, and so on. |
3 | Triggers the job execution. You can also use execute_async() to run a job asynchronously and retrieve job results later. |
Once the environment object is created, you can use it to set job properties, define sources, sinks, operators, and trigger job execution.
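As an illustration of execute_async(), the sketch below submits a job without blocking and retrieves the result later via the returned JobClient (it assumes a pipeline has already been defined on menv):
# Submit the job without blocking the Python process
job_client = menv.execute_async("async demo job")

# ... other work ...

# Block only when the job result is actually needed
job_client.get_job_execution_result().result()
print(f"Job {job_client.get_job_id()} finished")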
Ingest data from a source
Using StreamExecutionEnvironment, you can define Flink sources to ingest data into the Flink app. Several ways of defining sources are shown below.
The following example creates a DataStream using an in-memory collection as a source.
from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
config = Configuration()
config.set_string("execution.runtime-mode", "STREAMING")
menv = StreamExecutionEnvironment.get_execution_environment(config)
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
mds.print()
menv.execute()
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,1003,70.0,2025-01-03]
TIP
Tokens like +I indicate the row kind of each record; +I marks an insert.
|
The following snippet creates a Kafka source by using a connector for Kafka.
from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
menv = StreamExecutionEnvironment.get_execution_environment()
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)
source_kafka = KafkaSource.builder() \ (2)
.set_bootstrap_servers("<host>:<port>") \
.set_topics("m_topic") \
.set_group_id("my_group") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
mds = menv.from_source(source_kafka, (3)
WatermarkStrategy.for_monotonous_timestamps(),
"Test Kafka source")
menv.execute()
1 | Adds the Kafka connector JAR to the PyFlink job context. To get an up-to-date Kafka connector version, see Flink documentation. |
2 | Creates a Kafka source by connecting to a Kafka broker. |
3 | Creates a DataStream from the Kafka source. |
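The snippet above only defines the source, so the consumed records are not visible anywhere. To inspect them, you can attach a simple print sink before triggering the execution (continuing the example above):
# Continuing the example above: print every record consumed from Kafka.
# SimpleStringSchema deserializes each message value into a plain string.
mds.print()
menv.execute()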
The following snippet creates a DataStream from a CSV file, treating each line as a stream record.
from pyflink.common import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
menv = StreamExecutionEnvironment.get_execution_environment()
file_source = FileSource.for_record_stream_format(
StreamFormat.text_line_format(), (1)
"test.csv"
).build()
wm_strategy = WatermarkStrategy.for_monotonous_timestamps()
ds = menv.from_source(source=file_source, (2)
source_name="test_file_source",
watermark_strategy=wm_strategy,
)
ds.print()
menv.execute()
1 | Uses a built-in StreamFormat, which tells Flink to read files line by line, treating each line as a single record. |
2 | Uses a built-in watermark, which assumes DataStream records (lines) never arrive out of order. |
10> 1,1001,20,2025-01-02
10> 2,1002,110,2025-01-01
10> 3,1003,23,2025-01-01
10> 4,1002,50,2025-01-03
10> 5,1001,78,2025-01-02
TIP
10> indicates a task ID or a subtask index.
|
Transformation operators
PyFlink provides numerous operators like map(), filter(), reduce(), etc. Each operator consumes a DataStream, applies transformations on the data, and emits a new DataStream object.
Basic operations are covered below:
from pyflink.common import Types, Row
from pyflink.datastream import StreamExecutionEnvironment, MapFunction
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
ds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Original DataStream:")
ds.print()
ds_filtered = ds.filter(lambda txn: txn["txn_value"] > 50.0) (1)
print("Filtered DataStream:")
ds_filtered.print()
ds_mapped = ds.map( (2)
lambda txn: Row(
txn["txn_id"],
9999 if txn["acc_id"] == 1003 else txn["acc_id"],
txn["txn_value"],
txn["txn_date"]
), output_type=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Lambda-mapped DataStream:")
ds_mapped.print()
class UpdateAccID(MapFunction): (3)
def map(self, value):
txn_id, acc_id, txn_value, txn_date = value
if acc_id < 2000:
acc_id = acc_id * 2
return (Row(txn_id, acc_id, txn_value, txn_date))
ds_mapped_class = ds.map(UpdateAccID(), (4)
output_type=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]
))
print("MapFunction-mapped DataStream:")
ds_mapped_class.print()
menv.execute()
1 | Applies the filter() transformation on a DataStream. |
2 | Applies the map() transformation using a lambda function. |
3 | A custom MapFunction implementation to be used by the map operator.
The map() method is called for each element of the DataStream. |
4 | Applies the map() transformation using a custom MapFunction implementation. |
Original DataStream:
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,1003,70.0,2025-01-03]
Filtered DataStream:
+I[1,1001,100.0,2025-01-01]
+I[3,1003,70.0,2025-01-03]
Lambda-mapped DataStream:
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,9999,70.0,2025-01-03]
MapFunction-mapped DataStream:
+I[1,2002,100.0,2025-01-01]
+I[2,2004,20.0,2025-01-02]
+I[3,2006,70.0,2025-01-03]
NOTE
The output lines will be shuffled since every DataStream operator prints to STDOUT from its own thread.
|
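The example above does not show keyed aggregations. The following sketch (with simplified, illustrative mock data) demonstrates how key_by() and reduce() can be combined to maintain a running sum of transaction values per account:
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment

menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)

# (acc_id, txn_value) pairs
mock_data = [
    (1001, 100.00),
    (1002, 20.00),
    (1002, 70.00),
    (1001, 40.00),
]
ds = menv.from_collection(mock_data,
                          type_info=Types.TUPLE([Types.INT(), Types.DOUBLE()]))

# Partition the stream by acc_id and keep a running sum of txn_value per key
ds_sums = ds.key_by(lambda txn: txn[0]) \
    .reduce(lambda a, b: (a[0], a[1] + b[1]))

ds_sums.print()
menv.execute()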
Write data to a sink
After the computation steps are complete, you can direct the results to an external location using sinks.
The following snippet writes the DataStream content to a file sink using the FileSystem connector.
from pyflink.common import Types, Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy
OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink"
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
fs_sink = FileSink.for_row_format(OUTPUT_PATH, (1)
Encoder.simple_string_encoder("UTF-8")) \
.with_rolling_policy(RollingPolicy.default_rolling_policy( (2)
part_size=1024 ** 3,
rollover_interval=15 * 60 * 1000,
inactivity_interval=5 * 60 * 1000)) \
.build()
mds.sink_to(fs_sink) (3)
menv.execute()
1 | Creates a FileSink sink using the row-encoded format. |
2 | Sets a rolling policy. |
3 | Flushes the DataStream records to the sink. |
This Flink job creates a directory at the location specified by OUTPUT_PATH, for example:
/home/konstantin/pyflink_demo/fs_sink
└─ /2025-04-08--13
   └─ part-4a8b2a20-187d-4ad6-ae7a-8f0e6dc880c5-0
Every DataStream record is written to the target file as a new line:
+I[1, 1001, 100.0, 2025-01-01]
+I[2, 1002, 20.0, 2025-01-02]
+I[3, 1003, 70.0, 2025-01-03]
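If the auto-generated part-* file names are inconvenient, the sink can be configured with an explicit file prefix and suffix through OutputFileConfig. A brief sketch continuing the file sink example (the prefix and suffix values are illustrative):
from pyflink.common import Encoder
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy

# Name part files txn-<uid>.csv instead of the default part-<uid>
file_config = OutputFileConfig.builder() \
    .with_part_prefix("txn") \
    .with_part_suffix(".csv") \
    .build()

fs_sink = FileSink.for_row_format(OUTPUT_PATH,
                                  Encoder.simple_string_encoder("UTF-8")) \
    .with_output_file_config(file_config) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy()) \
    .build()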
To write binary data (ORC, Parquet, Avro, etc.) to a file, use bulk-encoded formats.
A sample job that writes ORC bytes using FileSink is shown below:
from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.datastream.formats.orc import OrcBulkWriters
from pyflink.table import DataTypes
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-orc-1.19.2.jar") (1)
OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink_orc"
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data,
type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
row_type = DataTypes.ROW([
DataTypes.FIELD('txn_id', DataTypes.INT()),
DataTypes.FIELD('acc_id', DataTypes.INT()),
DataTypes.FIELD('txn_value', DataTypes.DOUBLE()),
DataTypes.FIELD('txn_date', DataTypes.STRING()),
])
file_sink_orc = FileSink.for_bulk_format(
OUTPUT_PATH,
OrcBulkWriters.for_row_type( (2)
row_type=row_type,
writer_properties=Configuration(),
hadoop_config=Configuration(),
)
).build()
mds.sink_to(file_sink_orc)
menv.execute()
1 | Adds a dependency JAR to work with ORC. |
2 | Uses an ORC writer with FileSink . |
The following snippet forwards the content of a DataStream to a Kafka topic using the Kafka sink object. The connection to the server is provided by the Kafka connector.
from pyflink.common import Types, SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///D:/dev/py/pyflink/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)
mock_data = [
(1, 1001, 100.00, "2025-01-01"),
(2, 1002, 20.00, "2025-01-02"),
(3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
["txn_id", "acc_id", "txn_value", "txn_date"],
[Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
kafka_sink = KafkaSink.builder() \ (2)
.set_bootstrap_servers("<host>:<port>") \
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("m_topic")
.set_value_serialization_schema(SimpleStringSchema())
.build()
) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.build()
mds.sink_to(kafka_sink) (3)
menv.execute()
1 | Adds the Kafka connector JAR to the PyFlink job context. To get an up-to-date Kafka connector version, see Flink documentation. |
2 | Creates a Kafka sink object that connects to a Kafka broker using the "<host>:<port>" values. |
3 | Writes DataStream items to the Kafka sink. |
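For stronger delivery semantics, the Kafka sink can be switched to exactly-once mode, which requires a transactional ID prefix and a transaction timeout compatible with the broker settings. A hedged sketch of the relevant builder calls (the prefix and timeout values are illustrative):
kafka_sink_eo = KafkaSink.builder() \
    .set_bootstrap_servers("<host>:<port>") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("m_topic")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \
    .set_transactional_id_prefix("pyflink-demo-txn") \
    .set_property("transaction.timeout.ms", "900000") \
    .build()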
Dependency management
PyFlink jobs may require third-party dependencies, for example, Python libraries, connector JARs, ML frameworks, etc. The ways of adding these dependencies differ slightly between the Table API and the DataStream API and are shown below.
JAR dependencies
table_env.get_config().set("pipeline.jars", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
table_env.get_config().set("pipeline.classpaths", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_jars("file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_classpaths("file:///path/to/myJar1.jar", "file:///path/to/myJar2.jar")
$ flink run \
--python test_flink_job.py \
--jarfile flink-sql-connector-kafka-3.3.0-1.19.jar
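Python dependencies (additional modules, archives, requirements files) can be attached in a similar fashion. A brief sketch of the commonly used options (the paths are illustrative):
# DataStream API: ship extra Python files and a requirements file with the job
stream_ex_env.add_python_file("/path/to/my_module.py")
stream_ex_env.set_python_requirements("/path/to/requirements.txt")

# Table API: the same options are exposed as configuration keys
table_env.get_config().set("python.files", "/path/to/my_module.py")
table_env.get_config().set("python.requirements", "/path/to/requirements.txt")
The same can be done when submitting a job via the Flink CLI:
$ flink run \
    --python test_flink_job.py \
    --pyFiles my_module.py \
    --pyRequirements requirements.txt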