Сжатие журналов в Kafka

В данной статье описаны существующие политики очищения (сжатия) журналов в Kafka, параметры, используемые при настройке политик, а также способы настройки политик.

Политика очищения журналов

Политика очищения журнала log.cleanup.policy — основной параметр, определяющий стратегию удаления старых записей.

Ниже приведены возможные варианты политики очищения журнала:

  • Удаление (delete) — удаление записей по причине достижения одного из заданных параметров или двух параметров одновременно:

    • срока хранения записи;

    • обьема сохраненной информации (в байтах).

  • Сжатие (compact) — сохранение только последних значений для каждого ключа.

  • Удаление и сжатие (delete and compact)  — сохранение только последних значений для каждого ключа с учетом параметров сроков хранения или обьема информации.

Выбор политики очищения журналов зависит от требований проекта.

На рисунке ниже показано, как отличается очередь сообщений после применения разных политик очищения журнала.

Политики очищения журнала
Политики очищения журнала
Политики очищения журнала
Политики очищения журнала

Ниже описаны алгоритмы хранения в зависимости от выбранной политики для сообщения со смещением 5:

  • при политике delete сообщение удаляется, так как имеет время хранения больше, чем заданное значение;

  • при политике compact сообщение сохраняется в очереди, так как содержит последнее значение для ключа key2;

  • при политике delete and compact сообщение удаляется из-за срока хранения, несмотря на то, что значение для ключа key2 не было перезаписано.

Все политики сжатия журнала основаны на работе с сегментами партиции.

Ниже подробно рассмотрены все политики очищения журнала и использующиеся параметры.

Удаление (delete)

Политика удаления (delete) подходит для журналов, где необходима только фиксация данных о событии, а сообщения не будут перезаписываться.

При выборе политики удаления удаление сообщений происходит в зависимости от настроенных параметров:

  • log.retention.hours — время хранения сообщения, после достижения которого оно будет удалено. Этот параметр по умолчанию установлен для всех брокеров и создаваемых топиков (значение 168). Также это значение может быть изменено для каждого топика индивидуально при помощи параметра retention.ms.

  • log.retention.bytes — размер журнала в байтах, после достижения которого журнал будет удален (по умолчанию установлено значение -1 — ограничения нет). Также это значение может быть установлено для каждого топика индивидуально при помощи параметра retention.bytes.

Также допускается устанавливать сразу оба параметра для одновременного управления временем жизни сообщений и обьемом журнала.

Для управления частотой удаления сообщений используется параметр log.cleanup.interval.mins, определяющий, как часто средство очистки журналов проверяет журналы, подлежащие удалению.

В алгоритме удаления сообщений различаются следующие виды сегментов партиции журнала:

  • Закрытые сегменты журнала (log segment) — сегменты, в которых хранятся сообщения и запись новых сообщений не производится.

  • Aктивный сегмент журнала (active log segment) — сегмент, в который в данный момент времени производится запись. Активный сегмент не участвует в алгоритме удаления сообщений.

Схема удаления сообщений журнала Kafka при ограничении по времени хранения показана на рисунке ниже.

Удаление сообщений
Удаление сообщений
Удаление сообщений
Удаление сообщений

Сообщения удаляются в составе сегментов в следующей последовательности:

  1. Определение сообщения с максимальной временной меткой (самое новое из имеющихся) внутри одного сегмента.

  2. Определение разницы между максимальной временной меткой (Tmax) и текущим временем (Tcurrent), сравнение полученного значения со значением, заданным в log.retention.hours (retention.ms для топиков).

  3. Если для сегмента выполняется условие Tcurrent - Tmax > log.retention, то сегмент удаляется целиком.

Таким образом, общее время жизни сообщения может быть больше указанного значения в log.retention.hours и зависеть от установленного значения размера сегмента segment.bytes.

Cжатие (сompact)

Обзор политики сжатия

Политика сжатия (compact) обычно применяется для журналов, сообщения в которых имеют формат ключ/значение. Например, базы данных, сохраняющие часто перезаписываемые значения для одних и тех же ключей.

Эта политика используется по умолчанию для служебного топика Kafka __consumer_offsets.

В алгоритме сжатия журналов различаются следующие виды сегментов партиции:

  • "Чистые" сегменты журнала (clean log segment) — сегменты, в которых нет повторяющихся ключей, т.е. все сегменты содержат только по одному значению для каждого ключа — последнему на момент предыдущего сжатия. В "чистых" сегментах могут быть пробелы в нумерации смещений из-за предыдущего сжатия.

  • "Грязные" сегменты журнала (dirty log segment) — сегменты, которые были записаны после последнего "чистого" сегмента. В них могут быть повторяющиеся ключи, а также ключи, присутствующие в "чистых" сегментах. "Грязные" сегменты начинаются с точки cleaner point.

  • "Aктивный" сегмент журнала (active log segment) — сегмент, в который в данный момент времени производится запись. Активный сегмент не участвует в алгоритме сжатия.

Запуск сжатия

Момент запуска сжатия регулируется параметрами брокера:

  • log.cleaner.min.cleanable.ratio — соотношение объема "грязных" сегментов к общему объему журнала, при котором начинается запуск сжатия (по умолчанию установлено значение 0.5). Также это значение может быть установлено для каждого топика индивидуально при помощи параметра min.cleanable.dirty.ratio.

  • log.cleaner.min.compaction.lag.ms — минимальное время, в течение которого сообщение будет оставаться в журнале несжатым (по умолчанию задано значение 0 — ограничения нет). Сжатие не начнется, пока не истечет заданное время, даже если значение параметра min.cleanable.dirty.ratio достигло установленного уровня. Также это значение может быть установлено для каждого топика индивидуально при помощи параметра min.compaction.lag.ms.

  • log.cleaner.max.compaction.lag.ms — максимальное время, в течение которого сообщение не подлежит сжатию в журнале (по умолчанию для всех создаваемых топиков задано значение 9223372036854775807). Сжатие начнется по истечении заданного времени, даже если значение параметра min.cleanable.dirty.ratio не достигло установленного уровня. Также это значение может быть установлено для каждого топика индивидуально при помощи параметра max.compaction.lag.ms.

Алгоритм сжатия

Основа алгоритма сжатия — создание хеш-таблицы (hash table), которая является результатом сканирования "грязных" сегментов. В хеш-таблицу записываются последние найденные смещения для каждого ключа. Далее, используя данную таблицу, потоки очистки копируют весь журнал в буфер, удаляют смещения, отсутствующие в хеш-таблице, и перезаписывают оставшиеся смещения в новый сегмент. При этом исходное смещение каждого события сохраняется (это может привести к появлению пробелов в смещениях).

Размер буфера потоков очистки и фактор его заполнения определяются конфигурационными параметрами брокера:

Последовательность сжатия журнала Kafka показана на рисунке ниже.

Сжатие журнала
Сжатие журнала
Сжатие журнала
Сжатие журнала
  1. Сканирование "грязных сегментов", начиная с точки cleaner point и заканчивая началом "активного" сегмента, создание хеш-таблицы с последними смещениями и определение сообщений, которые будут удалены. Точное число сегментов, сканируемых за одну итерацию, зависит от объема памяти буфера потока очистки.

  2. Сканирование "чистых" сегментов с учетом последних ключей, записанных в хеш-таблицу при сканировании "грязных" сегментов, и определение сообщений, которые будут удалены.

  3. Копирование в новые сегменты журнала сохраняемых смещений как в "грязных", так и в "чистых" сегментах. В зависимости от установленного значения размера сегмента segment.bytes, сообщения могут быть скопированы в один или несколько новых сегментов.

  4. Удаление старых сегментов и установка контрольной точки (new cleaner point) на последнем очищенном смещении. Эта контрольная точка отмечает начало первого "грязного сегмента", в который превратится "активный" сегмент после заполнения. Следующий этап сжатия журнала начнется с этого сегмента.

Удаление при помощи сжатия

После записи сообщения с ключом и нулевым значением это сообщение будет удалено из журнала.

Последовательность удаления записи из журнала при помощи сжатия показана на рисунке ниже.

Удаление записи при помощи сжатия в Kafka
Удаление записи при помощи сжатия в Kafka
Удаление записи при помощи сжатия в Kafka
Удаление при помощи сжатия в Kafka
  1. Cообщение с ключом и нулевым значением записывается в "активный" сегмент журнала. Такую запись называют tombstone.

  2. После того как сегмент, в котором находится tombstone, закрывается, он переходит в разряд "грязных" сегментов. Когда он подвергается сжатию согласно алгоритму, описанному выше, все предыдущие сообщения с таким же ключом, как у tombstone, удаляются, а tombstone помечается как готовое к удалению, и ему назначается время удаления, определяемое конфигурационным параметром брокера log.cleaner.delete.retention.ms (также это значение может быть изменено для каждого топика индивидуально при помощи параметра delete.retention.ms). В течение этого времени tombstone будет доступен для чтения.

  3. При последующих сжатиях все события, время удаления которых уже достигнуто, будут удалены.

Гарантии и преимущества сжатия

  • Так как сжатие сохраняет самое последнее значение для каждого ключа, оно идеально подходит для резервного копирования таблиц различных баз данных.

  • Сжатые топики продолжают получать обновления без увеличения объема журналов.

  • Пользователи, читающие сжатые топики, гарантированно увидят самое последнее значение для нужного ключа.

  • Сжатие никогда не меняет порядок сообщений.

  • Смещение сообщения никогда не меняется. Это постоянный идентификатор позиции в журнале.

  • Существует возможность настройки времени хранения маркеров удаления записей.

  • Сжатие не блокирует чтение и запись сообщений. Сжатие может быть настроено на использование рационального обьема ресурсов памяти, чтобы не влиять на производителей и потребителей.

Удаление и сжатие (delete and compact)

При выборе данной политики будут совмещены алгоритмы удаления и сжатия сообщений в журнале. Политика может применяться для журналов с сообщениями в формате ключ/значение, которые не нуждаются в длительном хранении, если они не были перезаписаны.

Настройка сжатия

В Kafka очищение журналов включается при помощи параметра брокера log.cleaner.enable, который определяется в конфигурационном файле /usr/lib/kafka/config/server.properties.

После добавления и установки сервиса Kafka в состав кластера ADS включение Log Cleaner выполняется на странице конфигурирования сервиса Kafka в интерфейсе ADCM. Достаточно перевести в активное состояние переключатель Log Cleaner, сохранить изменения (нажав Save) и перезагрузить сервис при помощи действия Restart.

Настройка кластера

Настройку сжатия для всего кластера, отдельных хостов (брокеров Kafka) или групп хостов можно выполнить при помощи интерфейса ADCM на странице конфигурирования сервиса Kafka. Для этого необходимо раскрыть узлы Main и Log Cleaner в дереве конфигурационных настроек. Вторая группа становится доступна после активации соответствующего переключателя.

Настройка сжатия для кластера
Настройка сжатия для кластера

Ниже приведены доступные в ADCM настраиваемые параметры очищения журнала со ссылкой на описание использования параметра:

После изменения параметров при помощи интерфейса ADCM перезагрузите сервис Kafka. Для этого примените действие Restart, нажав на иконку actions default dark actions default light в столбце Actions.

Также необходимый параметр брокера может быть установлен в конфигурационном файле /usr/lib/kafka/config/server.properties.

Настройка топика

Настройка отдельного топика выполняется в командной строке. Для этого используется скрипт kafka-topics.sh c применением опции --config, которая переопределяет параметры конфигурации для созданного топика или устанавливает параметры конфигурации при создании нового топика.

Пример настройки сжатия журнала при создании топика при помощи командной строки
$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic new-topic1 --config cleanup.policy=delete --config retention.ms=10000 --config retention.bytes=128000 --bootstrap-server localhost:9092

Также существует возможность настройки сжатия журнала для отдельного топика при помощи пользовательского интерфейса CMAK, который становится доступен после добавления и установки сервиса Kafka-Manager в кластере ADS.

Настройка может быть выполнена для созданного топика или при создании нового топика.

Пример настройки сжатия журналов для существующего топика при помощи сервиса Kafka-Manager

Выбрав TopicList в верхнем меню, перейдите к списку топиков. В списке выберите нужный топик, нажав на название. На открывшейся странице топика нажмите кнопку Update Config.

Переход к настройке топика в Kafka-Manager
Переход к настройке топика в Kafka-Manager
Переход к настройке топика в Kafka-Manager
Переход к настройке топика в Kafka-Manager

Установите необходимые значения параметров сжатия и нажмите кнопку Update Config.

Настройка топика в Kafka-Manager
Настройка топика в Kafka-Manager
Настройка топика в Kafka-Manager
Настройка топика в Kafka-Manager
ПРИМЕЧАНИЕ

Для получения информации об основных принципах работы в сервисах Kafka и Kafka-Manager можно обратиться к статье Начало работы c Kafka.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней