Kafka Java Producer

Платформа ADS включает в себя Java producer, поставляемый вместе с Kafka.

В документе представлен общий обзор работы поставщика, введение в параметры конфигурации для настройки и примеры из каждой клиентской библиотеки.

Концепция

Поставщик Kafka концептуально намного проще, чем потребитель, так как у него нет необходимости в групповой координации. Его основная функция состоит в том, чтобы сопоставить каждое сообщение с партицией топика и отправить запрос лидеру соответствующей партиции. Первое выполняется с помощью partitioner – механизма, выбирающего партицию посредством хэш-функции, и гарантирующего, что все сообщения с одинаковым (непустым) ключом отправляются в одну и ту же партицию. Если ключ не указан, то партиция определяется циклически с целью обеспечения равномерного распределения по партициям топика.

В каждой партиции кластера Kafka есть лидер и набор реплик среди брокеров – все записи проходят через лидера партиции, а реплики синхронизируются посредством выборки из него. Когда лидер отключается или выходит из строя, следующий лидер выбирается из числа синхронизированных реплик. В зависимости от того, как настроен поставщик, каждый запрос лидеру партиции может удерживаться до тех пор, пока реплики не одобрят запись. Это дает поставщику некий контроль над эксплуатацией сообщения при условии некоторой стоимости общей пропускной способности.

Сообщения, направленные лидеру партиции, не могут сразу считываться потребителями независимо от настроек подтверждения поставщика. Только когда все синхронизированные реплики подтверждают запись, сообщение считается зафиксированным, что делает его доступным для чтения. Такой подход гарантирует, что уже прочитанные сообщения не могут быть потеряны по причине сбоя брокера. Но это также подразумевает, что сообщения, подтвержденые только лидером (то есть acks=1), могут быть потеряны в случае, если лидер партиции терпит неудачу до момента копирования сообщений репликами. Тем не менее, в большинстве случаев на практике такой способ часто является разумным компромиссом для обеспечения жизнеспособности сообщений (durability), при этом не оказывая существенного влияния на пропускную способность.

Большая часть тонкостей вокруг поставщиков связана с достижением высокой пропускной способности с учетом пакетирования/сжатия и обеспечением гарантий доставки сообщений. Далее приведены наиболее распространенные параметры настройки поведения поставщиков.

Конфигурация

Полный список параметров конфигурации доступен в документе Настройки платформы Arenadata Streaming. Но некоторые из ключевых параметров и их влияние на поведение поставщиков описаны в текущей главе.

Базовая конфигурация (Core Configuration)

Для того, чтобы поставщик мог найти кластер Kafka, необходимо установить свойство bootstrap.servers. Так же, хотя это и не требуется обязательно, но всегда следует устанавливать client.id, поскольку это позволяет легко сопоставлять запросы в брокере со сделавшим их инстансом клиента. Данные настройки одинаковы для клиентов Java, C/C++, Python, Go и .NET.

Жизнеспособность сообщений (Message Durability)

Жизнеспособность сообщений, записанных в Kafka, можно контролировать с помощью параметра acks. Значение по умолчанию 1 требует от лидера партиции явного подтверждения об успешно выполненной записи. Самая сильная гарантия, которую предоставляет Kafkaacks=all – сообщение не только допущено к записи лидером партиции, но и успешно скопировано на все синхронизированные реплики. Можно также использовать значение 0 для максимизации пропускной способности, но тогда отсутствует гарантия успешной записи сообщения в журнал брокера, так как в этом случае брокер не отправляет ответ, что также означает невозможность определения смещение сообщения. Для клиентов C/C++, Python, Go и .NET это является конфигурацией для каждого отдельного топика, но ее можно применять глобально с помощью вложенной конфигурации default_topic_conf в C/C++ и default.topic.config в Python, Go и .NET.

Порядок сообщений (Message Ordering)

Как правило, сообщения записываются в брокер в том же порядке, в котором они поступают от клиента поставщика. Однако если разрешить повторные попытки сообщений, установив для них значение больше 0 (0 – значение по умолчанию), может измениться их порядок, так как повтор возможен только после свершения успешной записи. Чтобы избежать переупорядочения, можно установить параметр max.in.flight.requests.per.connection в значение 1, тогда брокеру одновременно может быть отправлен только один запрос. В случае без подключения повторных попыток сообщений брокер сохраняет порядок получаемых записей, но при этом могут быть пробелы из-за отдельных сбоев отправки.

Пакетирование и сжатие (Batching/Compression)

Поставщики Kafka пакетируют отправляемые сообщения для повышения пропускной способности. С клиентом Java можно управлять максимальным размером каждого пакета сообщений в параметре batch.size. Для большего времени на заполнение пакетов доступен параметр linger.ms, на значение которого поставщик задерживает отправку. Установкой compression.type включается сжатие, оно охватывает полные пакеты сообщений, поэтому большие пакеты обычно означают более высокую степень сжатия.

С клиентами C/C++, Python, Go и .NET для установки ограничения на количество сообщений в пакете используется batch.num.messages, сжатие включается с помощью compression.codec.

Ограничения очереди (Queuing Limits)

Ограничение общего объема доступной Java-клиенту памяти для сбора неотправленных сообщений контролируется параметром buffer.memory. При достижении установленного предела поставщик блокирует последующий набор записей до тех пор, пока max.block.ms не выводит исключение. Кроме того, чтобы избежать бесконечного хранения сообщений, можно установить тайм-аут в request.timeout.ms. Если это время ожидания истекает до того, как сообщение может быть успешно отправлено, то оно удаляется из очереди и генерируется исключение.

Клиенты C/C++, Python, Go и .NET имеют аналогичные настройки. Параметр queue.buffering.max.messages – для ограничения общего количества сообщений, поставляемых в очередь в любой момент времени (для отчетов о передаче, повторных попытках или доставке). И параметр queue.buffering.max.ms – для ограничения периода времени ожидания клиентом заполнения пакета перед отправкой брокеру.

Примеры

Начальная настройка

Поставщик Java создается с помощью стандартного файла свойств Properties:

Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "host1:9092,host2:9092");
config.put("acks", "all");
new KafkProducer<K, V>(config);

Ошибки конфигурации приводят к появлению KafkaException от конструктора KafkaProducer. Основное отличие librdkafka заключается в том, что она обрабатывает ошибки для каждого параметра напрямую:

char hostname[128];
char errstr[512];

rd_kafka_conf_t *conf = rd_kafka_conf_new();

if (gethostname(hostname, sizeof(hostname))) {
  fprintf(stderr, "%% Failed to lookup hostname\n");
  exit(1);
}

if (rd_kafka_conf_set(conf, "client.id", hostname,
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();

if (rd_kafka_topic_conf_set(topic_conf, "acks", "all",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

/* Create Kafka producer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                        errstr, sizeof(errstr)))) {
  fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
  exit(1);
}

В Python:

from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'client.id': socket.gethostname(),
        'default.topic.config': {'acks': 'all'}}

producer = Producer(conf)

В Go:

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "host1:9092,host2:9092",
    "client.id": socket.gethostname(),
    "default.topic.config": kafka.ConfigMap{'acks': 'all'}
})

if err != nil {
    fmt.Printf("Failed to create producer: %s\n", err)
    os.Exit(1)
}

В C#:

using Confluent.Kafka;
using System.Net;

...

var config = new Dictionary<string, object>
{
    { "bootstrap.servers", "host1:9092,host2:9092" },
    { "client.id", Dns.GetHostName() },
    { "default.topic.config", new Dictionary<string, object>
        {
            { "acks", "all" }
        }
    }
}

using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
    ...
}

Асинхронные записи

Все записи являются асинхронными по умолчанию. Поставщик Java включает в себя API send(), возвращающий future, которое можно опрашивать для получения результата отправки:

final ProducerRecord<K, V> = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);

В librdkafka сначала необходимо создать дескриптор rd_kafka_topic_t для топика, в который планируется запись, а затем использовать rd_kafka_produce для отправки в него сообщений. Например:

rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, topic_conf);

if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
                     RD_KAFKA_MSG_F_COPY,
                     payload, payload_len,
                     key, key_len,
                     NULL) == -1) {
  fprintf(stderr, "%% Failed to produce to topic %s: %s\n",
       topic, rd_kafka_err2str(rd_kafka_errno2err(errno)));
}

Конкретную топику конфигурацию можно назначить третьему аргументу rd_kafka_topic_new – тогда необходимо передать topic_conf и добавить настройку для подтверждений. Значение NULL приводит к использованию поставщиком конфигурации по умолчанию.

Второй аргумент для rd_kafka_produce может использоваться для установки желаемой партиции для сообщения. При установленом значении RD_KAFKA_PARTITION_UA, как в данном примере, выбор партиции для сообщения осуществляется механизмом partitioner по умолчанию. Третий аргумент указывает, что librdkafka должна скопировать информацию и ключ, что позволяет освободить его по возвращении.

В Python отправка инициируется методом produce с передачей значения и по необходимости – ключа, партиции и обратного вызова. Запрос возвращается немедленно без значения:

producer.produce(topic, key="key", value="value")

Аналогично, в Go отправка инициируется методом Produce() с передачей объекта Message` object and an optional ``chan Event, применяемого для прослушивания результата отправки. Объект Message содержит непрозрачное поле interface{}, которое может использоваться для передачи произвольных данных вместе с сообщением последующему обработчику событий.

delivery_chan := make(chan kafka.Event, 10000)
err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: "topic", Partition: kafka.PartitionAny},
    Value: []byte(value)},
    delivery_chan,
)

В C# отправка инициируется вызовом метода ProduceAsync в инстансе Producer. Например:

producer.ProduceAsync("topic", key, value);

При необходимости применения некоторого кода после завершения записи может быть предоставлен обратный вызов. В Java это реализовано как объект Callback:

final ProducerRecord<K, V> = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null)
      log.debug("Send failed for record {}", record, e);
  }
});

В реализации Java следует избегать дорогостоящей работы с обратным вызовом, поскольку он выполняется в потоке ввода-вывода поставщика.

Аналогичная функция доступна в librdkafka, но ее необходимо настраивать при инициализации:

static void on_delivery(rd_kafka_t *rk,
                        const rd_kafka_message_t *rkmessage
                        void *opaque) {
  if (rkmessage->err)
    fprintf(stderr, "%% Message delivery failed: %s\n",
            rd_kafka_message_errstr(rkmessage));
}

void init_rd_kafka() {
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);

  // initialization ommitted
}

Обратный вызов доставки (delivery callback) в librdkafka осуществляется в потоке пользователя путем вызова rd_kafka_poll. Распространенным шаблоном является вызов функции после каждого вызова API поставщика, но этого может быть недостаточно для обеспечения регулярных отчетов о доставке, если скорость создания сообщений не равномерна. Так же данный API не предоставляет прямого способа блокировки для получения результата доставки конкретного сообщения. При наличии такой необходимости рекомендуется рассмотреть пример синхронной записи (Синхронные записи).

В Python параметр callback можно передать с помощью любого вызываемого средства, например, лямбды, функции, связанного метода или вызываемого объекта. Хотя метод produce() сразу ставит сообщение в очередь для пакетной обработки, сжатия и передачи брокеру, он не свершает обработку каких-либо событий (то есть подтверждений и обратных вызовов, которые они инициируют) до вызова poll().

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err))
    else
        print("Message produced: %s" % (str(msg))

producer.produce(topic, key="key", value="value", callback=acked)

# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)

В Go можно использовать канал отчета о доставке в Produce, чтобы дождаться результата отправки сообщения:

e := <-delivery_chan
m := e.(*kafka.Message)

if m.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
    fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
               *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

close(delivery_chan)

В C# есть два варианта. Первый: можно использовать ProduceAsync, возвращающий стандартный объект Task, который выполняет await (приостановку выполнения метода до завершения выполнения ожидаемой задачи), обрабатывается с помощью метода .ContinueWith или ожидает использования методов .Wait или .WaitAll:

var deliveryReportTask = producer.ProduceAsync("topic", key, val);
deliveryReportTask.ContinueWith(task =>
{
    Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
});

Во втором варианте используется .ProduceAsync, который принимает реализацию IDeliveryHandler. Данный подход следует использовать при необходимости получения уведомлений о доставке сообщений (или сбое доставки) строго в порядке подтверждения брокером, поскольку Tasks могут выполняться в любом потоке, что не гарантирует их упорядоченность.

Синхронные записи

Чтобы сделать запись синхронной, следует дождаться возвращения future. Как правило, это плохая затея, так как она может существенно снизить пропускную способность, но в некоторых случаях может быть оправдана.

Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();

Аналогичная возможность может быть достигнута в C/C++ и Python с помощью обратного вызова доставки, но это более трудоемко. Полный пример приведен по ссылке. Клиент Python также содержит метод flush(), имеющий тот же эффект:

producer.produce(topic, key="key", value="value")
producer.flush()

В Go осуществляется через канал доставки, посредством вызова метода Produce():

delivery_chan := make(chan kafka.Event, 10000)
err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: "topic", Partition: kafka.PartitionAny},
    Value: []byte(value)},
    delivery_chan
)

 e := <-delivery_chan
 m := e.(*kafka.Message)

Для ожидания подтверждения всех сообщений используется метод Flush():

p.Flush()

Важно обратить внимание, что Flush() обслуживает только канал Events() поставщика, а не каналы доставки, указанные приложением. Если Flush() вызывается, и при этом никакая горутина не обрабатывает канал доставки, то буфер может заполниться и привести к истечению времени ожидания.

В C# необходимо получить доступ к свойству .Result объекта Task, возвращенного из .ProduceAsync, которое будет блокироваться до тех пор, пока не станет доступен отчет о доставке:

var deliveryReport = producer.ProduceAsync("topic", key, value).Result;