FileStream in ADS Control

This article describes how to configure FileStream connectors using the ADS Control.

FileStream connectors are tools provided by the Kafka Connect service:

  • FileStreamSource Connector reads data from a local file and writes it to a Kafka topic.

  • FileStreamSink Connector reads data from a Kafka topic and outputs it to a local file.
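
To make the data flow concrete, the sketch below reproduces by hand what FileStreamSource does: each line of a local file becomes the value of one record in a Kafka topic. This is an illustration of the semantics only, not the connector's implementation; the broker address localhost:9092, the kafka-python package, and the file and topic names are assumptions.

# Illustration only: reproduce the FileStreamSource semantics by hand.
# Assumes a broker at localhost:9092 and the kafka-python package.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Each file line is sent as the value of one Kafka record, which is
# what FileStreamSource does inside the Connect worker.
with open("/tmp/test.txt") as source_file:
    for line in source_file:
        producer.send("test", value=line.rstrip("\n").encode("utf-8"))

producer.flush()  # ensure all records reach the broker
producer.close()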

NOTE
Before configuring FileStream connectors in the cluster, the Kafka Connect service must be installed and configured. Kafka Connect is available for installation in ADS starting from version 1.7.1.

Create FileStream connectors

To create connectors using the ADS Control, you need to:

  1. In the ADS Control user interface, select the required ADS cluster.

    Cluster selection in the ADS Control interface
  2. On the page that opens, select the Kafka Connect of the required ADS cluster from the list.

    Opening the page with Kafka Connect connectors
  3. On the page that opens, click Create Connector.

    Creating a Kafka Connect connector
  4. Select the desired connector to create.

    Selecting the Kafka Connect connector to create
  5. Fill in the connector configuration parameters. If necessary, refer to the Kafka Connect service configuration information in ADS configuration parameters. You can also populate the configuration from a JSON file; to do this, turn on the corresponding switch. Sketches of the equivalent direct Kafka Connect REST API calls are given after the attribute tables below.

    Connector configuration
    Connector configuration JSON file
    Sample JSON file content for a simple FileStreamSinkConnector configuration
    {
      "name": "FileSink",
      "config.action.reload": "restart",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "errors.deadletterqueue.context.headers.enable": "false",
      "errors.deadletterqueue.topic.name": null,
      "errors.deadletterqueue.topic.replication.factor": "3",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.retry.delay.max.ms": "60000",
      "errors.retry.timeout": "0",
      "errors.tolerance": "none",
      "file": "/tmp/test.txt",
      "header.converter": null,
      "key.converter": null,
      "predicates": null,
      "tasks.max": "1",
      "topics": "test_topic_name",
      "topics.regex": null,
      "transforms": null,
      "value.converter": null
    }
    Attribute descriptions

    name

    Unique connector name

    config.action.reload

    The action Kafka Connect should take on a connector when changes to external configuration providers result in changes to the connector’s configuration properties:

    • none — the connector update is not required;

    • restart — the connector will be restarted with updated configuration properties.

    connector.class

    The class name for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector

    errors.deadletterqueue.context.headers.enable

    The value is set to true if it is necessary to add headers containing the error context to messages written to the dead-letter queue (DLQ). To avoid conflicts with headers from the original entry, all error context header keys will start with __connect.errors

    errors.deadletterqueue.topic.name

    The name of the topic to be used as the dead-letter queue (DLQ) for messages that cause an error when processed by this connector or its transforms

    errors.deadletterqueue.topic.replication.factor

    The replication factor used to create the dead-letter queue topic

    errors.log.enable

    Parameter values:

    • true — each error, failed operation, and problem entry is logged in the Connect application log;

    • false — only errors that are not tolerated are reported.

    errors.log.include.messages

    Parameter values:

    • true — logs the Connect record that caused the failure. For sink records, the topic, partition, offset, and timestamp are logged. For source records, the key and value (and their schemas), all headers, the timestamp, Kafka topic, Kafka partition, source partition, and source offset are logged;

    • false — no record keys, values, or headers are written to the log files.

    errors.retry.delay.max.ms

    Maximum duration between consecutive request retries (in milliseconds)

    errors.retry.timeout

    The maximum duration that the failed operation will be retried (in milliseconds). Parameter values:

    • 0 — no retries;

    • -1 — the number of retries is infinite.

    errors.tolerance

    Behavior for allowable errors while the connector is running. Parameter values:

    • none — any error will cause the connector task to fail immediately;

    • all — problematic records are skipped.

    file

    The file to which messages arriving in the specified topics are written

    header.converter

    The HeaderConverter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. This controls the format of header values in messages written to or read from Kafka, and since it is independent of connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, SimpleHeaderConverter is used to serialize header values into strings and deserialize them by inferring schemas

    key.converter

    The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of keys in messages written to or read from Kafka, and because it does not depend on connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro

    predicates

    Aliases for predicates used by transformations

    tasks.max

    The maximum number of tasks that can be used for a connector

    topics

    Comma-separated list of topics. Messages added to these topics are automatically written to the specified local file

    topics.regex

    A regular expression that specifies the topic names to consume. The regular expression is compiled to a java.util.regex.Pattern

    transforms

    Aliases for transformations applied to records

    value.converter

    The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of values in messages written to or read from Kafka and, because it is connector independent, allows any connector to work with any serialization format. Examples of common formats include JSON and Avro
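
    ADS Control submits this configuration to the Kafka Connect REST API on your behalf. For reference, the sketch below creates the same sink connector directly through that API; the worker address http://localhost:8083 is an assumption, and note that in the REST request the name is passed at the top level while the remaining attributes go into the config object.

    # Minimal sketch: create the FileSink connector via the Kafka Connect REST API.
    # The worker address is an assumption; adjust it to your environment.
    import requests

    connector = {
        "name": "FileSink",
        "config": {
            "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
            "tasks.max": "1",
            "file": "/tmp/test.txt",
            "topics": "test_topic_name",
        },
    }

    response = requests.post("http://localhost:8083/connectors", json=connector)
    response.raise_for_status()  # HTTP 201 means the connector was accepted
    print(response.json())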

    Sample JSON file content for a simple FileStreamSourceConnector configuration
    {
      "name": "FileSource",
      "batch.size": "2000",
      "config.action.reload": "restart",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.retry.delay.max.ms": "60000",
      "errors.retry.timeout": "0",
      "errors.tolerance": "none",
      "file": "/tmp/test.txt",
      "header.converter": null,
      "key.converter": null,
      "predicates": null,
      "tasks.max": "1",
      "topic": "test",
      "topic.creation.groups": null,
      "transforms": null,
      "value.converter": null
    }
    Attribute descriptions

    name

    Unique connector name

    batch.size

    The maximum number of records the source task reads from the file in one batch

    config.action.reload

    The action Kafka Connect should take on a connector when changes to external configuration providers result in changes to the connector’s configuration properties:

    • none — the connector update is not required;

    • restart — the connector will be restarted with updated configuration properties.

    connector.class

    The class name for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector

    errors.log.enable

    Parameter values:

    • true — each error, failed operation, and problem entry is logged in the Connect application log;

    • false — only errors that are not tolerated are reported.

    errors.log.include.messages

    Parameter values:

    • true — logs the Connect record that caused the failure. For sink records, the topic, partition, offset, and timestamp are logged. For source records, the key and value (and their schemas), all headers, the timestamp, Kafka topic, Kafka partition, source partition, and source offset are logged;

    • false — no record keys, values, or headers are written to the log files.

    errors.retry.delay.max.ms

    Maximum duration between consecutive request retries (in milliseconds)

    errors.retry.timeout

    The maximum duration that the failed operation will be retried (in milliseconds). Parameter values:

    • 0 — no retries;

    • -1 — the number of retries is infinite.

    errors.tolerance

    Behavior for allowable errors while the connector is running. Parameter values:

    • none — any error will cause the connector task to fail immediately;

    • all — problematic records are skipped.

    file

    The file from which messages are read and sent to the specified topic

    header.converter

    The HeaderConverter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. This controls the format of header values in messages written to or read from Kafka, and since it is independent of connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, SimpleHeaderConverter is used to serialize header values into strings and deserialize them by inferring schemas

    key.converter

    The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of keys in messages written to or read from Kafka, and because it does not depend on connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro

    predicates

    Aliases for predicates used by transformations

    tasks.max

    The maximum number of tasks that can be used for a connector

    topic

    The topic to which data read from the file is published

    topic.creation.groups

    The default group is always defined for topic configurations. Values for this property apply to additional groups

    transforms

    Aliases for transformations applied to records

    value.converter

    The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of values in messages written to or read from Kafka and, because it is connector independent, allows any connector to work with any serialization format. Examples of common formats include JSON and Avro
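
    A configuration can also be checked against the connector plugin before it is created. The sketch below uses the Kafka Connect REST API validation endpoint for the FileSource attributes above; the worker address http://localhost:8083 is an assumption.

    # Minimal sketch: validate a FileStreamSource configuration without creating it.
    # Assumes a Connect worker at http://localhost:8083.
    import requests

    config = {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/tmp/test.txt",
        "topic": "test",
    }

    url = "http://localhost:8083/connector-plugins/FileStreamSourceConnector/config/validate"
    result = requests.put(url, json=config).json()

    # error_count is 0 when every attribute passes the plugin's checks
    print("validation errors:", result["error_count"])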

  6. After filling in the parameters, click Save. A message appears confirming the successful creation of the connector.

    Message about the successful creation of the connector
  7. Check that the Connectors for ads page displays the created connectors in a running status. The status is determined by the indicator in front of the connector name (a sketch for checking these statuses via the REST API follows below):

    • green — the connector/task is running;

    • yellow — the connector/task has been administratively paused;

    • red — the connector/task has failed (usually by raising an exception, which is reported in the status output);

    • unassigned — the connector/task has not yet been assigned to a worker.

      Created connectors
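
    The indicator colors correspond to the connector and task states reported by Kafka Connect: RUNNING, PAUSED, FAILED, and UNASSIGNED. The sketch below reads the same information from the REST API status endpoint; the worker address http://localhost:8083 and the connector name FileSink are assumptions.

    # Minimal sketch: read connector and task states via the Kafka Connect REST API.
    # Assumes a Connect worker at http://localhost:8083 and a connector named FileSink.
    import requests

    status = requests.get("http://localhost:8083/connectors/FileSink/status").json()

    # The state maps to the indicator color in ADS Control:
    # RUNNING -> green, PAUSED -> yellow, FAILED -> red, UNASSIGNED -> not yet on a worker.
    print("connector:", status["connector"]["state"])
    for task in status["tasks"]:
        print(f"task {task['id']}:", task["state"])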

After creating connectors, it becomes possible to write data from a local file to a Kafka topic and vice versa, depending on the type of connector.
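
To verify the flow end to end with the sample FileSource configuration above, append a line to the watched file and consume it back from the topic. A minimal sketch, assuming a broker at localhost:9092 and the kafka-python package:

# Minimal sketch: verify the FileStreamSource flow end to end.
# Assumes the sample FileSource configuration above and a broker at localhost:9092.
from kafka import KafkaConsumer

# Append a line to the file watched by the connector.
with open("/tmp/test.txt", "a") as f:
    f.write("hello from the source file\n")

# Consume from the target topic; the new line should arrive as a record.
consumer = KafkaConsumer(
    "test",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating after 10 s with no new records
)
for record in consumer:
    print(record.value.decode("utf-8"))
consumer.close()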
