Kafka Connect overview
Kafka Connect features
Kafka Connect is a tool for scalable and reliable data streaming between Apache Kafka and other data systems. Kafka Connect works by running connectors that move data from external systems into Kafka or from Kafka into external systems.
The main benefits of Kafka Connect are listed below:
- Copying data (creating connectors and managing them) is performed via the REST interface.
- Ample scalability: changing the number of workers, connecting new connectors to existing platforms, managing the number of running tasks.
- Automatic tracking of data offsets during connector operation.
- A wide selection of existing connector plugins, as well as the ability to create your own to suit the needs of your system.
- Integration of streaming and batch data processing.
Kafka Connect architecture
The Kafka Connect architecture is described below for the case when data needs to be transferred from an external system (source) to Kafka. In this example, a source connector is launched in a Kafka Connect cluster that consists of two Kafka Connect worker components, and the connector creates two tasks.
A Kafka Connect cluster consists of several workers.
To create a connector instance, you can send a request to the REST API of any of the workers. The created connector connects to the data storage, creates the required (specified in the connector configuration) number of tasks and evenly divides the volume of copied data between them. After creating a connector, workers (cluster members) launch the created tasks, and also evenly rebalance all created connectors and tasks so that each worker has an equal load.
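For example, you can query the REST API of any worker to see which connectors exist and how their tasks are distributed across workers. The host and port below are placeholders (8083 is the default REST port), and the expand=status query parameter is available in recent Kafka Connect versions:
$ curl "http://localhost:8083/connectors?expand=status" | jq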
The data copied by each task can be modified as it passes through Kafka Connect using the single message transform (SMT) module. The required data transformations are specified in the connector configuration upon creation.
As data passes through Kafka Connect, it is serialized using a special converter before being written to Kafka. The converter type can be specified for the entire Kafka Connect service, as well as for each specific connector.
Worker
A worker is a container that launches connectors and tasks. Each worker is a separate Java process running inside a JVM.
A worker can function in two modes:
- Standalone mode — a Kafka Connect operating mode in which only one worker is used. In this case, all data about connectors is stored locally by the worker. Standalone mode has limited functionality and lacks fault tolerance.
- Distributed mode — a Kafka Connect operating mode in which several workers are used. To combine several workers into a cluster, set the same group.id parameter value for all of them.
In this mode, the REST API of each worker is used to create, configure, and manage connectors. Requests can be sent to any member of the cluster.
In distributed mode, workers perform the following functions:
- Processing REST requests to create and configure connectors using special connector plugins.
- Recording and storing data about connectors in special Kafka topics, defined by the following parameters:
  - config.storage.topic — the name of the topic that stores connector settings.
  - offset.storage.topic — the name of the topic that stores offsets for data already processed by connectors. This is described in more detail below in the description of the task architecture.
  - status.storage.topic — the name of the topic that stores information about the state of connectors.
  To write data to the Kafka broker, workers use the Kafka API.
- Management of connectors and tasks.
- Ensuring scalability and automatic fault tolerance for Kafka Connect:
  - when workers are added, tasks are redistributed among all workers;
  - when the number of workers is reduced (deliberately or due to a failure), tasks are rebalanced between the available workers.
The Kafka Connect worker settings for the distributed mode are set in the connect-distributed.properties configuration file.
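A minimal sketch of this file is shown below. The broker address, cluster name, and topic names are example values only; the actual values depend on your environment.
# connect-distributed.properties — example values only
# Kafka brokers the workers communicate with
bootstrap.servers=kafka-broker-1:9092
# workers with the same group.id form one Kafka Connect cluster
group.id=connect-cluster
# internal topics that store connector settings, processed offsets, and connector states
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status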
The figure below shows an example of task distribution among several workers and the interaction of the worker with Kafka.
Connector
A connector instance is a logical job that copies messages from an input stream to an output stream, with a Kafka topic (or multiple topics) being either the data source or the copying destination. Depending on the role Kafka plays in this process, the connector type is one of the following:
- Source connector is a connector that interacts with the API of the source data system, retrieves data and the data schema, and passes them to Kafka. Such connectors can ingest entire databases and write new data to Kafka topics as tables are updated.
- Sink connector is a connector that accepts data from Kafka and writes it to the target system using its API. Such connectors can deliver data from Kafka topics to systems such as Elasticsearch or Hadoop.
The figure below shows the process of creating a connector instance.
A worker creates and launches a connector instance based on a REST request. To create a connector, the following is required:
- Connector plugin is a JAR file containing executable Java class files. Connector plugins do not read or write messages to Kafka themselves; instead, they provide the interface between Kafka and the external system. The location where plugins are stored is determined by the Kafka Connect plugin.path parameter.
Classes that the plugin should contain:
  - Connector class — a connector class based on the SourceConnector or SinkConnector interface, used to create the connector instance itself.
  - Task class — a task class based on the SourceTask or SinkTask interface, used to create task instances. The methods of these classes ensure that the connector instance interacts with Kafka via the Kafka API (they write or read messages).
- Set of connector configuration parameters — the parameters of the source or sink connector that specify the source and destination of the data copying, the number of tasks, serialization and conversion settings, and so on.
The connector instance performs the following functions:
- monitors the input data for changes and notifies the Kafka Connect runtime so that it can run tasks;
- creates the required number of task instances and divides the data copying work between the tasks;
- receives task settings from the workers.
The minimum parameters that must be provided in the request to successfully create a connector are:
- name — the unique name of the connector. This name must not be repeated among connectors created in this cluster.
- connector.class — the name of the connector class.
- tasks.max — the maximum number of threads (tasks) in which the connector can run (the default is 1).
Below are examples of creating a connector using several technologies.
Each Kafka Connect worker provides its own REST API for requests to create connectors and manage connectors and tasks.
For example, a request to create a simple FileStreamSource connector (copies data from a local file to a Kafka topic) might look like this:
$ curl -X POST --data-binary "@source.json" -H "Content-Type: application/json" http://localhost:8083/connectors | jq
where @source.json is a file that contains all the parameters describing the connector in JSON format:
{
  "name": "FileStream-Kafka",
  "config": {
    "connector.class": "FileStreamSource",
    "file": "/data/source.csv",
    "topic": "data"
  }
}
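After the connector is created, the same REST API can be used to check the state of the connector and its tasks. For example (the connector name matches the example above):
$ curl http://localhost:8083/connectors/FileStream-Kafka/status | jq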
In the ksqlDB environment, connectors are created using special CREATE CONNECTOR statements. For example, the command to create a Debezium connector to communicate with Postgres looks like this:
CREATE SOURCE CONNECTOR `debezium` WITH (
'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
'database.hostname' = 'postgres',
'database.port' = '5432',
'database.user' = 'postgres',
'database.password' = 'password',
'database.dbname' = 'postgres',
'database.server.name' = 'postgres',
'table.whitelist' = 'public.customers',
'topic.prefix' = 'postgres',
'transforms' = 'unwrap',
'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.drop.tombstones' = 'false',
'transforms.unwrap.delete.handling.mode' = 'rewrite',
'plugin.name' = 'pgoutput',
'tasks.max' = '1'
);
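Connectors created this way can then be inspected from the same ksqlDB session. A short sketch:
-- list the connectors known to the attached Kafka Connect cluster
SHOW CONNECTORS;
-- show the configuration and task states of the connector created above
DESCRIBE CONNECTOR `debezium`;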
For ADS, it is possible to create connectors via the Kafka Connects page of the ADS Control user interface.
The following plugins are available for use:
- MirrorCheckpointConnector, MirrorHeartbeatConnector, MirrorSourceConnector — connectors designed for replication of topics via the Mirror Maker 2 mechanism. For information on creating these connectors using ADS Control, see Mirror Maker 2 in ADS Control.
- PostgreSQL — Debezium connector for PostgreSQL Server.
- MS SQL — Debezium connector for MS SQL Server.
Task
Task is an implementation of data copying between the external system and Kafka. Each task instance is created inside a connector instance using a special task class and parameters set when creating the connector.
Each task runs in its own Java thread.
The figure below shows how data flows through the source task in Kafka Connect.
For each record, a task instance (SourceTask) creates an instance of the SourceRecord class, which passes the message (key, value) to the Kafka Connect platform (worker) for writing to Kafka; along the way, all required transformations (if any) and the serialization of the message are performed.
Via special fields, the record instance passes to the worker information about where the data is to be stored in Kafka (topic, kafkaPartition). It can also pass information about the record source via the sourcePartition method (the record source, for example, a file name or table name) and the sourceOffset method (the position within sourcePartition). This data can be used to resume copying from the previous location in the source after failures or restarts for maintenance.
The message and offset data sent by SourceRecord to the worker are recorded by the worker in Kafka topics using the Kafka API.
The tasks themselves have no state stored in them. The task status is stored in special Kafka topics defined by the config.storage.topic and status.storage.topic parameters, and is managed by the corresponding connector. Tasks can be started, stopped, or restarted at any time using the worker’s REST API.
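For example, a failed task can be restarted through the REST API of any worker (the connector name and task number below are placeholders):
$ curl -X POST http://localhost:8083/connectors/FileStream-Kafka/tasks/0/restart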
Converter
Converter is a ready-made component that converts data formats. Converters provide a mechanism for converting data from the internal data types used by Kafka Connect to data types exposed as AVRO, PROTOBUF, or JSON Schema.
Converters perform the following functions:
- serialization and deserialization of data, using the serializers and deserializers from the Kafka client libraries;
- saving and retrieving AVRO, PROTOBUF, or JSON schemas in Schema Registry;
- interaction of the connector with the Schema Registry service for saving and retrieving schemas.
The converter class is specified in the configuration parameters of the Kafka Connect workers (the key.converter and value.converter parameters). The converter class can also be defined separately for each connector during creation.
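As a sketch, the service-wide setting in connect-distributed.properties might use the JSON converter shipped with Kafka Connect:
# default converters for all connectors of this worker
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
An individual connector can override these defaults by adding the same parameters to its own configuration when it is created, for example "value.converter": "org.apache.kafka.connect.storage.StringConverter" in the "config" section of the REST request.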
The figure below shows how data is serialized when using the source connector.
When connecting for the first time, connectors create schemas for incoming data and use a converter to register the schema in the Schema Registry. During the process of copying data, converters extract the desired schema and serialize the data before writing it to a Kafka topic.
Transform
The single message transform (SMT) module is a ready-made component with simple logic for changing a single message.
SMT transformations are optional and are used to modify data before it is written to Kafka, and to modify data read from Kafka before it is written to the sink.
The necessary parameters for SMT conversion are configured using the transforms configuration parameter.
Examples of using SMT transformations:
- Removing fields from ingested data, such as personally identifiable information (PII), if required by the system.
- Adding metadata, such as provenance information, to data received through Kafka Connect.
- Changing field data types.
- Changing the topic name to include a timestamp.
- Renaming fields (see the sketch after this list).
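For example, renaming a field can be sketched with the standard ReplaceField transformation. The transform alias and field names below are placeholders; the lines are added to the connector's "config" section:
"transforms": "renameId",
"transforms.renameId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameId.renames": "id:customer_id"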
For more complex transformations, such as aggregation or merging with other topics, it is recommended to use stream processing in ksqlDB or Kafka Streams.
Handle errors
Possible error handling options are described below:
- Fail fast is an error handling strategy in which any error during the operation of Kafka Connect tasks and connectors causes the connector to fail.
This strategy is applied by default and corresponds to the errors.tolerance connector parameter value none. When the parameter value is all, the connector continues to work if an error occurs. In this case, there are two possible scenarios for dealing with errors: ignoring them or redirecting problematic records to the dead letter queue.
- Dead letter queue is a queue of undelivered messages: a place where messages whose processing caused an error are stored. Undelivered messages are recorded in a special log.
The dead letter queue is configured using the following parameters (a configuration sketch is shown after the list):
  - errors.log.enable — if the parameter value is true, problematic records that cause an error are written to a special log.
  - errors.log.include.messages — if the parameter value is true, the problematic records themselves are written to the log together with their metadata (for the sink connector: the topic, partition, offset, and timestamp; for the source connector: the key and value (and their schemas), all headers, as well as the timestamp, Kafka topic, Kafka partition, source partition, and source offset).
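As a sketch, a sink connector configured to tolerate errors and redirect problematic records to a dead letter queue might include the following parameters in its configuration (the topic name is a placeholder; the dead letter queue parameters apply to sink connectors):
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq-file-sink",
"errors.deadletterqueue.context.headers.enable": "true"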
Kafka Connect in ADS
Connection
After the Kafka Connect service is added and installed as part of an ADS cluster, you can connect to Kafka Connect using the host where the Kafka Connect Worker component is located and the port specified in the rest.port parameter (8083 by default) in the connect-distributed.properties group on the configuration page of the Kafka Connect service.
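For example, the connection can be verified by requesting the root REST endpoint of a worker, which returns the Kafka Connect version information (the host name below is a placeholder):
$ curl http://kafka-connect-host:8083/ | jq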
Kafka
In ADS, the Kafka Connect service can be installed only after the Kafka service is installed. After Kafka Connect is installed, the bootstrap.servers parameter in the /etc/kafka-connect/config/connect-distributed.properties configuration file is automatically set to communicate with the Kafka broker, as are other options for the interaction between Kafka and Kafka Connect (for example, for the internal Kafka Connect topics created in Kafka).
Schema Registry
The Schema Registry service, installed in ADS simultaneously with the Kafka Connect service, provides the ability to convert data from internal data types used by Kafka Connect into data in AVRO, PROTOBUF, or JSON format and vice versa.
For example, to work with data in the AVRO format, in the connect-distributed.properties configuration file you should set a special AvroConverter as the key.converter and/or value.converter parameter, and set the Schema Registry server URL in the key.converter.schema.registry.url and/or value.converter.schema.registry.url parameters.
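A sketch of such settings is shown below; the converter class is the AvroConverter distributed with the Confluent Schema Registry packages, and the Schema Registry address is a placeholder:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry-host:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry-host:8081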
ksqlDB
The ksqlDB service, installed in ADS simultaneously with the Kafka Connect service, provides the ability to manage Kafka Connect connectors. Configuring the interaction between two services is performed on the side of the ksqlDB service. For more information, see ksqlDB overview.
Configure Kafka Connect
Configuring Kafka Connect parameters in the ADCM interface is performed on the configuration page of the Kafka Connect service.
To configure the parameters of the /etc/kafka-connect/config/connect-distributed.properties configuration file, set the Show advanced switch to active, expand the connect-distributed.properties node and enter new values for the parameters. To change Kafka Connect parameters that are not available in the ADCM interface, use the Add key,value field. Select Add property and enter the name of the parameter and its value.
After changing the parameters using the ADCM interface, restart the Kafka Connect service. To do this, apply the Restart action available in the Actions column.