FileStream в ADS Control

В данной статье описывается настройка коннекторов FileStream при помощи ADS Control.

Коннекторы FileStream — инструменты сервиса Kafka Connect:

  • FileStreamSource Connector считывает данные из локального файла и записывает их в топик Kafka.

  • FileStreamSink Connector считывает данные из топика Kafka и выводит их в локальный файл.

ПРИМЕЧАНИЕ
Перед настройкой FileStream-коннекторов в кластере должен быть установлен и настроен сервис Kafka Connect. Kafka Connect доступен к установке в ADS начиная с версии 1.7.1.

Создание коннекторов FileStream

Для создания коннекторов при помощи ADS Control необходимо:

  1. В пользовательском интерфейсе ADS Control выбрать нужный кластер ADS.

    Выбор кластера в интерфейсе ADS Control
    Выбор кластера в интерфейсе ADS Control
    Выбор кластера в интерфейсе ADS Control
    Выбор кластера в интерфейсе ADS Control
  2. На открывшейся странице в списке кластеров Kafka Connect выбрать кластер ADS.

    Открытие страницы с коннекторами Kafka Connect
    Открытие страницы с коннекторами Kafka Connect
    Открытие страницы с коннекторами Kafka Connect
    Открытие страницы с коннекторами Kafka Connect
  3. На открывшейся странице кликнуть Create Connector.

    Создание коннектора Kafka Connect
    Создание коннектора Kafka Connect
    Создание коннектора Kafka Connect
    Создание коннектора Kafka Connect
  4. Выбрать нужный коннектор для создания.

    Выбор коннектора Kafka Connect для создания
    Выбор коннектора Kafka Connect для создания
    Выбор коннектора Kafka Connect для создания
    Выбор коннектора Kafka Connect для создания
  5. Заполнить параметры конфигурации коннектора. При необходимости воспользоваться информацией о конфигурации сервиса Kafka Connect в статье Конфигурационные параметры ADS. Вы можете использовать заполнение конфигурации в виде файла JSON. Для этого включите соответствующий переключатель.

    Конфигурация коннектора
    Конфигурация коннектора
    Конфигурация коннектора
    Конфигурация коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    Пример содержимого JSON-файла для простой конфигурации FileStreamSinkConnector
    {
      "name": "FileSink",
      "config.action.reload": "restart",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "errors.deadletterqueue.context.headers.enable": "false",
      "errors.deadletterqueue.topic.name": null,
      "errors.deadletterqueue.topic.replication.factor": "3",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.retry.delay.max.ms": "60000",
      "errors.retry.timeout": "0",
      "errors.tolerance": "none",
      "file": "/tmp/test.txt",
      "header.converter": null,
      "key.converter": null,
      "predicates": null,
      "tasks.max": "1",
      "topics": "test_topic_name",
      "topics.regex": null,
      "transforms": null,
      "value.converter": null
    }
    Атрибут Описание

    name

    Уникальное имя коннектора

    config.action.reload

    Действие, которое Kafka Connect должен выполнять над коннектором, когда изменения во внешних поставщиках конфигурации приводят к изменению свойств конфигурации коннектора:

    • none — обновление коннектора не требуется;

    • restart — коннектор будет перезагружен с обновленными свойствами конфигурации.

    connector.class

    Имя класса для коннектора. Должен быть подклассом org.apache.kafka.connect.connector.Connector

    errors.deadletterqueue.context.headers.enable

    Значение устанавливается в true, если необходимо добавлять заголовки, содержащие контекст ошибки, к сообщениям, записываемым в очередь недоставленных сообщений (Dead Letter Queue, DLQ). Чтобы избежать конфликтов с заголовками из исходной записи, все ключи заголовка контекста ошибки будут начинаться с __connect.errors

    errors.deadletterqueue.topic.name

    Имя раздела, которое будет использоваться в качестве очереди недоставленных сообщений (Dead Letter Queue, DLQ) для сообщений, которые приводят к ошибке при обработке этим коннектором или его преобразователями

    errors.deadletterqueue.topic.replication.factor

    Фактор репликации, используемый для создания топика очереди недоставленных сообщений

    errors.log.enable

    Значения параметра:

    • true — каждая ошибка и сведения о неудачной операции и проблемной записи фиксируются в журнале приложения Connect;

    • false — фиксируются только те ошибки, которые недопустимы.

    errors.log.include.messages

    Значения параметра:

    • true — в логе фиксируется запись подключения, которая привела к сбою. Для данного коннектора будут регистрироваться топик, партиция, смещение и отметка времени. Для исходных записей будут регистрироваться ключ и значение (и их схемы), все заголовки и отметка времени, топик Kafka, партиция Kafka, исходная партиция и исходное смещение;

    • false — ключи записей, значений и заголовков в логе не фиксируются.

    errors.retry.delay.max.ms

    Максимальная продолжительность между последовательными повторными попытками запросов (в миллисекундах)

    errors.retry.timeout

    Максимальная продолжительность, в течение которой неудачная операция будет повторяться (в миллисекундах). Значения параметра:

    • 0 — повторных попыток не будет;

    • -1 — количество попыток бесконечно.

    errors.tolerance

    Поведение для допустимых ошибок во время работы коннектора. Значения параметра:

    • none — любая ошибка приведет к немедленному сбою задачи коннектора;

    • all — изменяет поведение, чтобы пропускать проблемные записи.

    file

    Файл, в котором создаются сообщения при добавлении в указанный топик

    header.converter

    Класс HeaderConverter, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Управляет форматом значений заголовков в сообщениях, записываемых или считываемых из Kafka, и, являясь независимым от коннекторов, это позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro. По умолчанию SimpleHeaderConverter используется для сериализации значений заголовков в строки и их десериализации путем вывода схем

    key.converter

    Класс преобразователя, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Он управляет форматом ключей в сообщениях, записываемых в Kafka или считываемых из него, и, поскольку он не зависит от коннекторов, позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro

    predicates

    Псевдонимы для предикатов, используемых преобразователями

    tasks.max

    Максимальное количество задач, которые можно использовать для коннектора

    topics

    Список топиков, разделенных запятыми. Когда сообщения добавляются в эти топики, они автоматически создаются в указанном локальном файле

    topics.regex

    Регулярное выражение, задающее имена топиков для потребления. Регулярное выражение компилируется в java.util.regex.Pattern

    transforms

    Псевдонимы для преобразователей, применяемых к записям

    value.converter

    Класс преобразователя, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Он управляет форматом значений в сообщениях, записываемых в Kafka или считываемых из него, и, поскольку он не зависит от коннектора, позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro

    Пример содержимого JSON-файла для простой конфигурации FileStreamSourceConnector
    {
      "name": "FileSource",
      "batch.size": "2000",
      "config.action.reload": "restart",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.retry.delay.max.ms": "60000",
      "errors.retry.timeout": "0",
      "errors.tolerance": "none",
      "file": "/tmp/test.txt",
      "header.converter": null,
      "key.converter": null,
      "predicates": null,
      "tasks.max": "1",
      "topic": "test",
      "topic.creation.groups": null,
      "transforms": null,
      "value.converter": null
    }
    Атрибут Описание

    name

    Уникальное имя коннектора

    batch.size

    Допустимый размер батча

    config.action.reload

    Действие, которое Kafka Connect должен выполнять над коннектором, когда изменения во внешних поставщиках конфигурации приводят к изменению свойств конфигурации коннектора:

    • none — обновление коннектора не требуется;

    • restart — коннектор будет перезагружен с обновленными свойствами конфигурации.

    connector.class

    Имя класса для коннектора. Должен быть подклассом org.apache.kafka.connect.connector.Connector

    errors.log.enable

    Значения параметра:

    • true — каждая ошибка и сведения о неудачной операции и проблемной записи фиксируются в журнале приложения Connect;

    • false — фиксируются только те ошибки, которые недопустимы.

    errors.log.include.messages

    Значения параметра:

    • true — в логе фиксируется запись подключения, которая привела к сбою. Для данного коннектора будут регистрироваться топик, партиция, смещение и отметка времени. Для исходных записей будут регистрироваться ключ и значение (и их схемы), все заголовки и отметка времени, топик Kafka, партиция Kafka, исходная партиция и исходное смещение;

    • false — ключи записей, значений и заголовков в логе не фиксируются.

    errors.retry.delay.max.ms

    Максимальная продолжительность между последовательными повторными попытками запросов (в миллисекундах)

    errors.retry.timeout

    Максимальная продолжительность, в течение которой неудачная операция будет повторяться (в миллисекундах). Значения параметра:

    • 0 — повторных попыток не будет;

    • -1 — количество попыток бесконечно.

    errors.tolerance

    Поведение для допустимых ошибок во время работы коннектора. Значения параметра:

    • none — любая ошибка приведет к немедленному сбою задачи коннектора;

    • all — изменяет поведение, чтобы пропускать проблемные записи.

    file

    Файл, из которого сообщения передаются в указанный топик

    header.converter

    Класс HeaderConverter, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Управляет форматом значений заголовков в сообщениях, записываемых или считываемых из Kafka, и, являясь независимым от коннекторов, это позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro. По умолчанию SimpleHeaderConverter используется для сериализации значений заголовков в строки и их десериализации путем вывода схем

    key.converter

    Класс преобразователя, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Он управляет форматом ключей в сообщениях, записываемых в Kafka или считываемых из него, и, поскольку он не зависит от коннекторов, позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro

    predicates

    Псевдонимы для предикатов, используемых преобразователями

    tasks.max

    Максимальное количество задач, которые можно использовать для коннектора

    topics

    Список топиков, разделенных запятыми. Когда сообщения добавляются в эти топики, они автоматически создаются в указанном локальном файле

    topic.creation.groups

    Группа по умолчанию всегда определяется для конфигураций темы. Значения этого свойства относятся к дополнительным группам

    transforms

    Псевдонимы для преобразователей, применяемых к записям

    value.converter

    Класс преобразователя, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Он управляет форматом значений в сообщениях, записываемых в Kafka или считываемых из него, и, поскольку он не зависит от коннектора, позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro

  6. После заполнения кликнуть Save и получить сообщение об успешном создании коннектора.

    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
  7. Проверить, что на странице Connectors for ads отображаются созданные коннекторы в рабочем статусе. Статус определяется в зависимости от индикатора перед названием коннектора:

    • green — коннектор/задача запущены;

    • yellow — коннектор/задача были административно приостановлены;

    • red — ошибка в работе коннектора/задачи;

    • unassigned — коннектор/задача еще не назначена процессу.

    Созданные коннекторы
    Созданные коннекторы
    Созданные коннекторы
    Созданные коннекторы

После создания коннекторов появляется возможность записи данных из локального файла в топик Kafka и наоборот в зависимости от типа коннектора.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней