Flink performance tuning
This article describes optimization techniques and best practices that can help you improve the performance of your Flink service.
Memory
Allocating enough memory to the Flink TaskManager and JobManager components is vital for smooth Flink operation. This includes not only the JVM heap but also metaspace and off-heap memory. You can control the memory settings using taskmanager.memory.* and jobmanager.memory.* properties in ADCM (Clusters → <cluster_name> → Services → Flink → Primary Configuration → flink-conf.yaml).
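As a rough illustration of how the heap, metaspace, and off-heap parts add up, the sketch below splits a total TaskManager process size into components. The fractions and fixed sizes are assumptions based on upstream Flink defaults (for example, 256 MiB of metaspace and a 0.4 managed memory fraction) and may differ in your cluster, so check them against your flink-conf.yaml. The function name is hypothetical.

```python
def taskmanager_memory_breakdown(process_mb: float) -> dict:
    """Split a total TaskManager process size (MiB) into memory components.

    All constants are assumed upstream defaults, not values read from a cluster.
    """
    metaspace = 256.0                                      # JVM metaspace (assumed default)
    overhead = min(max(process_mb * 0.1, 192.0), 1024.0)   # JVM overhead: 10%, clamped to 192..1024 MiB
    total_flink = process_mb - metaspace - overhead        # what is left for Flink itself
    framework_heap = 128.0                                 # assumed default
    framework_off_heap = 128.0                             # assumed default
    managed = total_flink * 0.4                            # off-heap managed memory (used by RocksDB, sorting)
    network = max(total_flink * 0.1, 64.0)                 # off-heap network buffers, at least 64 MiB
    task_heap = total_flink - framework_heap - framework_off_heap - managed - network
    if task_heap <= 0:
        raise ValueError("process size is too small for the assumed defaults")
    return {
        "jvm_metaspace": metaspace,
        "jvm_overhead": overhead,
        "framework_heap": framework_heap,
        "framework_off_heap": framework_off_heap,
        "managed": managed,
        "network": network,
        "task_heap": task_heap,
    }


breakdown = taskmanager_memory_breakdown(1728)
for name, size in breakdown.items():
    print(f"{name}: {size:.0f} MiB")
```

Under these assumptions, only a fraction of the process size ends up as task heap, which is why raising the heap alone is rarely the right fix for memory pressure.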
Parallelism
Using appropriate parallelism configuration is crucial for even distribution of tasks across the Flink cluster. The major parameters for tuning Flink’s parallelism are described below. You can set these parameters in the flink-conf.yaml settings in ADCM. For information on how to set parallelism programmatically on operator/environment/client levels, see Flink documentation.
Configuration property | Description | Default value |
---|---|---|
parallelism.default | Sets the number of parallel subtasks that each operator can run by default. If an operator or a job does not have its own parallelism factor set, Flink uses this value | 1 |
taskmanager.numberOfTaskSlots | The number of parallel operators or user function instances that a single TaskManager can run. Setting a value of more than | 1 |
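The relation between the two properties can be sketched as simple arithmetic. The example below assumes the default slot sharing behavior, where a job needs as many slots as its highest operator parallelism. The function name is hypothetical.

```python
import math


def required_task_managers(job_parallelism: int, slots_per_tm: int) -> int:
    """Number of TaskManagers needed to provide one slot per parallel pipeline.

    job_parallelism corresponds to the highest operator parallelism in the job;
    slots_per_tm corresponds to taskmanager.numberOfTaskSlots.
    """
    if job_parallelism < 1 or slots_per_tm < 1:
        raise ValueError("both arguments must be positive")
    return math.ceil(job_parallelism / slots_per_tm)


# A job with parallelism 10 on TaskManagers that offer 4 slots each
print(required_task_managers(10, 4))
```

If the cluster offers fewer slots than the job requires, the job cannot be fully scheduled, so parallelism and slot counts should be planned together.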
Checkpoints and savepoints
In Flink, checkpointing is a fault tolerance mechanism that periodically persists the state of an application to a durable location (for example, HDFS) so that the application can recover after a failure. Checkpointing is disabled by default.
The major parameters affecting performance are the checkpointing interval and the timeout. They can be configured using the following configuration properties.
Configuration property | Description | Default value |
---|---|---|
execution.checkpointing.interval | The interval at which checkpoints are periodically scheduled | none |
execution.checkpointing.timeout | The maximum time for a checkpointing operation to complete before it is discarded | 10 min |
These and other checkpointing parameters can be set in the Custom flink-conf.yaml ADCM section. The following example shows checkpointing configuration in a Flink app code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); // (1)
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // (2)
config.setCheckpointTimeout(60000); // discard a checkpoint that takes longer than 60000 milliseconds
config.setMaxConcurrentCheckpoints(1); // (3)
1 | Enables checkpoints every 1000 milliseconds. |
2 | Sets the EXACTLY_ONCE checkpointing mode. |
3 | Defines the maximum number of concurrent checkpoints. |
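To reason about how the interval and timeout interact, a rough estimate can help. The sketch below is not part of Flink; it assumes at most one concurrent checkpoint, no minimum pause between checkpoints, and a checkpoint duration that you have observed in your own job. The function and key names are hypothetical.

```python
def checkpoint_timing(interval_ms: int, timeout_ms: int, observed_duration_ms: int) -> dict:
    """Rough estimate of checkpointing behavior for one concurrent checkpoint."""
    if min(interval_ms, timeout_ms, observed_duration_ms) <= 0:
        raise ValueError("all arguments must be positive")
    # A new checkpoint cannot start before the previous one has finished.
    effective_period_ms = max(interval_ms, observed_duration_ms)
    return {
        # The checkpoint is discarded if it runs longer than the timeout.
        "will_time_out": observed_duration_ms > timeout_ms,
        # Share of wall-clock time during which a checkpoint is in progress.
        "busy_share": min(observed_duration_ms / interval_ms, 1.0),
        # Data to reprocess if the job fails just before a checkpoint completes.
        "worst_case_replay_ms": effective_period_ms + observed_duration_ms,
    }


print(checkpoint_timing(interval_ms=1000, timeout_ms=60000, observed_duration_ms=400))
```

A short interval reduces the amount of data to replay after a failure but increases the share of time spent on checkpointing, so the two values are usually tuned together.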
Flink savepoints are a similar mechanism for creating snapshots of an application’s state and resuming the application execution later from that point. Unlike checkpoints, which are automatically created at regular intervals, savepoints are explicitly triggered by the user. Checkpoints are used to ensure fault tolerance, whereas savepoints best fit job upgrades, migrations, and manual backups. Savepoints are more flexible and allow you to configure the size, frequency, retention, and other characteristics that best fit your ETL pipelines.
State management
Using a suitable state backend may significantly improve your Flink performance depending on the load type. Flink comes with several state backends (a custom implementation is also allowed):
- HashMapStateBackend. The default implementation that stores state data internally in a hash map. Being an in-memory storage, this is a very fast state management mechanism; however, the maximum state data size is limited by the amount of memory available on Flink cluster nodes. This backend fits applications with small or medium-sized state.
- EmbeddedRocksDBStateBackend. Stores state data in a RocksDB database, which involves de/serialization and I/O operations. This state backend is well suited for jobs with large state data. The amount of state data that the backend can handle is limited only by the amount of disk space. For large-scale applications, consider using EmbeddedRocksDBStateBackend or other implementations.
A state backend can be set globally or on a per-job basis.
To specify a default backend for the Flink service, use the property state.backend.type in Custom flink-conf.yaml.
The possible property values are hashmap, rocksdb, or a class name implementing StateBackendFactory.
A way to override the default state backend for a job is shown below:
// Java
Configuration conf = new Configuration();
conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
env.configure(conf);

// Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new HashMapStateBackend())

# Python
conf = Configuration()
conf.set_string('state.backend.type', 'hashmap')
env = StreamExecutionEnvironment.get_execution_environment(conf)
Optimize aggregations
Like most data systems, Flink supports aggregate functions such as SUM, COUNT, DISTINCT, etc.
For intensive data streams, using aggregate functions may become a bottleneck in your ETL pipelines.
Below are some tips on optimizing the aggregate functions' behavior.
MiniBatch aggregation
By default, group aggregation operators process input records sequentially one-by-one. This approach tends to increase the load on the state backend system (especially if RocksDB is used). Moreover, inevitable data skews in production can worsen the problem, leading to frequent backpressure cases.
The main idea of the mini-batch aggregation is caching a bunch of input records in a buffer by an aggregation operator. When a batch of input records is processed, Flink needs only one operation per key to access the state data. This reduces the state data overhead and ensures a better throughput. However, this design assumes some latency since incoming records are buffered rather than being processed immediately. Enabling mini-batch processing offers a trade-off between stable throughput and latency.
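The effect on state access can be illustrated with a small simulation. This is a simplified model, not Flink's implementation: it buffers records by count only (similar in spirit to table.exec.mini-batch.size) and ignores the latency trigger. All names are hypothetical.

```python
from collections import Counter


def count_per_record(keys):
    """Update the state once for every incoming record."""
    state = Counter()
    accesses = 0
    for key in keys:
        state[key] += 1   # one state access per record
        accesses += 1
    return state, accesses


def count_with_mini_batch(keys, batch_size):
    """Buffer records in memory and update the state once per key per batch."""
    state = Counter()
    accesses = 0
    for start in range(0, len(keys), batch_size):
        buffered = Counter(keys[start:start + batch_size])  # in-memory buffer
        for key, partial_count in buffered.items():
            state[key] += partial_count   # one state access per key per batch
            accesses += 1
    return state, accesses


keys = ["a"] * 6 + ["b"] * 2
print(count_per_record(keys)[1])          # state accesses without mini-batch
print(count_with_mini_batch(keys, 4)[1])  # state accesses with batches of 4 records
```

Both functions produce the same counts, but the batched version touches the state fewer times when keys repeat within a batch, which is where the throughput gain comes from.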
The diagram below illustrates the mini-batch aggregation approach versus the traditional Flink aggregation flow.
MiniBatch optimization is disabled by default. To enable this optimization, use the following properties in the Custom flink-conf.yaml ADCM section:
Configuration property | Description | Default value |
---|---|---|
table.exec.mini-batch.enabled | Enables or disables the mini-batch optimization to buffer input records. Flink forms batches either at regular intervals or when the maximum number of buffered records is reached | false |
table.exec.mini-batch.allow-latency | The maximum latency interval to buffer input records. The time unit is specified together with the value, for example, 2 s | none |
table.exec.mini-batch.size | The maximum number of input records that can be buffered in a batch | -1 |
An example of setting these properties programmatically is below:
// Java
Configuration configuration = new Configuration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "2 s");
configuration.setString("table.exec.mini-batch.size", "1000");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.withConfiguration(configuration)
.build();

// Scala
val configuration = new Configuration
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "2 s")
configuration.setString("table.exec.mini-batch.size", "1000")
val settings = EnvironmentSettings.newInstance
.inStreamingMode
.withConfiguration(configuration)
.build
val tEnv: TableEnvironment = TableEnvironment.create(settings)

# Python
configuration = Configuration()
configuration.set_string("table.exec.mini-batch.enabled", "true")
configuration.set_string("table.exec.mini-batch.allow-latency", "2 s")
configuration.set_string("table.exec.mini-batch.size", "1000")
settings = EnvironmentSettings.new_instance() \
.in_streaming_mode() \
.with_configuration(configuration) \
.build()

-- SQL
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';
Local-global aggregation
This optimization is aimed at minimizing the effect of input data skew by dividing the group aggregation into two stages:
- First, data is aggregated locally by the upstream operators, which is similar to the combine stage of MapReduce.
- Then, the global aggregation stage downstream, which is similar to the reduce stage of MapReduce, receives batches with accumulated (reduced) data instead of processing a huge number of raw inputs. Aggregating the batched data results in a more even load on each operator.
Such splitting can significantly reduce the network shuffle and the frequency of state access operations. The number of input records accumulated by local aggregation is based on the mini-batch interval, meaning that this optimization works together with the MiniBatch optimization.
Consider the following SQL:
SELECT acc_id, SUM(txn_id)
FROM transactions
GROUP BY acc_id;
If the records in the input data stream are skewed, some of the SUM operators will have to process many more records than others, which creates an uneven load.
Adding the local aggregation stage allows accumulating batches of records with the same key and updating the related state data with fewer operations.
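The difference in load can be shown with a small simulation of a skewed input. This is a simplified model, not Flink's implementation: each inner list stands for the records buffered by one upstream task, and the load counts how many inputs the global aggregator of each key receives. All names and values are hypothetical.

```python
from collections import Counter


def global_load_without_local(partitions):
    """Every raw record is shuffled to the global aggregator of its key."""
    load = Counter()
    for records in partitions:
        for key, _value in records:
            load[key] += 1
    return load


def global_load_with_local(partitions):
    """Each upstream task pre-aggregates its records and sends one partial sum per key."""
    load = Counter()
    totals = Counter()
    for records in partitions:
        local = Counter()
        for key, value in records:
            local[key] += value          # local (combine-like) aggregation
        for key, partial_sum in local.items():
            load[key] += 1               # one accumulator per key per upstream task
            totals[key] += partial_sum   # global (reduce-like) aggregation
    return load, totals


# Skewed input: most records belong to acc1
partitions = [
    [("acc1", 10), ("acc1", 20), ("acc1", 30), ("acc2", 5)],
    [("acc1", 40), ("acc2", 5)],
]
print(global_load_without_local(partitions))
print(global_load_with_local(partitions))
```

With local aggregation, the global aggregator of the hot key receives one input per upstream task instead of one input per record, while the final sums stay the same.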
The following diagram illustrates the local-global aggregation flow.
To enable this optimization, set the table.optimizer.agg-phase-strategy=TWO_PHASE property in the Custom flink-conf.yaml ADCM section or set it programmatically:
// Java
Configuration configuration = new Configuration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "2 s");
configuration.setString("table.exec.mini-batch.size", "1000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
...

// Scala
val configuration = new Configuration
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "2 s")
configuration.setString("table.exec.mini-batch.size", "1000")
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")
...

# Python
configuration = Configuration()
configuration.set_string("table.exec.mini-batch.enabled", "true")
configuration.set_string("table.exec.mini-batch.allow-latency", "2 s")
configuration.set_string("table.exec.mini-batch.size", "1000")
configuration.set_string("table.optimizer.agg-phase-strategy", "TWO_PHASE")
...

-- SQL
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
Split DISTINCT aggregation
The two-phase local-global aggregation is effective for minimizing data skews for aggregations like SUM, COUNT, MAX, MIN, AVG.
However, it is not so efficient when it comes to DISTINCT.
Consider the following SQL used for getting the number of unique accounts per day:
SELECT txn_date, COUNT(DISTINCT acc_id)
FROM transactions
GROUP BY txn_date;
Enabling the local-global aggregation for this query gives very little gain in performance (especially if the values of acc_id are sparse).
In this case, Flink has to put almost all the input records into the accumulator, and the global aggregation stage will throttle down the overall processing flow.
The solution is to split the DISTINCT aggregation into two phases:
- First, Flink shuffles the aggregation by the group key and additionally by a bucket key. The bucket key is calculated using the formula: HASH_CODE(distinct_key) % BUCKET_NUM. BUCKET_NUM defaults to 1024 and can be changed using the table.optimizer.distinct-agg.split.bucket-num property.
- Then, another aggregation shuffles the data by the original group key, and SUM is used to sum up COUNT(DISTINCT …) values from different buckets. The bucket key in this flow plays the role of an additional group key to reduce the load for a single group.
With this DISTINCT-splitting approach, Flink executes the above SQL query as if it had the following structure:
SELECT txn_date, SUM(cnt)
FROM (
SELECT txn_date, COUNT(DISTINCT acc_id) as cnt
FROM transactions
GROUP BY txn_date, MOD(HASH_CODE(acc_id), 1024)
)
GROUP BY txn_date;
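The reason why summing per-bucket counts gives the correct result is that a given acc_id always falls into the same bucket, so the buckets never share distinct values. The sketch below checks this on a small sample. It is an illustration rather than Flink's implementation: zlib.crc32 is used as a stand-in for HASH_CODE, and the function names and sample rows are hypothetical.

```python
import zlib


def bucket_of(value: str, bucket_num: int) -> int:
    """Deterministic bucket key; crc32 stands in for Flink's HASH_CODE."""
    return zlib.crc32(value.encode("utf-8")) % bucket_num


def count_distinct_direct(rows):
    """Single-phase COUNT(DISTINCT acc_id) grouped by txn_date."""
    seen = {}
    for txn_date, acc_id in rows:
        seen.setdefault(txn_date, set()).add(acc_id)
    return {txn_date: len(ids) for txn_date, ids in seen.items()}


def count_distinct_split(rows, bucket_num=1024):
    """Two-phase variant: distinct count per (txn_date, bucket), then SUM per txn_date."""
    partial = {}
    for txn_date, acc_id in rows:
        key = (txn_date, bucket_of(acc_id, bucket_num))
        partial.setdefault(key, set()).add(acc_id)
    result = {}
    for (txn_date, _bucket), ids in partial.items():
        result[txn_date] = result.get(txn_date, 0) + len(ids)
    return result


rows = [
    ("2024-01-01", "a"), ("2024-01-01", "b"), ("2024-01-01", "a"),
    ("2024-01-01", "c"), ("2024-01-02", "a"), ("2024-01-02", "d"),
]
print(count_distinct_direct(rows))
print(count_distinct_split(rows, bucket_num=4))
```

Both functions return the same counts per date, while the split variant spreads the distinct sets of a hot date across up to bucket_num smaller groups.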
The following diagram illustrates the aggregation splitting process for the sample SQL query above (colors represent the txn_date groups and letters indicate account IDs).
NOTE
This diagram illustrates the simplest example that can benefit from this optimization.
Flink also supports splitting more complex aggregation queries, involving two or more aggregates with different distinct keys, for example, COUNT(DISTINCT k1), SUM(DISTINCT k2).
To enable this optimization, set the table.optimizer.distinct-agg.split.enabled=true property in the Custom flink-conf.yaml ADCM section or specify it in your Flink app:
// Java
TableEnvironment env = ...;
env.getConfig()
.set("table.optimizer.distinct-agg.split.enabled", "true");

// Scala
val tenv: TableEnvironment = ...
tenv.getConfig
.set("table.optimizer.distinct-agg.split.enabled", "true")

# Python
tenv = ...
tenv.get_config().set("table.optimizer.distinct-agg.split.enabled", "true")
Data processing and serialization
Below are some useful tips and recommendations that can help improve Flink’s data processing speed.
- Adjust network buffers. Network buffer settings affect how fast your Flink apps run. The idea is to pick a buffer size that balances memory consumption against processing speed. Tracking how much data flows through these buffers helps you detect and avoid slowdowns in your applications.
- Use windows for managing data streams. Windows split an unbounded stream into finite chunks that can be processed and aggregated independently.
- Optimize de/serialization. Efficient de/serialization is vital for high performance and scalability. Inefficient de/serialization multiplied by the amount of input data may slow down Flink applications, especially with event-intensive data streams. Choose a serialization framework that fits your app, like Avro or Protobuf.
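To get a feeling for why the serialization format matters, the sketch below compares the size of one event encoded as JSON text and as a fixed binary layout. It uses only the Python standard library and is not how Flink, Avro, or Protobuf serialize data; the event fields and function names are hypothetical.

```python
import json
import struct

# Fixed binary layout: account ID (int64), timestamp in ms (int64), amount (float64)
EVENT_LAYOUT = struct.Struct("<qqd")


def encode_json(acc_id: int, ts_ms: int, amount: float) -> bytes:
    """Self-describing text encoding: field names are repeated in every record."""
    return json.dumps({"acc_id": acc_id, "ts_ms": ts_ms, "amount": amount}).encode("utf-8")


def encode_binary(acc_id: int, ts_ms: int, amount: float) -> bytes:
    """Schema-based binary encoding: only the values are written."""
    return EVENT_LAYOUT.pack(acc_id, ts_ms, amount)


def decode_binary(payload: bytes) -> tuple:
    """Restore the (acc_id, ts_ms, amount) tuple from the binary encoding."""
    return EVENT_LAYOUT.unpack(payload)


event = (42, 1700000000000, 12.5)
print(len(encode_json(*event)), "bytes as JSON")
print(len(encode_binary(*event)), "bytes as fixed binary layout")
```

Multiplied by millions of events per second, the per-record difference in size and parsing cost becomes noticeable, which is the motivation for schema-based formats.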