Потребители данных

Потребитель платфоры ADS передает выборки запросов брокерам-лидерам необходимых ему партиций и задает свое смещение в журнале. Таким образом, потребитель имеет контроль над своей позицией в журнале и при необходимости может повторно считывать данные.

Изначально вопрос заключался в том, должны ли потребители получать данные от брокеров или брокеры должны передавать данные потребителю. В этом отношении ADS следует более традиционному способу, разделяемому большинством систем обмена сообщениями, где данные отправляются в брокер поставщиком и получаются из брокера потребителем. Некоторые системы, ориентированные на ведение журнала, такие как Scribe и Apache Flume, следуют совершенно другому принципу – модели “push”. У обоих подходов есть плюсы и минусы. Система push сталкивается с трудностями при работе с несколькими потребителями, поскольку брокер контролирует скорость передачи данных. А цель, как правило, заключается в том, чтобы потребитель считывал данные с максимально возможной скоростью; к сожалению, в системе push потребитель имеет тенденцию к перегрузке, когда его уровень потребления падает ниже уровня поставки данных. Система “pull” имеет лучшие свойства, при которых потребитель просто “отстает и догоняет”. При этом можно использовать протокол, в котором потребитель указывает свою перегрузку, но получение повышенной скорости передачи данных для потребителя сложнее, чем кажется. Предыдущие попытки построения систем привели ADS к использованию традиционной модели pull.

Другим преимуществом системы pull является то, что она поддается агрессивной пакетной обработке данных, отправляемых потребителю. В то время как система push должна либо немедленно отправить запрос, либо накопить больше данных, а затем отправить их, не зная, сможет ли нижестоящий потребитель немедленно их обработать. А при настроенной низкой задержке оправка сообщений осуществляется по одному за раз и в любом случае буферизуется, что является расточительным. В модели pull потребитель всегда получает все доступные ему данные в зависимости от его текущего положения в журнале (или до установленного максимального размера). Таким образом, можно получить оптимальное дозирование без лишней задержки.

Недостаток системы pull заключается в том, что если у брокера нет данных, потребитель может выполнять запрос в жестком цикле, фактически находясь в режиме ожидания поступления данных. Во избежание этого в ADS есть параметры запроса pull, которые позволяют заявке потребителя блокировать “длинный опрос” (“long poll”) до поступления данных (и при необходимости до тех пор, пока не будет достигнуто заданное количество байтов).

Позиция потребителя

Отслеживание считываемых данных, на удивление, является одной из ключевых точек производительности системы обмена сообщениями.

Большинство систем обмена сообщениями содержат метаданные о том, какие сообщения были считаны в брокер. То есть, при передаче сообщения потребителю брокер либо немедленно записывает его локально, либо ожидает подтверждения от потребителя. Это довольно интуитивный выбор, и действительно, серверу неясно, куда еще может потребоваться заявка. Поскольку структуры данных, используемые для хранения во многих системах обмена сообщениями, плохо масштабируются, это прагматичный выбор – поскольку брокер знает, какие данные считаны, и он может тут же удалить их, сохраняя размер данных небольшим.

Что, возможно, не очевидно, так это то, что заставить брокера и потребителя прийти к соглашению о считанных данных не является тривиальной задачей. Если брокер каждый раз немедленно записывает сообщение как считанное при его передаче по сети, то если потребитель не обрабатывает сообщение (например, по причине сбоя или времени ожидания запроса), сообщение теряется. Для решения этой проблемы многие системы обмена сообщениями добавляют функцию подтверждения, которая помечает сообщения как отправленные, но не считанные, пока брокер не дождется подтверждения от потребителя, чтобы записать сообщение как считанное. Такая стратегия устраняет проблему потери данных, но создает новые проблемы. Прежде всего, если потребитель обрабатывает сообщение, но не может отправить подтверждение, сообщение считывается дважды. Вторая проблема связана с производительностью – теперь брокер должен хранить несколько состояний о каждом сообщении (сначала заблокировать, чтобы не было повторного считывания, затем отметить как явно считанное, чтобы можно было удалить). Подобные замысловатые проблемы необходимо решать, как, например, что делать с сообщениями, которые отправляются, но никогда не подтверждаются.

ADS справляется с этим по-другому. Топик разделяется на набор полностью упорядоченных партиций, каждая из которых потребляется в любой момент времени ровно одним потребителем из подписавшейся группы потребителей. Это означает, что позиция потребителя в каждой партиции, то есть смещение от следующего сообщения – это всего лишь целое число. Это говорит о том, что состояние о считывании данных очень малоемкое – всего лишь одно число для каждой партиции. Текущее состояние можно проверять. Все это делает подобный эквивалент функции подтверждения о считывании сообщений очень дешевым.

К тому же данное решение имеет побочное преимущество – потребитель имеет возможность намеренно вернуть назад свое смещение и повторно считать данные. Это нарушает общий порядок очереди, но при этом является существенным плюсом для многих потребителей.

Автономная загрузка данных

Масштабируемая персистентность позволяет потребителям, которые только периодически считывают пакетные данные, время от времени загружать данные в автономную систему, такую как, например, Hadoop или реляционное хранилище данных.

В случае Hadoop нагрузка данных распараллеливается на отдельные задачи, по одной для каждой комбинации узел/топик/партиция. Hadoop обеспечивает управление задачами, а задачи, потерпевшие неудачу, могут перезапускаться без риска дублирования данных – они просто возобновляются с исходной позиции.

Отслеживание смещений потребителя

Потребитель высокого уровня отслеживает свое максимальное смещение при считывании данных в каждой партиции и периодически фиксирует вектор смещения для возможности возобновления работы с необходимых позиций. ADS предоставляет возможность хранения всех смещений группы потребителей в определенном брокере (для каждой группы) – в менеджере смещений, то есть любой инстанс потребителя группы должен передавать свои смещения и запрашивать выборки через менеджер смещения (брокер). У потребителей высокого уровня эта операция выполняется автоматически, в то время как простой потребитель управляет своими смещениями вручную, так как в настоящее время автоматическая передача смещений простых потребителей не поддерживается Java, им доступно только ручное фиксирование или извлечение смещений в ZooKeeper. При использовании Scala простой потребитель может явно зафиксировать или получить смещения в менеджере.

Поиск менеджера смещения осуществляется по запросу GroupCoordinatorRequest в любой брокер ADS с последующим ответом GroupCoordinatorResponse, в котором содержится менеджер смещения. После чего потребитель может перейти к фиксации или извлечению смещений. В случае перемещения менеджера смещений потребителю необходимо повторно выполнить его поиск.

При получении менеджером смещения запроса на фиксацию OffsetCommitRequest, он добавляет смещение в специальный сжатый топик ADS с именем __consumer_offsets. Менеджер смещения отправляет успешный ответ фиксации смещения потребителю только после получения смещений всеми репликами топика. В случае если смещения не могут реплицироваться в пределах настренного времени ожидания, фиксация смещения завершается неудачей, и потребитель может повторить ее после отмены действия (у потребителей высокого уровня процедура выполняется автоматически). Брокеры периодически сжимают топик смещения, так как ему требуется поддерживать только последнее смещение на партицию. Менеджер смещения также упорядоченно кэширует все смещения в таблице in-memory для возможности их быстрой обработки.

Когда менеджер смещения получает запрос на извлечение смещения, он просто возвращает последний зафиксированный вектор смещения из кэша. В случае если менеджер был только что запущен или стал менеджером смещения для нового набора групп потребителей (став лидером для партиции топика смещения), может потребоваться загрузка партиции топика смещений в кэш. В это время операции по извлечению смещения завершаются ошибкой OffsetsLoadInProgress, и потребитель может повторить запрос OffsetFetchRequest после окончания копирования (у потребителей высокого уровня процедура выполняется автоматически).

Миграция смещений из ZooKeeper в ADS

Для миграции потребителей и смещений из ZooKeeper в ADS необходимо выполнить следующие действия:

  1. Установить offsets.storage=ads и dual.commit.enabled=true в настройках потребителя.
  2. Выполнить резкий переход к потребителям и убедиться в их исправности.
  3. Установить dual.commit.enabled=false в настройках потребителя.
  4. Выполнить резкий переход к потребителям и убедиться в их исправности.

Обратный переход (с ADS в ZooKeeper) также может выполняться с помощью вышеуказанных шагов при установке offsets.storage=zookeeper.