Logging in Flink
The Flink service generates text logs that can help you analyze the causes of errors that occur while working with the service. By default, Flink uses the SLF4J logging facade with Log4j 2 as the logging framework. You can also use any other SLF4J-compliant logging framework, such as Log4j 1 or Logback.
Logs location and format
By default, Flink stores logs in the /var/log/flink directory on ADH hosts where Flink components are installed. An example of the /var/log/flink/ directory contents is below.
$ ls -l /var/log/flink
-rw-rw-r-- 1 admin admin 37329 Mar 13 11:34 flink-admin-client-ka-adh-6.ru-central1.internal.log
-rw-r--r-- 1 flink flink 77921 Mar 13 11:34 flink-flink-standalonesession-0-ka-adh-6.ru-central1.internal.log
-rw-r--r-- 1 flink flink   457 Mar 13 11:07 flink-flink-standalonesession-0-ka-adh-6.ru-central1.internal.out
-rw-r--r-- 1 flink flink 70599 Mar 13 11:34 flink-flink-taskexecutor-0-ka-adh-6.ru-central1.internal.log
-rw-r--r-- 1 flink flink  3084 Mar 13 11:34 flink-flink-taskexecutor-0-ka-adh-6.ru-central1.internal.out
-rw-r--r-- 1 flink flink    73 Mar 13 11:07 flink-jobmanager.out
-rw-r--r-- 1 root  root  38489 Mar 13 11:34 flink-root-client-ka-adh-6.ru-central1.internal.log
-rw-r--r-- 1 flink flink    68 Mar 13 11:07 flink-taskmanager.out
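To follow a log in real time, you can tail the corresponding file. The file name below is the TaskManager log from the listing above:
$ tail -f /var/log/flink/flink-flink-taskexecutor-0-ka-adh-6.ru-central1.internal.log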
Log types
There are several types of logs generated by Flink components. They are described in the table below.
Log name | Attributes | Comment
---|---|---
flink-<user_name>-client-<host_name>.log |  | Flink client logs
flink-flink-standalonesession-<n>-<host_name>.log |  | JobManager (standalone session) logs
flink-flink-taskexecutor-<n>-<host_name>.log |  | TaskExecutor job details
flink-jobmanager.out | — | Flink JobManager’s output
flink-taskmanager.out | — | Flink TaskManager’s output
Configure logging with ADCM
Since Flink uses Log4j, configuring logging comes down to editing Log4j configuration files. The Flink service uses two of them:
- log4j.properties. The default configuration file used by the JobManager/TaskManager components.
- log4j-cli.properties. Used for logging events related to the Flink command-line interface.
These files are located in the /etc/flink/conf directory on the ADH hosts with Flink components installed. However, instead of editing the files manually on each host, ADCM provides a way to update the Log4j configuration on all ADH hosts at once. To do this:
1. In ADCM, go to Clusters → <cluster_name> → Services → Flink → Primary Configuration and enable the Show advanced option.
2. Click log4j.properties/log4j-cli.properties and edit the configuration (see the example after this list). The Log4j configuration specified via the ADCM UI overwrites the contents of the /etc/flink/conf/log4j*.properties files during the service restart. For a detailed reference on supported Log4j properties, see the Log4j 2 documentation.
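For example, to make one Flink package more verbose without changing the root level, you can append a logger definition to the log4j.properties content. This is only an illustrative sketch: the checkpointing logger key and the org.apache.flink.runtime.checkpoint package are placeholders for whatever component you want to trace.
# Hypothetical fragment: raise the log level for a single package only
logger.checkpointing.name = org.apache.flink.runtime.checkpoint
logger.checkpointing.level = DEBUG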
NOTE
Updates to Log4j configuration files do not require a restart of the Flink service. The configuration is scanned periodically (every 30 seconds by default) and can be refreshed on the fly.
Below are ADCM configuration properties of the Flink service related to logging.
Settings section | Configuration property | Default value | Description
---|---|---|---
flink-env.sh | FLINK_LOG_DIR | /var/log/flink | The location of Flink log files
flink-conf.yaml | Logging level | INFO | The root log level
Default log4j.properties
Below is the default log4j.properties file used by Flink. The major properties are annotated to help you get started with the Log4j configuration.
################################################################################
{% include 'apache_license.txt' %}
################################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30 (1)
# This affects logging for both user code and Flink
rootLogger.level = {{ services.flink.config.flink_conf.logging_level }} (2)
rootLogger.appenderRef.file.ref = MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO
{% raw %}
# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file} (3)
appender.main.filePattern = ${sys:log.file}.%i (4)
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n (5)
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy (6)
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy (7)
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10} (8)
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
{% endraw %}
1. An interval in seconds at which this Log4j configuration is scanned for updates.
2. Sets the root log level. The possible severity levels are: ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, and OFF. The default value is taken from the flink-conf.yaml → Logging level property in ADCM.
3. Defines a name for log files. The default pattern generates file names like flink-flink-taskexecutor-<n>-<host_name>.log.
4. Defines a file name pattern for rotated files.
5. Specifies a pattern to convert log events to strings.
6. A rollover policy that rotates log files upon reaching a certain size. When a log file reaches the size limit set by the appender.main.policies.size.size parameter, the appender starts writing to a new log file.
7. Another policy that causes log files to roll over on Flink startup (if a log file is older than the current JVM’s startup time).
8. Sets the maximum number of rotated log files. When it is exceeded, the oldest log files are deleted.
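As an example, here are two of these properties you might adjust to rotate logs more aggressively. The values below are arbitrary illustrations, not recommendations:
# Hypothetical tweak: roll over at 50 MB and keep at most 5 rotated files
appender.main.policies.size.size = 50MB
appender.main.strategy.max = 5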
View logs in Flink UI
You can also view Flink log files generated by TaskManagers/JobManagers in the Flink web UI. To view TaskManager logs, go to the Task Managers page and select the TaskManager you need. Text logs, system output, and other metadata are available on separate tabs.
To view Job Manager logs, go to the Job Manager page and switch to the Logs tab.
Logging in Flink apps
The following snippet shows how to use an SLF4J logger in your Java applications for Flink.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyFlinkApp {

    private static final Logger logger = LoggerFactory.getLogger(MyFlinkApp.class); (1)

    public static void main(String[] args) {
        logger.info("Log INFO event"); (2)

        try {
            logger.debug("In-try with placeholder: {}", "foobar"); (3)
        } catch (Exception e) {
            logger.error("An {} has occurred.", "error", e);
        }
    }
}
1. Creates a logger object. It is recommended to keep the logger object in a private static final field.
2. Logs an arbitrary event.
3. Logs an event using placeholders.
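SLF4J placeholders already defer message formatting, but if computing a log argument is itself expensive, you can additionally guard the call with a level check. Below is a minimal self-contained sketch using the standard SLF4J API; buildExpensiveSummary() is a hypothetical stand-in for a costly computation.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedLogging {

    private static final Logger logger = LoggerFactory.getLogger(GuardedLogging.class);

    public static void main(String[] args) {
        // The guard ensures the expensive argument is built only when DEBUG is enabled
        if (logger.isDebugEnabled()) {
            logger.debug("State summary: {}", buildExpensiveSummary());
        }
    }

    // Hypothetical stand-in for an expensive computation
    private static String buildExpensiveSummary() {
        return String.join(",", "partition-0", "partition-1");
    }
}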
For Python apps, log events can be created as follows:
import logging
import sys  # required because logs are directed to stdout

logging.basicConfig(stream=sys.stdout,
                    level=logging.INFO,
                    format="%(message)s")
logging.info("Log this INFO event")