Логирование в Flink
Сервис Flink генерирует текстовые логи, которые могут быть полезны при анализе причин различных ошибок, возникающих при работе с сервисом. По умолчанию Flink использует фасад протоколирования SLF4J, а также Log4j 2 в качестве фреймворка логирования. Также допускается использование любого SLF4J-совместимого фреймворка, например Log4j 1, Logback и так далее.
Расположение и формат лог-файлов
По умолчанию Flink хранит логи в локальной директории /var/log/flink на хостах ADH, где установлены компоненты Flink. Пример содержимого директории /var/log/flink/ показан ниже.
$ ls -l /var/log/flink
-rw-rw-r-- 1 admin admin 37329 Mar 13 11:34 flink-admin-client-ka-adh-6.ru-central1.internal.log -rw-r--r-- 1 flink flink 77921 Mar 13 11:34 flink-flink-standalonesession-0-ka-adh-6.ru-central1.internal.log -rw-r--r-- 1 flink flink 457 Mar 13 11:07 flink-flink-standalonesession-0-ka-adh-6.ru-central1.internal.out -rw-r--r-- 1 flink flink 70599 Mar 13 11:34 flink-flink-taskexecutor-0-ka-adh-6.ru-central1.internal.log -rw-r--r-- 1 flink flink 3084 Mar 13 11:34 flink-flink-taskexecutor-0-ka-adh-6.ru-central1.internal.out -rw-r--r-- 1 flink flink 73 Mar 13 11:07 flink-jobmanager.out -rw-r--r-- 1 root root 38489 Mar 13 11:34 flink-root-client-ka-adh-6.ru-central1.internal.log -rw-r--r-- 1 flink flink 68 Mar 13 11:07 flink-taskmanager.out
Типы логов Flink
Компоненты Flink генерируют несколько типов лог-файлов, описанных ниже.
Имя лог-файла | Атрибуты | Комментарий |
---|---|---|
flink-<user_name>-client-<host_name>.log |
|
Клиентские логи Flink |
flink-flink-standalonesession-<n>-<host_name>.log |
|
Логи YARN |
flink-flink-taskexecutor-<n>-<host_name>.log |
|
Логи TaskExecutor |
flink-jobmanager.out |
— |
Вывод Flink JobManager |
flink-taskmanager.out |
— |
Вывод Flink TaskManager |
Настройка логирования в ADCM
С использованием Log4j настройка процесса логирования в сервисе Flink сводится к модификации конфигурационных файлов log4j.properties. Сервис Flink использует два основных файла конфигурации:
-
log4j.properties. Файл конфигурации, используемый компонентами JobManager/TaskManager.
-
log4j-cli.properties. Используется для логирования событий, связанных с интерфейсом командной строки Flink.
Данные файлы хранятся в директории /etc/flink/conf на хостах с установленными компонентами Flink. Однако вместо того, чтобы редактировать файлы вручную на каждом хосте, ADCM предоставляет возможность обновить конфигурацию Log4j на всех хостах ADH одновременно. Для этого:
-
В ADCM UI перейдите на страницу Clusters → <cluster_name> → Services → Flink → Primary Configuration и активируйте опцию Show advanced.
-
Выберите раздел log4j.properties/log4j-cli.properties для редактирования конфигурации. Конфигурация Log4j, указанная через ADCM UI, перезапишет содержимое файлов /etc/flink/conf/log4j*.properties при рестарте сервиса Flink. Подробная информация о конфигурационных свойствах Log4j доступна в документации Log4j.
ПРИМЕЧАНИЕ
Обновление конфигурационных файлов Log4j не требует перезапуска сервиса Flink.
Данные файлы периодически сканируются (по умолчанию каждые 30 секунд) и могут обновляться "на лету".
|
Ниже приведены параметры конфигурации сервиса Flink, связанные с логированием.
Раздел настроек | Конфигурационный параметр | Значение по умолчанию | Описание |
---|---|---|---|
flink-env.sh |
FLINK_LOG_DIR |
/var/log/flink |
Корневая директория лог-файлов Flink |
flink-conf.yaml |
Logging level |
INFO |
Рутовый уровень логирования |
Файл log4j.properties
Ниже представлен файл log4j.properties, используемый Flink по умолчанию. Основные свойства подсвечены комментариями с информацией о назначении конкретного параметра.
################################################################################
{% include 'apache_license.txt' %}
################################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30 (1)
# This affects logging for both user code and Flink
rootLogger.level = {{ services.flink.config.flink_conf.logging_level }} (2)
rootLogger.appenderRef.file.ref = MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO
{% raw %}
# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file} (3)
appender.main.filePattern = ${sys:log.file}.%i (4)
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n (5)
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy (6)
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy (7)
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10} (8)
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
{% endraw %}
1 | Интервал в секундах для сканирования данной конфигурации Log4j на предмет изменений. |
2 | Установка рутового уровня логирования.
Возможные уровни: ALL , TRACE , DEBUG , INFO , WARN , ERROR , FATAL , OFF .
Дефолтное значение берется из свойства flink-conf.yaml→Logging level в ADCM. |
3 | Установка имени лог-файлов. Шаблон по умолчанию генерирует имена файлов типа flink-flink-taskexecutor-<n>-<host_name>.log. |
4 | Шаблон для имен лог-файлов после ротации. |
5 | Шаблон для записи объекта события в лог-файл. |
6 | Политика ротации лог-файлов по размеру.
Когда файл достигает размера, указанного в appender.main.policies.size.size , аппендер прекращает запись в этот файл и начинает записывать все события в новый. |
7 | Политика, выполняющая ротацию лог-файлов при старте Flink (если лог-файл старше времени запуска JVM). |
8 | Установка максимального количества лог-файлов. При превышении этого значения самые старые файлы удаляются. |
Просмотр логов в Flink UI
Логи Flink, генерируемые компонентами TaskManager/JobManager, также доступны в Flink UI. Чтобы увидеть логи TaskManager, в Flink UI откройте страницу Task Managers и выберите нужный Task Manager. Текстовые логи, системный вывод, а также другие метаданные доступны на вкладках, как показано на рисунке ниже.


Логи Job Manager доступны на странице Job Manager на вкладке Logs.
Логирование в приложениях Flink
В следующем примере показано базовое использование логера SLF4J в Java-приложении Flink.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyFlinkApp {
private static final Logger logger = LoggerFactory.getLogger(MyFlinkApp.class); (1)
public static void main(String[] args) {
logger.info("Log INFO event"); (2)
try {
logger.debug("In-try with placeholder: {}", "foobar"); (3)
} catch (Exception e) {
logger.error("An {} has occurred.", "error", e);
}
}
}
1 | Получение объекта логера.
Объект логера рекомендуется хранить в поле с модификаторами private static final . |
2 | Сохранение произвольного сообщения в лог. |
3 | Логирование сообщения с плейсхолдером. |
Для Flink-приложений на Python пример логирования выглядит следующим образом:
import logging
logging.basicConfig(stream=sys.stdout,
level=logging.INFO,
format="%(message)s")
logging.info("Log this INFO event")