FileStream в ADS Control
В данной статье описывается настройка коннекторов FileStream при помощи ADS Control.
Коннекторы FileStream — инструменты сервиса Kafka Connect:
-
FileStreamSource Connector считывает данные из локального файла и записывает их в топик Kafka.
-
FileStreamSink Connector считывает данные из топика Kafka и выводит их в локальный файл.
ПРИМЕЧАНИЕ
Перед настройкой FileStream-коннекторов в кластере должен быть установлен и настроен сервис Kafka Connect. Kafka Connect доступен к установке в ADS начиная с версии 1.7.1.
|
Создание коннекторов FileStream
Для создания коннекторов при помощи ADS Control необходимо:
-
В пользовательском интерфейсе ADS Control выбрать нужный кластер ADS.
Выбор кластера в интерфейсе ADS ControlВыбор кластера в интерфейсе ADS Control -
На открывшейся странице в списке кластеров Kafka Connect выбрать кластер ADS.
Открытие страницы с коннекторами Kafka ConnectОткрытие страницы с коннекторами Kafka Connect -
На открывшейся странице кликнуть Create Connector.
Создание коннектора Kafka ConnectСоздание коннектора Kafka Connect -
Выбрать нужный коннектор для создания.
Выбор коннектора Kafka Connect для созданияВыбор коннектора Kafka Connect для создания -
Заполнить параметры конфигурации коннектора. При необходимости воспользоваться информацией о конфигурации сервиса Kafka Connect в статье Конфигурационные параметры ADS. Вы можете использовать заполнение конфигурации в виде файла JSON. Для этого включите соответствующий переключатель.
Конфигурация коннектораКонфигурация коннектораJSON-файл конфигурации коннектораJSON-файл конфигурации коннектораПример содержимого JSON-файла для простой конфигурации FileStreamSinkConnector{ "name": "FileSink", "config.action.reload": "restart", "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "errors.deadletterqueue.context.headers.enable": "false", "errors.deadletterqueue.topic.name": null, "errors.deadletterqueue.topic.replication.factor": "3", "errors.log.enable": "false", "errors.log.include.messages": "false", "errors.retry.delay.max.ms": "60000", "errors.retry.timeout": "0", "errors.tolerance": "none", "file": "/tmp/test.txt", "header.converter": null, "key.converter": null, "predicates": null, "tasks.max": "1", "topics": "test_topic_name", "topics.regex": null, "transforms": null, "value.converter": null }
Атрибут Описание name
Уникальное имя коннектора
config.action.reload
Действие, которое Kafka Connect должен выполнять над коннектором, когда изменения во внешних поставщиках конфигурации приводят к изменению свойств конфигурации коннектора:
-
none
— обновление коннектора не требуется; -
restart
— коннектор будет перезагружен с обновленными свойствами конфигурации.
connector.class
Имя класса для коннектора. Должен быть подклассом
org.apache.kafka.connect.connector.Connector
errors.deadletterqueue.context.headers.enable
Значение устанавливается в
true
, если необходимо добавлять заголовки, содержащие контекст ошибки, к сообщениям, записываемым в очередь недоставленных сообщений (Dead Letter Queue, DLQ). Чтобы избежать конфликтов с заголовками из исходной записи, все ключи заголовка контекста ошибки будут начинаться с__connect.errors
errors.deadletterqueue.topic.name
Имя раздела, которое будет использоваться в качестве очереди недоставленных сообщений (Dead Letter Queue, DLQ) для сообщений, которые приводят к ошибке при обработке этим коннектором или его преобразователями
errors.deadletterqueue.topic.replication.factor
Фактор репликации, используемый для создания топика очереди недоставленных сообщений
errors.log.enable
Значения параметра:
-
true
— каждая ошибка и сведения о неудачной операции и проблемной записи фиксируются в журнале приложения Connect; -
false
— фиксируются только те ошибки, которые недопустимы.
errors.log.include.messages
Значения параметра:
-
true
— в логе фиксируется запись подключения, которая привела к сбою. Для данного коннектора будут регистрироваться топик, партиция, смещение и отметка времени. Для исходных записей будут регистрироваться ключ и значение (и их схемы), все заголовки и отметка времени, топик Kafka, партиция Kafka, исходная партиция и исходное смещение; -
false
— ключи записей, значений и заголовков в логе не фиксируются.
errors.retry.delay.max.ms
Максимальная продолжительность между последовательными повторными попытками запросов (в миллисекундах)
errors.retry.timeout
Максимальная продолжительность, в течение которой неудачная операция будет повторяться (в миллисекундах). Значения параметра:
-
0
— повторных попыток не будет; -
-1
— количество попыток бесконечно.
errors.tolerance
Поведение для допустимых ошибок во время работы коннектора. Значения параметра:
-
none
— любая ошибка приведет к немедленному сбою задачи коннектора; -
all
— изменяет поведение, чтобы пропускать проблемные записи.
file
Файл, в котором создаются сообщения при добавлении в указанный топик
header.converter
Класс
HeaderConverter
, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Управляет форматом значений заголовков в сообщениях, записываемых или считываемых из Kafka, и, являясь независимым от коннекторов, это позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro. По умолчаниюSimpleHeaderConverter
используется для сериализации значений заголовков в строки и их десериализации путем вывода схемkey.converter
Класс преобразователя, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Он управляет форматом ключей в сообщениях, записываемых в Kafka или считываемых из него, и, поскольку он не зависит от коннекторов, позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro
predicates
Псевдонимы для предикатов, используемых преобразователями
tasks.max
Максимальное количество задач, которые можно использовать для коннектора
topics
Список топиков, разделенных запятыми. Когда сообщения добавляются в эти топики, они автоматически создаются в указанном локальном файле
topics.regex
Регулярное выражение, задающее имена топиков для потребления. Регулярное выражение компилируется в
java.util.regex.Pattern
transforms
Псевдонимы для преобразователей, применяемых к записям
value.converter
Класс преобразователя, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Он управляет форматом значений в сообщениях, записываемых в Kafka или считываемых из него, и, поскольку он не зависит от коннектора, позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro
Пример содержимого JSON-файла для простой конфигурации FileStreamSourceConnector{ "name": "FileSource", "batch.size": "2000", "config.action.reload": "restart", "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "errors.log.enable": "false", "errors.log.include.messages": "false", "errors.retry.delay.max.ms": "60000", "errors.retry.timeout": "0", "errors.tolerance": "none", "file": "/tmp/test.txt", "header.converter": null, "key.converter": null, "predicates": null, "tasks.max": "1", "topic": "test", "topic.creation.groups": null, "transforms": null, "value.converter": null }
Атрибут Описание name
Уникальное имя коннектора
batch.size
Допустимый размер батча
config.action.reload
Действие, которое Kafka Connect должен выполнять над коннектором, когда изменения во внешних поставщиках конфигурации приводят к изменению свойств конфигурации коннектора:
-
none
— обновление коннектора не требуется; -
restart
— коннектор будет перезагружен с обновленными свойствами конфигурации.
connector.class
Имя класса для коннектора. Должен быть подклассом
org.apache.kafka.connect.connector.Connector
errors.log.enable
Значения параметра:
-
true
— каждая ошибка и сведения о неудачной операции и проблемной записи фиксируются в журнале приложения Connect; -
false
— фиксируются только те ошибки, которые недопустимы.
errors.log.include.messages
Значения параметра:
-
true
— в логе фиксируется запись подключения, которая привела к сбою. Для данного коннектора будут регистрироваться топик, партиция, смещение и отметка времени. Для исходных записей будут регистрироваться ключ и значение (и их схемы), все заголовки и отметка времени, топик Kafka, партиция Kafka, исходная партиция и исходное смещение; -
false
— ключи записей, значений и заголовков в логе не фиксируются.
errors.retry.delay.max.ms
Максимальная продолжительность между последовательными повторными попытками запросов (в миллисекундах)
errors.retry.timeout
Максимальная продолжительность, в течение которой неудачная операция будет повторяться (в миллисекундах). Значения параметра:
-
0
— повторных попыток не будет; -
-1
— количество попыток бесконечно.
errors.tolerance
Поведение для допустимых ошибок во время работы коннектора. Значения параметра:
-
none
— любая ошибка приведет к немедленному сбою задачи коннектора; -
all
— изменяет поведение, чтобы пропускать проблемные записи.
file
Файл, из которого сообщения передаются в указанный топик
header.converter
Класс
HeaderConverter
, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Управляет форматом значений заголовков в сообщениях, записываемых или считываемых из Kafka, и, являясь независимым от коннекторов, это позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro. По умолчаниюSimpleHeaderConverter
используется для сериализации значений заголовков в строки и их десериализации путем вывода схемkey.converter
Класс преобразователя, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Он управляет форматом ключей в сообщениях, записываемых в Kafka или считываемых из него, и, поскольку он не зависит от коннекторов, позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro
predicates
Псевдонимы для предикатов, используемых преобразователями
tasks.max
Максимальное количество задач, которые можно использовать для коннектора
topics
Список топиков, разделенных запятыми. Когда сообщения добавляются в эти топики, они автоматически создаются в указанном локальном файле
topic.creation.groups
Группа по умолчанию всегда определяется для конфигураций темы. Значения этого свойства относятся к дополнительным группам
transforms
Псевдонимы для преобразователей, применяемых к записям
value.converter
Класс преобразователя, используемый для преобразования между форматом Kafka Connect и сериализованной формой, которая записывается в Kafka. Он управляет форматом значений в сообщениях, записываемых в Kafka или считываемых из него, и, поскольку он не зависит от коннектора, позволяет любому коннектору работать с любым форматом сериализации. Примеры распространенных форматов включают JSON и Avro
-
-
После заполнения кликнуть Save и получить сообщение об успешном создании коннектора.
Сообщение об успешном создании коннектораСообщение об успешном создании коннектора -
Проверить, что на странице Connectors for ads отображаются созданные коннекторы в рабочем статусе. Статус определяется в зависимости от индикатора перед названием коннектора:
-
— коннектор/задача запущены;
-
— коннектор/задача были административно приостановлены;
-
— ошибка в работе коннектора/задачи;
-
— коннектор/задача еще не назначена процессу.
Созданные коннекторыСозданные коннекторы -
После создания коннекторов появляется возможность записи данных из локального файла в топик Kafka и наоборот в зависимости от типа коннектора.