Kafka Connect configuration parameters
To configure the service, use the following configuration parameters in ADCM.
| Parameter | Description | Default value |
|---|---|---|
| tasks.max | Maximum number of tasks that should be created for this connector | 10 |
| offset-syncs.topic.replication.factor | Replication factor for the internal offset-syncs topic | 1 |
| checkpoint.topic.replication.factor | Replication factor for the internal checkpoints topic | 1 |
| sync.topic.acls.enabled | Enables monitoring of the source cluster for ACL changes | false |
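These are connector-level settings (they correspond to Kafka MirrorMaker 2 connector options) and end up in the configuration of the connectors that Kafka Connect runs. A minimal sketch of such a configuration fragment with the defaults above; it is illustrative, not a complete connector definition:

```properties
# Illustrative connector configuration fragment built from the defaults above;
# not a complete connector definition
tasks.max=10
offset-syncs.topic.replication.factor=1
checkpoint.topic.replication.factor=1
sync.topic.acls.enabled=false
```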
Parameters of the kafka-connect-env.sh configuration file:

| Parameter | Description | Default value |
|---|---|---|
| LOG_DIR | Directory for logs | /var/log/kafka |
| KAFKA_OPTS | Environment variables for Kafka | — |
| KAFKA_HEAP_OPTS | Heap size allocated to the Kafka server process | -Xms256M -Xmx2G |
| KAFKA_LOG4J_OPTS | Environment variable with the log4j logging configuration | -Dlog4j.configuration=file:/etc/kafka/conf/connect-distributed-log4j.properties |
| Add property | Parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set values for any user-defined parameters that are not displayed in the interface but are allowed in the kafka-connect-env.sh configuration file | — |
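Since kafka-connect-env.sh is a shell fragment of environment variable assignments, the values above render into lines like the following sketch (the KAFKA_OPTS value is a hypothetical custom addition, as its default is empty):

```shell
# Sketch of kafka-connect-env.sh with the defaults listed above
export LOG_DIR="/var/log/kafka"
export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/etc/kafka/conf/connect-distributed-log4j.properties"
# KAFKA_OPTS has no default; this GC flag is purely illustrative
export KAFKA_OPTS="-XX:+UseG1GC"
```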
Parameters of the connect-distributed.properties configuration file:

| Parameter | Description | Default value |
|---|---|---|
| group.id | Unique string that identifies the Kafka Connect cluster group this worker belongs to | mm-connect |
| rest.port | Port on which the Kafka Connect REST API listens | 8083 |
| plugin.path | Path to Kafka Connect plugins | Preconfigured values |
| config.storage.replication.factor | Replication factor for the internal topic that stores connector configurations | 1 |
| offset.storage.replication.factor | Replication factor for the internal topic that stores connector offsets | 1 |
| status.storage.replication.factor | Replication factor for the internal topic that stores connector and task statuses | 1 |
| offset.flush.interval.ms | Interval (in milliseconds) at which to try committing offsets for tasks | 1000 |
| key.converter | Converter class for keys of Connect data | org.apache.kafka.connect.converters.ByteArrayConverter |
| value.converter | Converter class for values of Connect data | org.apache.kafka.connect.converters.ByteArrayConverter |
| connector.client.config.override.policy | Class name or alias of the ConnectorClientConfigOverridePolicy implementation | None |
| Add key, value | Parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set values for any user-defined parameters that are not displayed in the interface but are allowed in the connect-distributed.properties configuration file | — |
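These settings map directly onto connect-distributed.properties. A sketch of the file with the defaults from the table; the bootstrap.servers line is an assumption added for completeness, since ADCM supplies the actual broker list:

```properties
# connect-distributed.properties assembled from the defaults above;
# bootstrap.servers is illustrative, ADCM fills in the real broker addresses
bootstrap.servers=broker-1:9092
group.id=mm-connect
rest.port=8083
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
offset.flush.interval.ms=1000
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```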
Parameters of the connect-distributed-log4j.properties configuration file:

| Parameter | Description | Default value |
|---|---|---|
| rootLogger | Logging level | INFO |
| MaxBackupIndex | Maximum number of saved log files | 10 |
| MaxFileSize | Maximum log file size | 100MB |
The following parameters define the structure of the logging configuration file for Kafka Connect:
| Logger | Default package names | Default event level |
|---|---|---|
| loggers | org.reflections | ERROR |
| | org.I0Itec.zkclient | ERROR |
| | org.apache.zookeeper | ERROR |
Add property: parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set values for any user-defined parameters that are not displayed in the interface but are allowed in the connect-distributed-log4j.properties configuration file.
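For illustration, raising the logging level for the Connect runtime through this field would yield a line like the following in the rendered file (the package name and level are an example, not a default):

```properties
# Example of a custom logger entry produced by the Add property field
log4j.logger.org.apache.kafka.connect=DEBUG
```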
The connect_distributed_log4j_properties template is intended for specifying custom logging parameters.
Default value:
```
{% set connect_distributed_log4j_properties = services.kafka_connect.config.connect_distributed_log4j_properties_content %}
log4j.rootLogger={{ connect_distributed_log4j_properties['rootLogger'] }}, connectDRFA, connectRFA, stdout
# Send the logs to the console.
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#
# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual information.
#
#log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
connect.log.pattern=[%d] %p %m (%c:%L)%n
{% for key, value in connect_distributed_log4j_properties['loggers'] | dictsort -%}
log4j.logger.{{ key }}={{ value }}
{% endfor -%}
log4j.appender.connectDRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectDRFA.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectDRFA.File=${kafka.logs.dir}/connect-distributed.log
log4j.appender.connectDRFA.layout=org.apache.log4j.PatternLayout
log4j.appender.connectDRFA.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectDRFA.MaxBackupIndex={{ connect_distributed_log4j_properties['MaxBackupIndex'] }}
log4j.appender.connectRFA=org.apache.log4j.RollingFileAppender
log4j.appender.connectRFA.File=${kafka.logs.dir}/connect-distributed.log
log4j.appender.connectRFA.layout=org.apache.log4j.PatternLayout
log4j.appender.connectRFA.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.connectRFA.Append=true
log4j.appender.connectRFA.MaxBackupIndex={{ connect_distributed_log4j_properties['MaxBackupIndex'] }}
log4j.appender.connectRFA.MaxFileSize={{ connect_distributed_log4j_properties['MaxFileSize'] }}
```
SSL configuration parameters:

| Parameter | Description | Default value |
|---|---|---|
| ssl.keystore.location | Location of the keystore file. Optional for the client; can be used for two-way authentication of the client | — |
| ssl.keystore.password | Store password for the keystore file. Optional for the client and only needed if ssl.keystore.location is configured | — |
| ssl.key.password | Password of the private key in the keystore file. Optional for the client | — |
| ssl.keystore.type | File format of the keystore file. Optional for the client | — |
| ssl.truststore.location | Location of the truststore file | — |
| ssl.truststore.password | Store password for the truststore file. Optional for the client and only needed if ssl.truststore.location is configured | — |
| ssl.truststore.type | File format of the truststore file | — |
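A sketch of these settings once filled in; every path and password below is a placeholder:

```properties
# Illustrative SSL client settings; all values are placeholders
ssl.truststore.location=/etc/kafka/ssl/truststore.jks
ssl.truststore.password=changeit
ssl.truststore.type=JKS
ssl.keystore.location=/etc/kafka/ssl/keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.keystore.type=JKS
```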
The jaas.conf user file template is intended for specifying user data for connecting clients of other services to the current service (paths to keytab files, the useTicketCache parameter, and others). For more information, see Configure a custom jaas.conf.
Default value:
```
{% if cluster.config.kerberos_client and cluster.config.kerberos_client.enable_kerberos %}
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName="kafka"
keyTab="{{ cluster.config.kerberos_client.keytab_dir }}/kafka-connect.service.keytab"
principal="kafka-connect/{{ ansible_fqdn }}@{{ cluster.config.kerberos_client.realm }}";
};
{%- elif cluster.config.sasl_plain_auth_default_config is not none %}
{%- set credential = cluster.config.sasl_plain_auth_default_config.sasl_plain_users_data %}
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka-connect"
password="{{ credential['kafka-connect'] }}";
};
{%- endif %}
```
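As a usage note (an assumption about the deployment rather than something configured on this page), a JAAS file like this is typically handed to the JVM via the java.security.auth.login.config system property, for example through KAFKA_OPTS:

```shell
# Hypothetical example: point a Kafka client JVM at the rendered jaas.conf;
# the file path is a placeholder
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/conf/jaas.conf"
```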
Kafka Connect Worker component configuration parameters:
| Parameter | Description | Default value |
|---|---|---|
| jmx_port | Port on which JMX metrics are exposed | 9996 |
| JMX Exporter Port | Port for connecting to the Prometheus JMX Exporter | 11205 |
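The Prometheus JMX Exporter serves metrics over plain HTTP, so a quick way to verify that it is reachable is to query its endpoint (the hostname is a placeholder):

```shell
# Check that the JMX Exporter responds on its configured port
curl http://connect-worker-1.example.com:11205/metrics
```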
Enables authentication for JMX in the service (used when access to the JMX port needs to be protected).
| Parameter | Description | Default value |
|---|---|---|
| Username | Username for authentication in JMX | monitoring |
| Password | User password for authentication in JMX | — |
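For background, and as an assumption about the underlying mechanism rather than ADCM-specific behavior: remote JMX authentication in the JVM is conventionally driven by password and access files passed as system properties, along these lines (file paths are placeholders):

```shell
# Standard JVM flags for authenticated remote JMX; paths are placeholders
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9996 \
  -Dcom.sun.management.jmxremote.authenticate=true \
  -Dcom.sun.management.jmxremote.password.file=/etc/kafka/jmxremote.password \
  -Dcom.sun.management.jmxremote.access.file=/etc/kafka/jmxremote.access"
```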