FileStream in ADSCC

This article describes how to configure FileStream connectors using the ADS Command Center.

FileStream connectors are tools of the Kafka Connect service:

  • FileStreamSource Connector reads data from a local file and writes it to a Kafka topic.

  • FileStreamSink Connector reads data from a Kafka topic and outputs it to a local file.

NOTE
Before configuring FileStream connectors in the cluster, the Kafka Connect service must be installed and configured. Kafka Connect is available for installation in ADS starting from version 1.7.1.

Creating FileStream connectors

To create connectors using the ADS Command Center, you need to:

  1. Install ADS Command Center in ADCM and integrate it with the target ADS cluster.

  2. Open the ADS Command Center user interface.

  3. In the ADS Command Center user interface, select the required ADS cluster.

    Cluster selection in the ADS Command Center interface
  4. On the page that opens, select an ADS cluster from the list of Kafka Connect clusters.

    Opening the page with Kafka Connect connectors
  5. On the page that opens, click Create Connector.

    Creating a Kafka Connect connector
  6. Select the desired connector to create.

    Selecting the Kafka Connect connector to create
  7. Fill in the connector configuration parameters. If necessary, refer to the Kafka Connect service configuration information in ADS configuration parameters. You can also populate the configuration from a JSON file; to do this, turn on the corresponding switch. Sample JSON file contents for both connector types are given below, followed by attribute descriptions and scripting sketches.

    Connector configuration
    Connector configuration JSON file
    Sample JSON file content for a simple FileStreamSinkConnector configuration
    {
      "name": "FileSink",
      "config.action.reload": "restart",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "errors.deadletterqueue.context.headers.enable": "false",
      "errors.deadletterqueue.topic.name": null,
      "errors.deadletterqueue.topic.replication.factor": "3",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.retry.delay.max.ms": "60000",
      "errors.retry.timeout": "0",
      "errors.tolerance": "none",
      "file": "/tmp/test.txt",
      "header.converter": null,
      "key.converter": null,
      "predicates": null,
      "tasks.max": "1",
      "topics": "test_topic_name",
      "topics.regex": null,
      "transforms": null,
      "value.converter": null
    }
    Attribute descriptions:

    name

    Unique connector name

    config.action.reload

    The action Kafka Connect should take on a connector when changes to external configuration providers result in changes to the connector’s configuration properties:

    • none — the connector update is not required;

    • restart — the connector will be restarted with updated configuration properties.

    connector.class

    The name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector

    errors.deadletterqueue.context.headers.enable

    The value is set to true if it is necessary to add headers containing the error context to messages written to the dead-letter queue (DLQ). To avoid conflicts with headers from the original record, all error context header keys start with __connect.errors

    errors.deadletterqueue.topic.name

    The name of the topic to be used as the dead-letter queue (DLQ) for messages that result in an error when processed by this connector or its transforms

    errors.deadletterqueue.topic.replication.factor

    The replication factor used to create the dead-letter queue topic

    errors.log.enable

    Parameter values:

    • true — each error, along with the details of the failed operation and the problematic record, is written to the Connect application log;

    • false — only errors that are not tolerated are reported.

    errors.log.include.messages

    Parameter values:

    • true — the Connect record that caused the failure is logged. For sink records, the topic, partition, offset, and timestamp are logged. For source records, the key and value (and their schemas), all headers, the timestamp, Kafka topic, Kafka partition, source partition, and source offset are logged;

    • false — record keys, values, and headers are not written to the log files.

    errors.retry.delay.max.ms

    Maximum duration between consecutive retry attempts (in milliseconds)

    errors.retry.timeout

    The maximum duration that the failed operation will be retried (in milliseconds). Parameter values:

    • 0 — no retries;

    • -1 — the number of retries is infinite.

    errors.tolerance

    Behavior for tolerating errors while the connector is running. Parameter values:

    • none — any error causes the connector task to fail immediately;

    • all — problematic records are skipped and processing continues.

    file

    The file to which messages arriving in the specified topics are written

    header.converter

    The HeaderConverter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. This controls the format of header values in messages written to or read from Kafka, and since it is independent of connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, SimpleHeaderConverter is used to serialize header values into strings and deserialize them by inferring schemas

    key.converter

    The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of keys in messages written to or read from Kafka, and because it does not depend on connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro

    predicates

    Aliases for predicates used by transformations

    tasks.max

    The maximum number of tasks that can be used for a connector

    topics

    Comma-separated list of topics to consume. Messages added to these topics are automatically written to the specified local file

    topics.regex

    A regular expression that specifies the topics to consume. It is compiled to java.util.regex.Pattern. Only one of topics or topics.regex should be specified

    transforms

    Aliases for transformations applied to records

    value.converter

    The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of values in messages written to or read from Kafka and, because it is connector independent, allows any connector to work with any serialization format. Examples of common formats include JSON and Avro
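
    A connector with the same configuration can also be registered directly through the Kafka Connect REST API, which is useful for scripting. Below is a minimal Python sketch; the REST endpoint address (localhost:8083) and the use of the requests package are assumptions, not something this article prescribes.

    import json

    import requests  # assumption: the requests package is installed

    CONNECT_URL = "http://localhost:8083"  # assumption: Kafka Connect REST endpoint

    sink_config = {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "file": "/tmp/test.txt",
        "topics": "test_topic_name",
    }

    # POST /connectors registers a new connector; the Connect REST API
    # expects the configuration wrapped in "name" and "config" keys.
    response = requests.post(
        f"{CONNECT_URL}/connectors",
        json={"name": "FileSink", "config": sink_config},
        timeout=10,
    )
    response.raise_for_status()
    print(json.dumps(response.json(), indent=2))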

    Sample JSON file content for a simple FileStreamSourceConnector configuration
    {
      "name": "FileSource",
      "batch.size": "2000",
      "config.action.reload": "restart",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.retry.delay.max.ms": "60000",
      "errors.retry.timeout": "0",
      "errors.tolerance": "none",
      "file": "/tmp/test.txt",
      "header.converter": null,
      "key.converter": null,
      "predicates": null,
      "tasks.max": "1",
      "topic": "test",
      "topic.creation.groups": null,
      "transforms": null,
      "value.converter": null
    }
    Attribute descriptions:

    name

    Unique connector name

    batch.size

    The maximum number of records the source task can read from the file at one time

    config.action.reload

    The action Kafka Connect should take on a connector when changes to external configuration providers result in changes to the connector’s configuration properties:

    • none — the connector update is not required;

    • restart — the connector will be restarted with updated configuration properties.

    connector.class

    The name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector

    errors.log.enable

    Parameter values:

    • true — each error, along with the details of the failed operation and the problematic record, is written to the Connect application log;

    • false — only errors that are not tolerated are reported.

    errors.log.include.messages

    Parameter values:

    • true — the Connect record that caused the failure is logged. For sink records, the topic, partition, offset, and timestamp are logged. For source records, the key and value (and their schemas), all headers, the timestamp, Kafka topic, Kafka partition, source partition, and source offset are logged;

    • false — record keys, values, and headers are not written to the log files.

    errors.retry.delay.max.ms

    Maximum duration between consecutive retry attempts (in milliseconds)

    errors.retry.timeout

    The maximum duration that the failed operation will be retried (in milliseconds). Parameter values:

    • 0 — no retries;

    • -1 — the number of retries is infinite.

    errors.tolerance

    Behavior for tolerating errors while the connector is running. Parameter values:

    • none — any error causes the connector task to fail immediately;

    • all — problematic records are skipped and processing continues.

    file

    The file from which messages are read and sent to the specified topic

    header.converter

    The HeaderConverter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. This controls the format of header values in messages written to or read from Kafka, and since it is independent of connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, SimpleHeaderConverter is used to serialize header values into strings and deserialize them by inferring schemas

    key.converter

    The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of keys in messages written to or read from Kafka, and because it does not depend on connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro

    predicates

    Aliases for predicates used by transformations

    tasks.max

    The maximum number of tasks that can be used for a connector

    topic

    The name of the Kafka topic to which data from the file is published

    topic.creation.groups

    The default group is always defined for topic configurations. The values of this property apply to additional groups

    transforms

    Aliases for transformations applied to records

    value.converter

    The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of values in messages written to or read from Kafka and, because it is connector independent, allows any connector to work with any serialization format. Examples of common formats include JSON and Avro
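
    Once the FileStreamSourceConnector above is running, the pipeline can be checked end to end by appending a line to the watched file and reading it back from the topic. A minimal sketch, assuming the broker is reachable at localhost:9092 and the kafka-python package is installed (both are assumptions):

    from kafka import KafkaConsumer  # assumption: kafka-python package

    # Append a test line to the file watched by the FileSource connector.
    with open("/tmp/test.txt", "a") as source_file:
        source_file.write("hello from FileStreamSource\n")

    # Read the line back from the target topic.
    consumer = KafkaConsumer(
        "test",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        consumer_timeout_ms=10000,  # stop polling after 10 seconds of silence
    )
    for record in consumer:
        print(record.value.decode("utf-8"))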

  8. After filling in the parameters, click Save configuration; a message about the successful creation of the connector is displayed.

    Message about the successful creation of the connector
  9. Check that the connectors page for the cluster displays the created connectors in a working status. The status is indicated by the color of the indicator in front of the connector name (the same status can be queried via the Kafka Connect REST API, as shown in the sketch after this procedure):

    • green — the connector is working correctly;

    • gray — the connector is not running at full capacity (the number of tasks in the connector configuration is greater than the number actually in use);

    • red — the connector has an error.

      Created connectors

After the connectors are created, data can be written from a local file to a Kafka topic and vice versa, depending on the connector type.
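
The indicator colors reflect the connector and task states reported by Kafka Connect, so the same check can be scripted against the REST API. A minimal sketch, reusing the assumed endpoint address from the example above:

    import requests  # assumption: the requests package is installed

    CONNECT_URL = "http://localhost:8083"  # assumption: Kafka Connect REST endpoint

    for name in ("FileSink", "FileSource"):
        # GET /connectors/<name>/status reports the connector state
        # (for example, RUNNING or FAILED) and the state of each task.
        status = requests.get(f"{CONNECT_URL}/connectors/{name}/status", timeout=10).json()
        task_states = [task["state"] for task in status["tasks"]]
        print(name, status["connector"]["state"], task_states)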
