FileStream in ADSCC
This article describes how to configure FileStream connectors using the ADS Command Center.
FileStream connectors are Kafka Connect service tools:

- FileStreamSource Connector reads data from a local file and writes it to a Kafka topic.
- FileStreamSink Connector reads data from a Kafka topic and outputs it to a local file.
NOTE
Before configuring FileStream connectors in the cluster, the Kafka Connect service must be installed and configured. Kafka Connect is available for installation in ADS starting from version 1.7.1.
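Once Kafka Connect is installed, you can optionally confirm that its REST interface is reachable before creating connectors. The following is a minimal Python sketch; the worker address is a placeholder (8083 is the default REST port in Apache Kafka), and GET /connectors is a standard Kafka Connect endpoint that lists registered connectors.

import json
import urllib.request

CONNECT_URL = "http://connect-host:8083"  # placeholder Kafka Connect worker address

# GET /connectors returns the names of all connectors registered on the worker
with urllib.request.urlopen(f"{CONNECT_URL}/connectors") as resp:
    connectors = json.load(resp)

print("Existing connectors:", connectors)

An empty list is the expected response on a fresh installation.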
Creating FileStream connectors
To create connectors using the ADS Command Center, you need to:

- Install ADS Command Center in ADCM and integrate it with the target ADS cluster.
- Open the ADS Command Center user interface.
- In the ADS Command Center user interface, select the required ADS cluster.
  Cluster selection in the ADS Command Center interface
- On the page that opens, select the ADS cluster from the list of Kafka Connect clusters.
  Opening the page with Kafka Connect connectors
- On the page that opens, click Create Connector.
  Creating a Kafka Connect Connector
- Select the desired connector to create.
  Selecting the Kafka Connect connector to create
- Fill in the connector configuration parameters. If necessary, refer to the Kafka Connect service configuration information in ADS configuration parameters. You can also populate the configuration as a JSON file; to do this, turn on the corresponding switch.
  Connector configuration
  Connector configuration JSON file

  Sample JSON file content for a simple FileStreamSinkConnector configuration:

  {
      "name": "FileSink",
      "config.action.reload": "restart",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "errors.deadletterqueue.context.headers.enable": "false",
      "errors.deadletterqueue.topic.name": null,
      "errors.deadletterqueue.topic.replication.factor": "3",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.retry.delay.max.ms": "60000",
      "errors.retry.timeout": "0",
      "errors.tolerance": "none",
      "file": "/tmp/test.txt",
      "header.converter": null,
      "key.converter": null,
      "predicates": null,
      "tasks.max": "1",
      "topics": "test_topic_name",
      "topics.regex": null,
      "transforms": null,
      "value.converter": null
  }
  Attribute — Description

  name — Unique connector name.

  config.action.reload — The action Kafka Connect should take on the connector when changes to external configuration providers result in changes to the connector's configuration properties:
  - none — no connector update is required;
  - restart — the connector is restarted with the updated configuration properties.

  connector.class — The name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector.

  errors.deadletterqueue.context.headers.enable — Set to true to add headers containing the error context to messages written to the dead-letter queue (DLQ). To avoid conflicts with headers from the original record, all error context header keys start with __connect.errors.

  errors.deadletterqueue.topic.name — The name of the topic to be used as the dead-letter queue (DLQ) for messages that result in an error when processed by this connector or its transformations.

  errors.deadletterqueue.topic.replication.factor — The replication factor used to create the dead-letter queue topic.

  errors.log.enable — Parameter values:
  - true — each error, failed operation, and problematic record is logged in the Connect application log;
  - false — only errors that are not tolerated are reported.

  errors.log.include.messages — Parameter values:
  - true — the Connect record that caused the failure is logged. For sink records, the topic, partition, offset, and timestamp are logged. For source records, the key and value (and their schemas), all headers, and the timestamp, Kafka topic, Kafka partition, source partition, and source offset are logged;
  - false — no record keys, values, or headers are written to the log files.

  errors.retry.delay.max.ms — Maximum duration between consecutive retry attempts (in milliseconds).

  errors.retry.timeout — The maximum duration for which a failed operation is retried (in milliseconds). Parameter values:
  - 0 — no retries;
  - -1 — infinite retries.

  errors.tolerance — Behavior for tolerable errors while the connector is running. Parameter values:
  - none — any error causes the connector task to fail immediately;
  - all — problematic records are skipped.

  file — The file to which messages added to the specified topics are written.

  header.converter — The HeaderConverter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of header values in messages written to or read from Kafka, and because it is independent of connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, SimpleHeaderConverter is used to serialize header values into strings and deserialize them by inferring schemas.

  key.converter — The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of keys in messages written to or read from Kafka, and because it is independent of connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

  predicates — Aliases for the predicates used by transformations.

  tasks.max — The maximum number of tasks that can be used for the connector.

  topics — A comma-separated list of topics to consume. Messages added to these topics are written to the specified local file.

  topics.regex — A regular expression specifying the topic names to consume. The expression is compiled to java.util.regex.Pattern.

  transforms — Aliases for the transformations applied to records.

  value.converter — The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of values in messages written to or read from Kafka and, because it is independent of connectors, allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
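If you prefer to script connector creation rather than fill in the UI form, an equivalent sink connector can be registered through the Kafka Connect REST API. This is a sketch assuming a reachable worker at a placeholder address; note that the REST API takes the connector name at the top level and the remaining attributes under "config".

import json
import urllib.request

CONNECT_URL = "http://connect-host:8083"  # placeholder Kafka Connect worker address

payload = {
    "name": "FileSink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "file": "/tmp/test.txt",
        "topics": "test_topic_name",
    },
}

# POST /connectors registers a new connector on the Kafka Connect worker
req = urllib.request.Request(
    f"{CONNECT_URL}/connectors",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(json.load(resp))  # echoes the created connector definition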
  Sample JSON file content for a simple FileStreamSourceConnector configuration:

  {
      "name": "FileSource",
      "batch.size": "2000",
      "config.action.reload": "restart",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.retry.delay.max.ms": "60000",
      "errors.retry.timeout": "0",
      "errors.tolerance": "none",
      "file": "/tmp/test.txt",
      "header.converter": null,
      "key.converter": null,
      "predicates": null,
      "tasks.max": "1",
      "topic": "test",
      "topic.creation.groups": null,
      "transforms": null,
      "value.converter": null
  }
  Attribute — Description

  name — Unique connector name.

  batch.size — The maximum size of a batch of records read from the file at a time.

  config.action.reload — The action Kafka Connect should take on the connector when changes to external configuration providers result in changes to the connector's configuration properties:
  - none — no connector update is required;
  - restart — the connector is restarted with the updated configuration properties.

  connector.class — The name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector.

  errors.log.enable — Parameter values:
  - true — each error, failed operation, and problematic record is logged in the Connect application log;
  - false — only errors that are not tolerated are reported.

  errors.log.include.messages — Parameter values:
  - true — the Connect record that caused the failure is logged. For sink records, the topic, partition, offset, and timestamp are logged. For source records, the key and value (and their schemas), all headers, and the timestamp, Kafka topic, Kafka partition, source partition, and source offset are logged;
  - false — no record keys, values, or headers are written to the log files.

  errors.retry.delay.max.ms — Maximum duration between consecutive retry attempts (in milliseconds).

  errors.retry.timeout — The maximum duration for which a failed operation is retried (in milliseconds). Parameter values:
  - 0 — no retries;
  - -1 — infinite retries.

  errors.tolerance — Behavior for tolerable errors while the connector is running. Parameter values:
  - none — any error causes the connector task to fail immediately;
  - all — problematic records are skipped.

  file — The file from which messages are read and sent to the specified topic.

  header.converter — The HeaderConverter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of header values in messages written to or read from Kafka, and because it is independent of connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, SimpleHeaderConverter is used to serialize header values into strings and deserialize them by inferring schemas.

  key.converter — The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of keys in messages written to or read from Kafka, and because it is independent of connectors, it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

  predicates — Aliases for the predicates used by transformations.

  tasks.max — The maximum number of tasks that can be used for the connector.

  topic — The topic to which the data read from the file is written.

  topic.creation.groups — Groups of configurations for topics created by the source connector. The default group is always defined for topic configurations; values of this property specify additional groups.

  transforms — Aliases for the transformations applied to records.

  value.converter — The converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. It controls the format of values in messages written to or read from Kafka and, because it is independent of connectors, allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
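Once the source connector has been created and started (see the remaining steps below), you can verify it end to end: lines appended to /tmp/test.txt on the Connect worker host should appear as messages in the "test" topic. The sketch below uses the third-party kafka-python client (pip install kafka-python); the broker address is a placeholder.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test",                                # topic from the sample configuration above
    bootstrap_servers="broker-host:9092",  # placeholder broker address
    auto_offset_reset="earliest",          # start from the beginning of the topic
    consumer_timeout_ms=10000,             # stop iterating after 10 s without messages
)

# FileStreamSourceConnector emits each line of the file as a separate message
for message in consumer:
    print(message.value.decode("utf-8"))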
- After filling in the parameters, click Save configuration; a message about the successful creation of the connector appears.
  Message about the successful creation of the connector
- Check that the Connectors for ads page displays the created connectors in working status. The status is indicated by the color of the indicator in front of the connector name:
  - green — the connector is working correctly;
  - gray — the connector is not working at full capacity (the number of tasks in the connector configuration is greater than the number actually used);
  - red — the connector has an error.
  Created connectors
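The indicator colors reflect connector and task states that can also be queried directly from the Kafka Connect REST API. The sketch below assumes the placeholder worker address used earlier and the connector names from the sample configurations; GET /connectors/<name>/status is a standard Kafka Connect endpoint.

import json
import urllib.request

CONNECT_URL = "http://connect-host:8083"  # placeholder Kafka Connect worker address

for name in ("FileSink", "FileSource"):
    with urllib.request.urlopen(f"{CONNECT_URL}/connectors/{name}/status") as resp:
        status = json.load(resp)
    # A RUNNING connector with all tasks RUNNING roughly corresponds to the
    # green indicator; FAILED corresponds to red
    print(name,
          status["connector"]["state"],
          [task["state"] for task in status["tasks"]])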
After creating connectors, it becomes possible to write data from a local file to a Kafka topic and vice versa, depending on the type of connector.
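To check the topic-to-file direction, produce a few messages to the sink connector's topic and inspect the output file on the Connect worker host. This sketch also uses the third-party kafka-python client; the broker address is a placeholder, while the topic and file names come from the FileStreamSinkConnector sample above.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="broker-host:9092")  # placeholder broker address

# Each message becomes one line in /tmp/test.txt on the Connect worker host
for i in range(3):
    producer.send("test_topic_name", f"message {i}".encode("utf-8"))

producer.flush()  # ensure all messages reach the broker before exiting

After running it, tail /tmp/test.txt on the worker host should show the three produced lines.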