PyFlink usage examples

PyFlink is a Python API for Flink that allows building scalable batch and streaming pipelines using Python. Under the hood, PyFlink uses Py4J to connect to the target Flink JVM and interact with Flink objects.

PyFlink is distributed as a pip module. ADH comes with a Python interpreter that already has PyFlink installed, as well as all the required dependencies. This interpreter is located at /opt/pyflink-python/ on ADH hosts with Flink components installed. Using this interpreter, you can run PyFlink applications as regular Python scripts, for example:

$ source /opt/pyflink-python/bin/activate
$ /opt/pyflink-python/bin/python3 create_table.py

PyFlink provides two different APIs, depending on the abstraction level you need:

  • Table API. Allows running relational queries, similarly to using SQL or working with tabular data in Python.

  • DataStream API. Provides lower-level control over Flink features, like state, checkpoints, processing time, etc. Allows building more complex stream processing logic.

This section provides examples on working with both APIs. Additional PyFlink usage examples can be found on the Flink GitHub page. The detailed PyFlink API reference is available in Flink documentation.

Table API

PyFlink Table API is a unified relational API for batch and stream processing. It allows using an SQL-like domain-specific language for defining table queries and operations. The queries are executed with the same semantics on bounded batch data sets and unbounded streams.

A typical application starts with creating an execution environment object (TableEnvironment). This is the core component of any Table API application and is used to configure and execute table-based operations. Here’s an example of instantiating the environment object:

from pyflink.table import TableEnvironment, EnvironmentSettings

env_settings = EnvironmentSettings.in_streaming_mode() (1)
t_env = TableEnvironment.create(env_settings) (2)
t_env.get_config().set("parallelism.default", "1") (3)
1 Sets the execution mode. Can be either batch or streaming.
2 Creates a TableEnvironment object.
3 Sets Flink configuration properties (parallelism, recovery, state, etc.) on the application level.

Create a source table

Once TableEnvironment is instantiated, you can use it to create Flink tables. A Table object describes a pipeline of data transformations. It does not store any data itself, but describes where to get data (source table), or where to store the data after the processing (sink table).

Below are several examples of creating Flink source tables.

  • From a collection

  • From a Kafka topic

  • From a file using DDL

The following snippet creates source tables from an in-memory collection.

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")

mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
table_1 = t_env.from_elements(elements=mock_data, (1)
                              schema=['txn_id', 'acc_id', 'txn_value', 'txn_date'])
print("Inferred data types for table_1:")
table_1.print_schema()

from pyflink.table import DataTypes
table_2 = t_env.from_elements(elements=mock_data, (2)
                              schema=DataTypes.ROW([DataTypes.FIELD("txn_id", DataTypes.TINYINT()),
                                               DataTypes.FIELD("acc_id", DataTypes.INT()),
                                               DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
                                               DataTypes.FIELD("txn_date", DataTypes.STRING())]))
print(f"The data type of 'txn_date': {table_2.get_schema().get_field_data_type('txn_date')}")
table_2.print_schema()
table_2 = table_2.select(
    col("txn_id"),
    col("acc_id"),
    col("txn_value"),
    col("txn_date").cast(DataTypes.DATE()).alias("txn_date")
)
print(f"Data type of 'txn_date' after casting:"
      f"{table_2.get_schema().get_field_data_type('txn_date')}")
print("SELECT * FROM table_2:")
table_2.execute().print() (3)
1 Creates a table whose column data types are inferred from the collection.
2 Creates a table with explicit data types in schema.
3 Prints the table’s content.
Sample output
Inferred data types for table_1:
(
  `txn_id` BIGINT,
  `acc_id` BIGINT,
  `txn_value` DOUBLE,
  `txn_date` STRING
)
The data type of 'txn_date': VARCHAR
(
  `txn_id` TINYINT,
  `acc_id` INT,
  `txn_value` DOUBLE,
  `txn_date` STRING
)
Data type of 'txn_date' after casting:DATE
SELECT * FROM table_2:
+----+--------+-------------+--------------------------------+------------+
| op | txn_id |      acc_id |                      txn_value |   txn_date |
+----+--------+-------------+--------------------------------+------------+
| +I |      1 |        1001 |                          100.0 | 2025-01-01 |
| +I |      2 |        1002 |                           20.0 | 2025-01-02 |
| +I |      3 |        1003 |                           70.0 | 2025-01-03 |
+----+--------+-------------+--------------------------------+------------+
3 rows in set

The following snippet creates a source table subscribed to a Kafka topic via the TableDescriptor.

from pyflink.table import EnvironmentSettings, TableEnvironment, Schema, DataTypes, TableDescriptor

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/konstantin/pyflink_demo/flink-connector-kafka-3.3.0-1.19.jar") (1)

t_env.create_temporary_table( (2)
    "kafka_source_table",
    TableDescriptor.for_connector("kafka")
    .schema(Schema.new_builder()
            .column("txn_id", DataTypes.INT())
            .column("acc_id", DataTypes.INT())
            .column("txn_value", DataTypes.DOUBLE())
            .column("txn_date", DataTypes.DATE())
            .build())
    .option("properties.bootstrap.servers", "<host>:<port>")
    .option("topic", "transactions_test")
    .option("properties.group.id", "txn_group")
    .option("scan.startup.mode", "earliest-offset")
    .option("format", "json")
    .option("json.fail-on-missing-field", "false")
    .option("json.ignore-parse-errors", "true")
    .build()
)
1 Adds the Kafka connector JAR to the PyFlink job context. To get an up-to-date Kafka connector version, see Flink documentation.
2 Creates a temporary table that ingests data from a Kafka topic.
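
The two json.* options above control how incomplete or malformed messages are handled. The following pure-Python sketch only emulates that lenient behavior outside Flink to show what a record in the topic is expected to look like. The parse_message helper and the sample payloads are hypothetical and are not part of PyFlink.

```python
import json

FIELDS = ["txn_id", "acc_id", "txn_value", "txn_date"]


def parse_message(raw):
    """Emulate the lenient JSON settings used by kafka_source_table.

    Returns None for unparsable input (like json.ignore-parse-errors = true)
    and fills absent fields with None (like json.fail-on-missing-field = false).
    """
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict):
        return None
    return tuple(payload.get(name) for name in FIELDS)


messages = [
    '{"txn_id": 1, "acc_id": 1001, "txn_value": 100.0, "txn_date": "2025-01-01"}',
    '{"txn_id": 2, "acc_id": 1002}',
    'not a json document',
]
rows = [parse_message(m) for m in messages]
for row in rows:
    print(row)
```

The first message maps to a complete row, the second one yields a row with empty txn_value and txn_date, and the third one is skipped.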

The following snippet uses SQL expressions to create a table from a CSV file.

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

t_env.execute_sql("""
CREATE TABLE src_table_from_csv (
    txn_id INT,
    acc_id INT,
    txn_value INT,
    txn_date STRING
) WITH (
    'connector' = 'filesystem', (1)
    'format' = 'csv',
    'path' = 'test.csv',
    'csv.field-delimiter' = ','
)
""")

table = t_env.from_path("src_table_from_csv")
table.execute().print()
1 Uses the built-in connector for working with files.
Sample output
+----+-------------+-------------+-------------+--------------------------------+
| op |      txn_id |      acc_id |   txn_value |                       txn_date |
+----+-------------+-------------+-------------+--------------------------------+
| +I |           1 |        1001 |          20 |                     2025-01-02 |
| +I |           2 |        1002 |         110 |                     2025-01-01 |
| +I |           3 |        1003 |          23 |                     2025-01-01 |
| +I |           4 |        1002 |          50 |                     2025-01-03 |
| +I |           5 |        1001 |          78 |                     2025-01-02 |
+----+-------------+-------------+-------------+--------------------------------+
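
The snippet above expects a test.csv file in the working directory. Below is a minimal sketch that generates a file matching the sample output using only the Python standard library. The header-less layout is an assumption based on the table definition, which declares no option for skipping a header line.

```python
import csv
from pathlib import Path

# Rows taken from the sample output above
rows = [
    (1, 1001, 20, "2025-01-02"),
    (2, 1002, 110, "2025-01-01"),
    (3, 1003, 23, "2025-01-01"),
    (4, 1002, 50, "2025-01-03"),
    (5, 1001, 78, "2025-01-02"),
]

csv_path = Path("test.csv")
with csv_path.open("w", newline="") as f:
    # One record per line, comma-delimited, no header
    writer = csv.writer(f, delimiter=",", lineterminator="\n")
    writer.writerows(rows)

print(csv_path.read_text())
```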

For information on other ways of creating tables using Table API, see Flink documentation.

Queries and table operations

The Table object provides numerous methods for executing relational operations. The up-to-date API reference is available in the Flink documentation. The following snippet covers major operations.

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.expressions import col, call

env_settings = EnvironmentSettings.in_batch_mode() (1)
t_env = TableEnvironment.create(env_settings)

txn_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-01"),
    (3, 1002, 70.00, "2025-01-01"),
    (4, 1001, 40.00, "2025-01-02"),
    (5, 1003, 50.00, "2025-01-01"),
    (6, 1001, 10.00, "2025-01-02"),
]

schema = DataTypes.ROW([
    DataTypes.FIELD("txn_id", DataTypes.INT()),
    DataTypes.FIELD("acc_id", DataTypes.INT()),
    DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
    DataTypes.FIELD("txn_date", DataTypes.STRING())
])

t_txns = t_env.from_elements(txn_data, schema)
print("Added table column:")
t_with_net_value = t_txns.add_columns( (2)
    (col("txn_value") * 0.87).alias("net_value")
)
t_with_net_value.execute().print()

t_result = t_txns \ (3)
    .filter(col("txn_date") != "2025-01-02") \
    .group_by(col("acc_id")) \
    .select(
    col("acc_id"),
    call("sum", col("txn_value")).alias("total_txn_value")
)
print("Filtered, grouped, and aggregated: ")
t_result.execute().print()

invoice_data = [
    (1, 1, True),
    (2, 4, False),
    (3, 3, True),
    (4, 2, True),
    (5, 6, False),
    (5, 5, True),
]
schema_i = DataTypes.ROW([
    DataTypes.FIELD("invoice_id", DataTypes.INT()),
    DataTypes.FIELD("tx_id", DataTypes.INT()),
    DataTypes.FIELD("is_confirmed", DataTypes.BOOLEAN()),
])

t_invoices = t_env.from_elements(invoice_data, schema_i)
t_joined = t_invoices.join(t_txns).where(col("tx_id") == col("txn_id")) \ (4)
    .select(col("txn_id"),
            col("acc_id"),
            col("invoice_id"),
            col("is_confirmed"))
print("JOINed tables:")
t_joined.execute().print()
1 Sets the execution mode to batch.
2 Adds a column to the table.
3 Applies a chain of operators and aggregation functions, like filter(), group_by(), sum().
4 Applies a Join operation.
Sample output
Added table column:
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
|      txn_id |      acc_id |                      txn_value |                       txn_date |                      net_value |
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
|           1 |        1001 |                          100.0 |                     2025-01-01 |                           87.0 |
|           2 |        1002 |                           20.0 |                     2025-01-01 |                           17.4 |
|           3 |        1002 |                           70.0 |                     2025-01-01 |                           60.9 |
|           4 |        1001 |                           40.0 |                     2025-01-02 |                           34.8 |
|           5 |        1003 |                           50.0 |                     2025-01-01 |                           43.5 |
|           6 |        1001 |                           10.0 |                     2025-01-02 |                            8.7 |
+-------------+-------------+--------------------------------+--------------------------------+--------------------------------+
6 rows in set
Filtered, grouped, and aggregated:
+-------------+--------------------------------+
|      acc_id |                total_txn_value |
+-------------+--------------------------------+
|        1003 |                           50.0 |
|        1002 |                           90.0 |
|        1001 |                          100.0 |
+-------------+--------------------------------+
3 rows in set
JOINed tables:
+-------------+-------------+-------------+--------------+
|      txn_id |      acc_id |  invoice_id | is_confirmed |
+-------------+-------------+-------------+--------------+
|           5 |        1003 |           5 |         TRUE |
|           2 |        1002 |           4 |         TRUE |
|           3 |        1002 |           3 |         TRUE |
|           6 |        1001 |           5 |        FALSE |
|           1 |        1001 |           1 |         TRUE |
|           4 |        1001 |           2 |        FALSE |
+-------------+-------------+-------------+--------------+
6 rows in set

Alternatively, you can run table operations using SQL, for example:

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_env.execute_sql("""
    CREATE TEMPORARY TABLE t_txns (
        txn_id INT,
        acc_id INT,
        txn_value DOUBLE,
        txn_date STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'test.csv',
        'format' = 'csv'
    )
""")

table_env.execute_sql("""
    CREATE TABLE t_sink_print (
        acc_id INT,
        max_txn_value DOUBLE
    ) WITH (
        'connector' = 'print'
    )
""")

table_env.execute_sql("""
    INSERT INTO t_sink_print
    SELECT acc_id, MAX(txn_value) AS max_txn_value
    FROM t_txns
    GROUP BY acc_id
""").wait()
Sample output
15> +I[1002, 110.0]
3> +I[1001, 20.0]
2> +I[1003, 23.0]
3> -U[1001, 20.0]
3> +U[1001, 78.0]
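
The -U/+U pair in the output appears because a streaming aggregation revises its result whenever a new row changes the maximum for a key. The following pure-Python sketch emulates this behavior on the rows of test.csv. It illustrates the changelog semantics rather than Flink's implementation, and the max_changelog helper is hypothetical.

```python
def max_changelog(records):
    """Yield changelog entries for MAX(txn_value) grouped by acc_id."""
    current_max = {}
    for acc_id, txn_value in records:
        if acc_id not in current_max:
            # First value for this key: emit an insert
            current_max[acc_id] = txn_value
            yield ("+I", acc_id, txn_value)
        elif txn_value > current_max[acc_id]:
            # The maximum changed: retract the old result, then emit the new one
            yield ("-U", acc_id, current_max[acc_id])
            current_max[acc_id] = txn_value
            yield ("+U", acc_id, txn_value)


# (acc_id, txn_value) pairs from test.csv
records = [(1001, 20.0), (1002, 110.0), (1003, 23.0), (1002, 50.0), (1001, 78.0)]
changelog = list(max_changelog(records))
for op, acc_id, value in changelog:
    print(f"{op}[{acc_id}, {value}]")
```

The sketch emits the same five entries as the sample output. In a real job, the order of entries for different keys may vary because they are produced by parallel subtasks.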

Write data to a sink table

The following example creates sink tables that store data in external locations:

from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes,
                           TableDescriptor, Schema, FormatDescriptor)

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("pipeline.jars",
                       "file:///home/konstantin/flink-sql-connector-kafka-3.3.0-1.19.jar")
mock_data = [
    (1, 1001, 50.75, "2025-01-01"),
    (2, 1002, 70.00, "2025-01-02"),
    (3, 1001, 20.25, "2025-01-02")
]
t_txns = t_env.from_elements(
    elements=mock_data,
    schema=DataTypes.ROW([
        DataTypes.FIELD("txn_id", DataTypes.INT()),
        DataTypes.FIELD("acc_id", DataTypes.INT()),
        DataTypes.FIELD("txn_value", DataTypes.DOUBLE()),
        DataTypes.FIELD("txn_date", DataTypes.STRING())
    ])
)
t_env.create_temporary_view("txn_table", t_txns) (1)

t_env.execute_sql(""" (2)
    CREATE TABLE sink_table (
        txn_id INT,
        acc_id INT,
        txn_value DOUBLE,
        txn_date STRING
    ) WITH (
        'connector' = 'print' (3)
    )
""")

t_env.execute_sql(""" (4)
    INSERT INTO sink_table
    SELECT * FROM txn_table
    WHERE `acc_id` = 1002
""").wait()

t_env.create_temporary_table( (5)
    'kafka_sink',
    TableDescriptor.for_connector('kafka')
    .schema(Schema.new_builder()
            .column('txn_id', DataTypes.INT())
            .column('acc_id', DataTypes.INT())
            .column('txn_value', DataTypes.DOUBLE())
            .column('txn_date', DataTypes.STRING())
            .build())
    .option('topic', 'transactions_test')
    .option('properties.bootstrap.servers', 'localhost:9092')
    .format(FormatDescriptor.for_format('json')
            .build())
    .build())

t_env.execute_sql("""
    INSERT INTO kafka_sink
    SELECT * FROM txn_table
    WHERE `acc_id` = 1002
""").wait()
1 Creates a temporary view from a table.
2 Creates a sink table.
3 Forwards all table data to STDOUT.
4 Runs the INSERT query, which triggers writing data to the sink. Since the INSERT operation runs asynchronously in a separate thread, wait() blocks until the job finishes, so that the results reach STDOUT before the script continues.
5 Creates another sink table, which uses a Kafka connector to write table data to the Kafka topic.

DataStream API

PyFlink DataStream API provides lower-level control over Flink features, like state management, processing time, checkpoints, recovery, etc.

The central API object is a DataStream, which can be thought of as an immutable collection that travels through transformation stages. A typical application comprises the following steps:

  1. Create an execution environment.

  2. Read data from a source and produce one or more DataStream objects.

  3. Direct the DataStream(s) through one or more transformation operators. Each operator consumes an input DataStream and produces a new one with modified data.

  4. Save the DataStream content to an external location using sinks.

Create execution environment

The entry point of a DataStream API application is the execution environment object (StreamExecutionEnvironment). The following example shows how to instantiate one.

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

menv = StreamExecutionEnvironment.get_execution_environment() (1)
menv.set_parallelism(1) (2)
menv.set_runtime_mode(RuntimeExecutionMode.STREAMING)

# data ingest, transformation, and sink logic

menv.execute() (3)
1 Creates a StreamExecutionEnvironment object.
2 Sets configuration properties like parallelism, retries, serializer options, and so on.
3 Triggers the job execution. You can also use execute_async() to run a job asynchronously and retrieve job results later.

Once the environment object is created, you can use it to set job properties, define sources, sinks, operators, and trigger job execution.

Ingest data from a source

Using StreamExecutionEnvironment, you can define Flink sources to ingest data into the Flink app. Several ways of defining sources are shown below.

  • From a collection

  • From a Kafka topic

  • From a file

The following example creates a DataStream using an in-memory collection as a source.

from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
config.set_string("execution.runtime-mode", "STREAMING")

# Pass the configuration to the environment; otherwise, it has no effect
menv = StreamExecutionEnvironment.get_execution_environment(config)
menv.set_parallelism(1)

mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
       ["txn_id", "acc_id", "txn_value", "txn_date"],
       [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
mds.print()

menv.execute()
Sample output
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,1003,70.0,2025-01-03]
TIP

Tokens like +I, +U, -U, and -D indicate changelog row operations, namely:

  • +I — a new row was inserted.

  • +U — the new version of an updated row (to be inserted).

  • -U — the previous version of an updated row (to be removed).

  • -D — a row was deleted.
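
To see how these tokens fit together, the following pure-Python sketch replays a sequence of changelog entries into the current set of rows. It is an illustration only: the apply_changelog helper is hypothetical, and using the first column as the row key is an assumption made for this example.

```python
def apply_changelog(entries):
    """Materialize changelog entries into a dict keyed by the first column."""
    table = {}
    for op, row in entries:
        key = row[0]
        if op in ("+I", "+U"):
            table[key] = row      # insert a new row or the updated version
        elif op in ("-U", "-D"):
            table.pop(key, None)  # retract the old version or delete the row
        else:
            raise ValueError(f"Unknown changelog token: {op}")
    return table


entries = [
    ("+I", (1001, 20.0)),
    ("+I", (1002, 110.0)),
    ("-U", (1001, 20.0)),
    ("+U", (1001, 78.0)),
    ("-D", (1002, 110.0)),
]
result = apply_changelog(entries)
print(result)
```

After all entries are applied, only the updated row for key 1001 remains.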

The following snippet creates a Kafka source by using a connector for Kafka.

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

menv = StreamExecutionEnvironment.get_execution_environment()
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)

source_kafka = KafkaSource.builder() \ (2)
    .set_bootstrap_servers("<host>:<port>") \
    .set_topics("m_topic") \
    .set_group_id("my_group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
mds = menv.from_source(source_kafka, (3)
                      WatermarkStrategy.for_monotonous_timestamps(),
                      "Test Kafka source")

mds.print()  # print the records; a topology that has only a source cannot be executed
menv.execute()
1 Adds the Kafka connector JAR to the PyFlink job context. To get an up-to-date Kafka connector version, see Flink documentation.
2 Creates a Kafka source by connecting to a Kafka broker.
3 Creates a DataStream from the Kafka source.

The following snippet creates a DataStream from a CSV file, treating each line as a stream record.

from pyflink.common import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat

menv = StreamExecutionEnvironment.get_execution_environment()
file_source = FileSource.for_record_stream_format(
    StreamFormat.text_line_format(), (1)
    "test.csv"
).build()
wm_strategy = WatermarkStrategy.for_monotonous_timestamps()

ds = menv.from_source(source=file_source, (2)
                      watermark_strategy=wm_strategy,
                      source_name="test_file_source")
ds.print()
menv.execute()
1 Uses a built-in StreamFormat, which tells Flink to read files line by line, treating each line as a single record.
2 Creates a DataStream from the file source. The built-in watermark strategy used here assumes that DataStream records (lines) never arrive out of order.
Sample output
10> 1,1001,20,2025-01-02
10> 2,1002,110,2025-01-01
10> 3,1003,23,2025-01-01
10> 4,1002,50,2025-01-03
10> 5,1001,78,2025-01-02
TIP
The 10> prefix identifies the parallel subtask that printed the record.
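
Since the text line format yields plain strings, a job typically parses each line into fields in a subsequent step. Below is a minimal sketch of such a parsing function in plain Python. The parse_txn_line name is hypothetical, and wiring it into the job (for example, through map() with a matching output type) is left out here.

```python
def parse_txn_line(line):
    """Split a CSV line into (txn_id, acc_id, txn_value, txn_date)."""
    txn_id, acc_id, txn_value, txn_date = line.strip().split(",")
    # The first three fields are integers in test.csv; the date stays a string
    return int(txn_id), int(acc_id), int(txn_value), txn_date


parsed = parse_txn_line("1,1001,20,2025-01-02")
print(parsed)
```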

Transformation operators

PyFlink provides numerous operators like map(), filter(), reduce(), etc. Each operator consumes a DataStream, applies transformations on the data, and emits a new DataStream object. Basic operations are covered below:

from pyflink.common import Types, Row
from pyflink.datastream import StreamExecutionEnvironment, MapFunction

menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
ds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
    ["txn_id", "acc_id", "txn_value", "txn_date"],
    [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Original DataStream:")
ds.print()

ds_filtered = ds.filter(lambda txn: txn["txn_value"] > 50.0) (1)
print("Filtered DataStream:")
ds_filtered.print()

ds_mapped = ds.map( (2)
    lambda txn: Row(
        txn["txn_id"],
        9999 if txn["acc_id"] == 1003 else txn["acc_id"],
        txn["txn_value"],
        txn["txn_date"]
    ), output_type=Types.ROW_NAMED(
        ["txn_id", "acc_id", "txn_value", "txn_date"],
        [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
print("Lambda-mapped DataStream:")
ds_mapped.print()

class UpdateAccID(MapFunction): (3)
    def map(self, value):
        txn_id, acc_id, txn_value, txn_date = value
        if acc_id < 2000:
            acc_id = acc_id * 2
        return (Row(txn_id, acc_id, txn_value, txn_date))

ds_mapped_class = ds.map(UpdateAccID(), (4)
                         output_type=Types.ROW_NAMED(
                             ["txn_id", "acc_id", "txn_value", "txn_date"],
                             [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]
                         ))
print("MapFunction-mapped DataStream:")
ds_mapped_class.print()

menv.execute()
1 Applies the filter() transformation on a DataStream.
2 Applies the map() transformation using a lambda function.
3 A custom MapFunction implementation to be used by the map operator. The map() method is called for each element of the DataStream.
4 Applies the map() transformation using a custom MapFunction implementation.
Sample output
Original DataStream:
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,1003,70.0,2025-01-03]

Filtered DataStream:
+I[1,1001,100.0,2025-01-01]
+I[3,1003,70.0,2025-01-03]

Lambda-mapped DataStream:
+I[1,1001,100.0,2025-01-01]
+I[2,1002,20.0,2025-01-02]
+I[3,9999,70.0,2025-01-03]

MapFunction-mapped DataStream:
+I[1,2002,100.0,2025-01-01]
+I[2,2004,20.0,2025-01-02]
+I[3,2006,70.0,2025-01-03]
NOTE
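In an actual run, the output is ordered differently. The Python print() calls execute while the job is being defined, so the headers appear before any records. The records themselves may be interleaved, since every DataStream operator prints to STDOUT from its own thread.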

Write data to a sink

After the computation steps are complete, you can direct the results to an external location using sinks.

  • Write to a file

  • Write to an ORC file

  • Write to a Kafka topic

The following snippet writes the DataStream content to a file sink using the FileSystem connector.

from pyflink.common import Types, Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy

OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink"
menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
    ["txn_id", "acc_id", "txn_value", "txn_date"],
    [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))
fs_sink = FileSink.for_row_format(OUTPUT_PATH, (1)
                                  Encoder.simple_string_encoder("UTF-8")) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy( (2)
        part_size=1024 ** 3,  # 1 GiB, in bytes
        rollover_interval=15 * 60 * 1000,  # 15 minutes, in milliseconds
        inactivity_interval=5 * 60 * 1000)) \
    .build()

mds.sink_to(fs_sink) (3)
menv.execute()
1 Creates a FileSink sink using the row-encoded format.
2 Sets a rolling policy.
3 Attaches the sink to the DataStream. The records are written to the sink when the job runs.
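
The three rolling policy parameters define when the in-progress part file is closed and a new one is started. The following pure-Python sketch approximates that decision using the values from the snippet above. It is a simplified illustration, not Flink's implementation: the should_roll helper is hypothetical, and the exact boundary conditions in Flink may differ.

```python
PART_SIZE = 1024 ** 3                # 1 GiB, in bytes
ROLLOVER_INTERVAL = 15 * 60 * 1000   # 15 minutes, in milliseconds
INACTIVITY_INTERVAL = 5 * 60 * 1000  # 5 minutes, in milliseconds


def should_roll(size_bytes, created_ms, last_write_ms, now_ms):
    """Return True if the in-progress part file should be closed."""
    too_big = size_bytes >= PART_SIZE
    too_old = now_ms - created_ms >= ROLLOVER_INTERVAL
    idle = now_ms - last_write_ms >= INACTIVITY_INTERVAL
    return too_big or too_old or idle


minute = 60 * 1000
# Small file, 1 minute old, written to 1 minute ago: keep writing
print(should_roll(10, 0, 0, 1 * minute))
# File is 16 minutes old, although it was written to recently: roll
print(should_roll(10, 0, 14 * minute, 16 * minute))
# No writes for 6 minutes: roll
print(should_roll(10, 0, 0, 6 * minute))
```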

This Flink job creates a directory specified by the OUTPUT_PATH location, for example:

/home/konstantin/pyflink_demo/fs_sink
└─ /2025-04-08--13
    └─ part-4a8b2a20-187d-4ad6-ae7a-8f0e6dc880c5-0

Every DataStream record is written to a target file as a new line:

+I[1, 1001, 100.0, 2025-01-01]
+I[2, 1002, 20.0, 2025-01-02]
+I[3, 1003, 70.0, 2025-01-03]

To write binary data (ORC, Parquet, Avro, etc.) to a file, use bulk-encoded formats. A sample job that writes ORC bytes using FileSink is below:

from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.datastream.formats.orc import OrcBulkWriters
from pyflink.table import DataTypes

menv = StreamExecutionEnvironment.get_execution_environment()
menv.set_parallelism(1)
menv.add_jars("file:///home/konstantin/pyflink_demo/flink-sql-orc-1.19.2.jar") (1)
OUTPUT_PATH = "file:///home/konstantin/pyflink_demo/fs_sink_orc"
mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data,
                           type_info=Types.ROW_NAMED(
                               ["txn_id", "acc_id", "txn_value", "txn_date"],
                               [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))

row_type = DataTypes.ROW([
    DataTypes.FIELD('txn_id', DataTypes.INT()),
    DataTypes.FIELD('acc_id', DataTypes.INT()),
    DataTypes.FIELD('txn_value', DataTypes.DOUBLE()),
    DataTypes.FIELD('txn_date', DataTypes.STRING()),
])

file_sink_orc = FileSink.for_bulk_format(
    OUTPUT_PATH,
    OrcBulkWriters.for_row_type( (2)
        row_type=row_type,
        writer_properties=Configuration(),
        hadoop_config=Configuration(),
    )
).build()

mds.sink_to(file_sink_orc)
menv.execute()
1 Adds a dependency JAR to work with ORC.
2 Uses an ORC writer with FileSink.

The following snippet forwards the content of a DataStream to a Kafka topic using the Kafka sink object. The connection to the server is provided by the Kafka connector.

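from pyflink.common import Types, SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

menv = StreamExecutionEnvironment.get_execution_environment()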
menv.set_parallelism(1)
menv.add_jars("file:///D:/dev/py/pyflink/flink-sql-connector-kafka-3.3.0-1.19.jar") (1)
mock_data = [
    (1, 1001, 100.00, "2025-01-01"),
    (2, 1002, 20.00, "2025-01-02"),
    (3, 1003, 70.00, "2025-01-03"),
]
mds = menv.from_collection(mock_data, type_info=Types.ROW_NAMED(
       ["txn_id", "acc_id", "txn_value", "txn_date"],
       [Types.INT(), Types.INT(), Types.DOUBLE(), Types.STRING()]))

kafka_sink = KafkaSink.builder() \ (2)
    .set_bootstrap_servers("<host>:<port>") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("m_topic")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

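# SimpleStringSchema serializes strings, so convert each record to a string first
mds.map(lambda txn: str(txn), output_type=Types.STRING()) \
    .sink_to(kafka_sink) (3)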
menv.execute()
1 Adds the Kafka connector JAR to the PyFlink job context. To get an up-to-date Kafka connector version, see Flink documentation.
2 Creates a Kafka sink object that connects to a Kafka broker using the "<host>:<port>" values.
3 Writes DataStream items to the Kafka sink.
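
SimpleStringSchema writes string values, so structured records are usually converted to strings before they reach the sink. Below is a minimal sketch of such a conversion that produces JSON. The txn_to_json helper is hypothetical; applying it in a job, for example through map() with output_type=Types.STRING(), is an assumption about how you would wire it in.

```python
import json

FIELD_NAMES = ["txn_id", "acc_id", "txn_value", "txn_date"]


def txn_to_json(txn):
    """Serialize a transaction record (any 4-element sequence) to a JSON string."""
    return json.dumps(dict(zip(FIELD_NAMES, txn)))


payload = txn_to_json((1, 1001, 100.00, "2025-01-01"))
print(payload)
```

Messages produced this way carry field names, so they can be read back by a JSON-based consumer.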

Dependency management

PyFlink jobs may require third-party dependencies, for example, Python libraries, connector JARs, ML frameworks, etc. The way to add them differs slightly between the Table API and the DataStream API, as shown below.

JAR dependencies

  • Table API

  • DataStream API

  • As a CLI argument

Table API:

table_env.get_config().set("pipeline.jars", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
table_env.get_config().set("pipeline.classpaths", "file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")

DataStream API:

stream_ex_env.add_jars("file:///path/to/myJar1.jar;file:///path/to/myJar2.jar")
stream_ex_env.add_classpaths("file:///path/to/myJar1.jar", "file:///path/to/myJar2.jar")

As a CLI argument:

$ flink run \
      --python test_flink_job.py \
      --jarfile flink-sql-connector-kafka-3.3.0-1.19.jar
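
The pipeline.jars value is a semicolon-separated list of file:// URLs. If your JARs are given as local paths, a small helper can build that string. The sketch below uses only the Python standard library; the to_pipeline_jars name is hypothetical.

```python
from pathlib import Path


def to_pipeline_jars(*jar_paths):
    """Join JAR paths into a semicolon-separated list of file:// URIs."""
    # resolve() makes each path absolute, which as_uri() requires
    return ";".join(Path(p).resolve().as_uri() for p in jar_paths)


jars = to_pipeline_jars("/path/to/myJar1.jar", "/path/to/myJar2.jar")
print(jars)
```

The resulting string can then be passed as the value of the pipeline.jars property.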

Python dependencies

  • Table API

  • DataStream API

  • As a CLI argument

Table API:

table_env.add_python_file("path/to/myScript.py")

DataStream API:

stream_execution_environment.add_python_file("path/to/myScript.py")

As a CLI argument:

$ flink run \
      --python test_flink_job.py \
      --pyFiles test.py