Kafka configuration parameters
To configure the service, use the following configuration parameters in ADCM.
| Parameter | Description | Default value |
|---|---|---|
| Host index matching | Specifies | — |
| Kafka quorum solution | Indicates which quorum Kafka uses to store metadata and elect a leader. Must be set to | Zookeeper |
| KRaft migration mode | Indicates whether the KRaft migration mode is enabled in Kafka | false |
| Kafka Ranger plugin enabled | Indicates whether the Ranger Kafka plugin is enabled (auto-populated) | false |
Kafka service environment variable settings
| Parameter | Description | Default value |
|---|---|---|
| LOG_DIR | Directory for logs | /var/log/kafka |
| PID_DIR | Directory to store the Kafka process ID | /var/run/kafka |
| JMX_PORT | Port on which Kafka sends JMX metrics | 9999 |
| KAFKA_OPTS | Environment variables for Kafka | -DZstdTempFolder=/usr/lib/kafka/tmp -Dorg.xerial.snappy.tempdir=/usr/lib/kafka/tmp |
| Parameter | Description | Default value |
|---|---|---|
| auto.create.topics.enable | Enables automatic topic creation | false |
| auto.leader.rebalance.enable | Enables automatic leader balancing in the background at regular intervals | true |
| queued.max.requests | Number of queued requests allowed before the network threads are blocked | 500 |
| num.network.threads | Number of threads used by the server to receive requests from the network and send responses to the network | 3 |
| num.io.threads | Number of threads spawned for I/O operations | 8 |
| unclean.leader.election.enable | Specifies whether replicas not in the ISR can be elected leader as a last resort, even though doing so may result in data loss | false |
| offsets.topic.replication.factor | Replication factor for the offsets topic (set higher to ensure availability). Internal topic creation does not occur until the cluster size meets this replication factor requirement | 1 |
| transaction.state.log.min.isr | Overrides the | 1 |
| transaction.state.log.replication.factor | Replication factor for the transaction topic (set higher to ensure availability). Internal topic creation fails until the cluster size meets this replication factor requirement | 1 |
| zookeeper.connection.timeout.ms | Maximum time that the client waits to establish a connection to ZooKeeper. If not set, the value in | 30000 |
| zookeeper.session.timeout.ms | ZooKeeper session timeout (in ms) | 30000 |
| zookeeper.sync.time.ms | How far a ZooKeeper follower can lag behind a ZooKeeper leader (in ms) | 2000 |
| security.inter.broker.protocol | Security protocol used for communication between brokers | PLAINTEXT |
| ssl.keystore.location | Location of the keystore file. Optional for clients; can be used for two-way client authentication | — |
| ssl.keystore.password | Store password for the keystore file. Optional for clients and only needed if | — |
| ssl.key.password | Password of the private key in the keystore file. Optional for clients | — |
| ssl.keystore.type | File format of the keystore file. Optional for clients | — |
| ssl.truststore.location | Location of the truststore file | — |
| ssl.truststore.password | Store password for the truststore file. Optional for clients and only needed if | — |
| ssl.truststore.type | File format of the truststore file | — |
| Add key, value | Parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set values for all user parameters that are not displayed in the interface but are allowed in the server.properties configuration file | — |
| log.dirs | Directory to store the logs | /kafka-logs |
| listeners | Comma-separated list of URIs to listen on and the listener names. Only changing the port is supported; changing the protocol or adding a listener may cause errors | PLAINTEXT://:9092 |
| default.replication.factor | Default replication factor for automatically created topics | 1 |
| num.partitions | Default number of log partitions per topic | 1 |
| delete.topic.enable | Enables topic deletion. Deleting topics has no effect if this setting is turned off | true |
| log.retention.hours | Number of hours to keep a log file before deleting it | 168 |
| log.roll.hours | Maximum time before a new log segment is rolled out | 168 |
| log.cleanup.policy | Log cleanup policy | delete |
| log.cleanup.interval.mins | Interval that determines how often the log cleaner checks for logs to delete. A log file is deleted if it has not been modified within the time specified by the | 10 |
| log.cleaner.min.compaction.lag.ms | Minimum time a message remains uncompacted in the log. Only applicable for logs that are being compacted (in ms) | 0 |
| log.cleaner.delete.retention.ms | Amount of time to retain tombstone markers for log-compacted topics (in ms) | 86400000 |
| log.cleaner.enable | Enables running the log cleaning process on the server | true |
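The override behavior of the Add key, value field can be sketched as a simple dictionary merge: entries from the field replace the values set through the ADCM UI before the final configuration is rendered. The parameter values below are illustrative only.

```python
# Hypothetical sketch of how "Add key, value" overrides are resolved:
# entries in that field take precedence over values set in the ADCM UI,
# and may introduce parameters that the UI does not display at all.

ui_params = {
    "auto.create.topics.enable": "false",
    "num.io.threads": "8",
}
overrides = {
    "num.io.threads": "16",          # overrides the UI value
    "message.max.bytes": "1048576",  # parameter not shown in the UI
}

# Later keys win, so overrides replace UI values on conflict
effective = {**ui_params, **overrides}

# Render the merged configuration in server.properties style
lines = [f"{key}={value}" for key, value in sorted(effective.items())]
print("\n".join(lines))
```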
Apache Ranger options
| Parameter | Description | Default value |
|---|---|---|
| xasecure.audit.destination.solr.batch.filespool.dir | Directory for the Solr audit spool | /srv/ranger/kafka_plugin/audit_solr_spool |
| xasecure.audit.destination.solr.urls | Specifies the Solr URL. Do not set this parameter when using ZooKeeper to connect to Solr | — |
| xasecure.audit.destination.solr.zookeepers | Enables saving audit data to Solr for the Ranger plugins | — |
| Add key, value | Parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set values for all user parameters that are not displayed in the interface but are allowed in the ranger-kafka-audit.xml configuration file | — |
Apache Ranger options
| Parameter | Description | Default value |
|---|---|---|
| ranger.plugin.kafka.policy.rest.url | URL of Ranger Admin | — |
| ranger.plugin.kafka.service.name | Name of the Ranger service containing policies for this Kafka instance | — |
| ranger.plugin.kafka.policy.cache.dir | Directory where Ranger policies are cached after successful retrieval from the source | /srv/ranger/kafka/policycache |
| ranger.plugin.kafka.policy.pollIntervalMs | How often to poll for policy changes (in ms) | 30000 |
| ranger.plugin.kafka.policy.rest.client.connection.timeoutMs | Kafka plugin RangerRestClient connection timeout (in ms) | 120000 |
| ranger.plugin.kafka.policy.rest.client.read.timeoutMs | Kafka plugin RangerRestClient read timeout (in ms) | 30000 |
| ranger.plugin.kafka.policy.rest.ssl.config.file | Location of the file containing SSL data for connecting to Ranger Admin | /etc/kafka/conf/ranger-policymgr-ssl.xml |
| Add key, value | Parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set values for all user parameters that are not displayed in the interface but are allowed in the ranger-kafka-security.xml configuration file | — |
Apache Ranger options
| Parameter | Description | Default value |
|---|---|---|
| xasecure.policymgr.clientssl.keystore | Location of the keystore file | — |
| xasecure.policymgr.clientssl.keystore.password | Keystore password | — |
| xasecure.policymgr.clientssl.truststore | Location of the truststore file | — |
| xasecure.policymgr.clientssl.truststore.password | Truststore password | — |
| xasecure.policymgr.clientssl.keystore.credential.file | Location of the keystore credential file | /etc/kafka/conf/keystore.jceks |
| xasecure.policymgr.clientssl.truststore.credential.file | Location of the truststore credential file | /etc/kafka/conf/truststore.jceks |
| Add key, value | Parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set values for all user parameters that are not displayed in the interface but are allowed in the ranger-policymgr-ssl.xml configuration file | — |
The user file template jaas.conf is intended for specifying user data for connecting clients of other services to the current service (paths to keytab files, the useTicketCache parameter, and others). For more information, see Configure a custom jaas.conf.
Default value:
{% if cluster.config.kerberos_client and cluster.config.kerberos_client.enable_kerberos %}
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
keyTab="{{ cluster.config.kerberos_client.keytab_dir }}/kafka.service.keytab"
principal="kafka/{{ ansible_fqdn }}@{{ cluster.config.kerberos_client.realm }}";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
keyTab="{{ cluster.config.kerberos_client.keytab_dir }}/kafka.service.keytab"
principal="kafka/{{ ansible_fqdn }}@{{ cluster.config.kerberos_client.realm }}";
};
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName="kafka"
keyTab="{{ cluster.config.kerberos_client.keytab_dir }}/kafka.service.keytab"
principal="kafka/{{ ansible_fqdn }}@{{ cluster.config.kerberos_client.realm }}";
};
{%- elif cluster.config.sasl_plain_auth_default_config is not none %}
{%- set credential = cluster.config.sasl_plain_auth_default_config.sasl_plain_users_data %}
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="{{ credential['kafka'] }}"
{% for user, password in credential.items() %}
user_{{ user }}="{{ password }}"{% if loop.index != loop | length %}
{% endif %}
{% endfor %};
};
{% endif %}
The template is intended to specify data for connecting to Hadoop nodes.
Default value:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
</configuration>
| Parameter | Description | Default value |
|---|---|---|
| storage.hdfs.root | Full path to the HDFS directory intended for storing Kafka segments | /kafka |
| storage.hdfs.upload.buffer.size | Size of the buffer (in bytes) used for data transfer | 8192 |
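The role of the upload buffer can be illustrated with a minimal sketch: data is transferred in fixed-size chunks rather than in one piece. This is an assumption-laden illustration of buffered copying in general, not the service's actual implementation.

```python
# Illustrative sketch (not the actual upload code): copy a stream in
# fixed-size chunks, as controlled by storage.hdfs.upload.buffer.size.
import io

BUFFER_SIZE = 8192  # default storage.hdfs.upload.buffer.size, in bytes

def upload(src: io.BufferedIOBase, dst: io.BufferedIOBase) -> int:
    """Copy src to dst in BUFFER_SIZE chunks; return the number of chunks."""
    chunks = 0
    while True:
        block = src.read(BUFFER_SIZE)
        if not block:
            break
        dst.write(block)
        chunks += 1
    return chunks

data = io.BytesIO(b"x" * 20000)   # 20 000 bytes -> chunks of 8192, 8192, 3616
out = io.BytesIO()
print(upload(data, out))  # → 3
```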
| Parameter | Description | Default value |
|---|---|---|
| storage.s3.endpoint.url | S3 storage endpoint | — |
| storage.s3.bucket.name | S3 bucket name — the object storage unit allocated for user data | — |
| storage.aws.access.key.id | S3 storage access ID | — |
| storage.aws.secret.access.key | S3 storage access key | — |
| storage.s3.region | Code of the S3 storage region. Cannot be empty. If the S3 server does not provide such a parameter, set it to any value, for example, | — |
| Parameter | Description | Default value |
|---|---|---|
| chunk.size | Size of the data fragment retrieved from or loaded to the tiered data store at a time | 4194304 |
| fetch.chunk.cache.path | Path to the directory for caching fragments of data received from the tiered storage | /var/cache/kafka |
| fetch.chunk.cache.prefetch.max.size | Maximum size of a prefetched data fragment | 8388608 |
| fetch.chunk.cache.size | Maximum size of the entire fetch cache | 1073741824 |
| fetch.chunk.cache.retention.ms | Cache lifetime | 600000 |
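A quick bit of arithmetic with the defaults above shows how the cache parameters relate: the fetch cache holds a whole number of chunks, and the prefetch limit also spans a whole number of chunks.

```python
# Arithmetic with the default tiered-storage cache settings above.
chunk_size = 4_194_304            # chunk.size (4 MiB)
cache_size = 1_073_741_824        # fetch.chunk.cache.size (1 GiB)
prefetch_max = 8_388_608          # fetch.chunk.cache.prefetch.max.size (8 MiB)

print(cache_size // chunk_size)   # → 256 chunks fit in the cache
print(prefetch_max // chunk_size) # → 2 chunks per prefetch at most
```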
| Parameter | Description | Default value |
|---|---|---|
| Interval_sec | Interval in seconds between checks | 10 |
| Retries | Maximum number of checks | 12 |
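With the defaults above, the check gives up after at most Interval_sec × Retries seconds:

```python
# Worst-case wait before the check is abandoned, using the defaults above.
interval_sec = 10  # Interval_sec
retries = 12       # Retries

print(interval_sec * retries)  # → 120 seconds (2 minutes)
```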
Click +1 on the service configuration page to specify a custom listener for Kafka. The new listener is added to the listeners parameter in /usr/lib/kafka/config/server.properties in the {name}://:{port} format, and the listener's protocol is added to the listener.security.protocol.map parameter in the {name}:{protocol} format.
| Parameter | Description |
|---|---|
| name | Name of the new listener; must be in uppercase |
| protocol | Protocol of the listener; one of the supported protocols must be specified: |
| port | Port of the listener |
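The two formats described above can be sketched as small formatting helpers. The listener name and protocol below are made-up examples, not values from the product.

```python
# Sketch of the documented formats for a custom listener entry and its
# listener.security.protocol.map entry. Example values are hypothetical.
def listener_entry(name: str, port: int) -> str:
    """Build a listeners entry in the {name}://:{port} format."""
    return f"{name.upper()}://:{port}"

def protocol_map_entry(name: str, protocol: str) -> str:
    """Build a listener.security.protocol.map entry in the {name}:{protocol} format."""
    return f"{name.upper()}:{protocol}"

print(listener_entry("external", 9094))                  # → EXTERNAL://:9094
print(protocol_map_entry("external", "SASL_PLAINTEXT"))  # → EXTERNAL:SASL_PLAINTEXT
```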
Kafka Broker component configuration parameters:
| Parameter | Description | Default value |
|---|---|---|
| log4j.rootLogger | Logging level | INFO |
| log4j.logger.org.apache.zookeeper | ZooKeeper client logging level | INFO |
| log4j.logger.kafka | General broker logging level (output to server.log and stdout). See also | INFO |
| log4j.logger.org.apache.kafka | General broker logging level (output to server.log and stdout). See also | INFO |
| log4j.logger.kafka.request.logger | Change to | WARN |
| log4j.logger.kafka.controller | Kafka controller logging level | TRACE |
| log4j.logger.kafka.log.LogCleaner | Kafka log cleaner logging level | INFO |
| log4j.logger.state.change.logger | State change logging level | INFO |
| log4j.logger.kafka.authorizer.logger | Access denials are logged at | INFO |
| Parameter | Description | Default value |
|---|---|---|
| log4j.logger.kafka.network.Processor | Configures the logging level of the processor network threads | TRACE |
| log4j.logger.kafka.server.KafkaApis | Configures the KafkaApis logging level (processing of requests to the Kafka broker) | TRACE |
| log4j.logger.kafka.network.RequestChannel | Configures the logging level for queued requests | TRACE |
The custom log4j_properties file template is intended for specifying custom logging parameters.
Default value:
{% set kafka_broker_log4j_properties_configuration = services.kafka.BROKER.config.log4j_properties_configuration %}
log4j.rootLogger={{ kafka_broker_log4j_properties_configuration['log4j.rootLogger'] }}, stdout, kafkaAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.org.apache.zookeeper={{ kafka_broker_log4j_properties_configuration['log4j.logger.org.apache.zookeeper'] }}
log4j.logger.kafka={{ kafka_broker_log4j_properties_configuration['log4j.logger.kafka'] }}
log4j.logger.org.apache.kafka={{ kafka_broker_log4j_properties_configuration['log4j.logger.org.apache.kafka'] }}
log4j.logger.kafka.request.logger={{ kafka_broker_log4j_properties_configuration['log4j.logger.kafka.request.logger'] }}, requestAppender
log4j.additivity.kafka.request.logger=false
{% if services.kafka.BROKER.config.log4j_advanced_properties_configuration['log4j.logger.kafka.network.Processor'] is defined %}
{% set kafka_broker_log4j_advanced_properties_configuration = services.kafka.BROKER.config.log4j_advanced_properties_configuration %}
log4j.logger.kafka.network.Processor={{ kafka_broker_log4j_advanced_properties_configuration['log4j.logger.kafka.network.Processor'] }}, requestAppender
log4j.logger.kafka.server.KafkaApis={{ kafka_broker_log4j_advanced_properties_configuration['log4j.logger.kafka.server.KafkaApis'] }}, requestAppender
log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.network.RequestChannel$={{ kafka_broker_log4j_advanced_properties_configuration['log4j.logger.kafka.network.RequestChannel'] }}, requestAppender
{% else %}
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
{% endif %}
log4j.additivity.kafka.network.RequestChannel$=false
log4j.logger.kafka.controller={{ kafka_broker_log4j_properties_configuration['log4j.logger.kafka.controller'] }}, controllerAppender
log4j.additivity.kafka.controller=false
log4j.logger.kafka.log.LogCleaner={{ kafka_broker_log4j_properties_configuration['log4j.logger.kafka.log.LogCleaner'] }}, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false
log4j.logger.state.change.logger={{ kafka_broker_log4j_properties_configuration['log4j.logger.state.change.logger'] }}, stateChangeAppender
log4j.additivity.state.change.logger=false
log4j.logger.kafka.authorizer.logger={{ kafka_broker_log4j_properties_configuration['log4j.logger.kafka.authorizer.logger'] }}, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
| Parameter | Description | Default value |
|---|---|---|
| log4j.rootLogger | Logging level | WARN |
The custom tools_log4j_properties file template is intended for specifying custom logging parameters.
Default value:
{% set kafka_broker_tools_log4j_properties_configuration = services.kafka.BROKER.config.tools_log4j_properties_configuration %}
log4j.rootLogger={{ kafka_broker_tools_log4j_properties_configuration['log4j.rootLogger'] }}, stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err
JMX Monitoring must be enabled for Kafka monitoring to work.
| Parameter | Description | Default value |
|---|---|---|
| jmx_port | Port to which JMX metrics are sent | 9999 |
| JMX Exporter Port | Port for connecting to Prometheus JMX Exporter | 11201 |
Enables authentication for JMX in the service (used when access to the JMX port needs to be protected).
| Parameter | Description | Default value |
|---|---|---|
| Username | Username for authentication in JMX | monitoring |
| Password | User password for authentication in JMX | — |
Kafka Controller component configuration parameters:
| Parameter | Description | Default value |
|---|---|---|
| listeners | Port for connecting to Kafka Controller | CONTROLLER://:9093 |
| Metadata log.dirs | Path where cluster metadata is stored | /kafka-meta |
| Parameter | Description | Default value |
|---|---|---|
| LOG_DIR | Directory for storing logs | /var/log/kafka-controller |
| PID_DIR | Directory for storing the Kafka Controller process ID | /var/run/kafka-controller |
| Add property | Parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set values for all user parameters that are not displayed in the interface but are allowed in the kafka-controller-env.sh configuration file | — |
Kafka Cruise Control component configuration parameters:
| Parameter | Description | Default value |
|---|---|---|
default.goals |
List of inter-broker goals that will be used by default if no |
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal |
goals |
Set of case-insensitive inter-broker goals supported by Cruise Control. Inter-broker goals facilitate in distributing the load across brokers |
com.linkedin.kafka.cruisecontrol.analyzer.goals.BrokerSetAwareGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal, com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal |
intra.broker.goals |
List of case-insensitive intra-broker goals in the order of priority. The high-priority goals will be executed first. The intra-broker goals are only relevant if intra-broker operation is supported (i.e. in Cruise Control versions above 2), otherwise this list should be empty |
com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal |
hard.goals |
List of case-insensitive, inter-broker hard goals. Hard goals will be enforced to execute if Cruise Control runs in non-kafka-assigner mode and the |
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal |
anomaly.detection.goals |
Goals that anomaly detector should detect if they are violated |
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal |
self.healing.enabled |
Whether to enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled |
false |
sample.store.topic.replication.factor |
Replication factor of Kafka Sample Store topics |
1 |
webserver.http.cors.enabled |
Enables CORS when set to |
false |
webserver.http.cors.origin |
Value for the |
http://localhost:8080/ |
webserver.http.cors.allowmethods |
Value for the |
OPTIONS,GET,POST |
webserver.http.cors.exposeheaders |
Value for the |
User-Task-ID |
webserver.http.port |
Cruise Control Webserver bind port |
9181 |
webserver.http.address |
Cruise Control Webserver bind IP address |
0.0.0.0 |
metric.sampler.class |
Class name of the Metric Sampler |
com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler |
sampling.allow.cpu.capacity.estimation |
Flag to indicate whether sampling process allows CPU capacity estimation of brokers used for CPU utilization estimation |
true |
metric.reporter.topic |
Exact topic name from which the sampler should be consuming the interested metrics from |
__CruiseControlMetrics |
sample.store.class |
Sample store class name. User may configure a sample store that persists the metric samples that have already been aggregated into Kafka Cruise Control. Later on the persisted samples can be reloaded from the sample store to Kafka Cruise Control |
com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore |
partition.metric.sample.store.topic |
Topic in which Cruise Control will store its processed metric samples as a backup. When Cruise Control is rebooted, it will load the metrics from this topic to populate the load monitor |
__KafkaCruiseControlPartitionMetricSamples |
broker.metric.sample.store.topic |
Topic in which Cruise Control will store its broker metric samples as a backup. When Cruise Control is rebooted, it will load the broker metric samples from this topic to train its cluster model |
__KafkaCruiseControlModelTrainingSamples |
num.sample.loading.threads |
The number of threads to load from the sample store topics |
8 |
metric.sampler.partition.assignor.class |
Class used to assign the partitions to the metric samplers |
com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor |
metric.sampling.interval.ms |
Interval of metric sampling |
120000 |
partition.metrics.window.ms |
Size of the window in milliseconds to aggregate the Kafka partition metrics. The window must be wider than the |
300000 |
num.partition.metrics.windows |
Maximum number of partition window the load monitor would keep. Each window covers a time window defined by |
5 |
min.samples.per.partition.metrics.window |
Minimum number of metric samples a valid partition window should have. If a partition does not have enough samples in a partition window, the topic of the partition will be removed from the window due to in sufficient data |
1 |
broker.metrics.window.ms |
Size of the window in milliseconds to aggregate the Kafka broker metrics. The window must be greater than the |
300000 |
num.broker.metrics.windows |
Maximum number of broker window the load monitor would keep. Each window covers a time window defined by |
20 |
min.samples.per.broker.metrics.window |
Minimum number of metrics that a valid broker window should have. If a broker does not have enough samples in a broker window, this broker will be removed from the window due to in sufficient data |
1 |
capacity.config.file |
Path to the configuration JSON file that provides the capacity of the brokers |
config/capacity.json |
min.valid.partition.ratio |
Minimum percentage of the total partitions required to be monitored in order to generate a valid load model. Since topics and partitions in the Kafka cluster change dynamically, the load monitor will exclude some of the topics that do not have sufficient metric samples |
0.95 |
cpu.balance.threshold |
Maximum allowed extent of unbalance for CPU utilization. For example, |
1.1 |
disk.balance.threshold |
Maximum allowed extent of unbalance for disk utilization. For example, |
1.1 |
network.inbound.balance.threshold |
Maximum allowed extent of unbalance for network inbound usage. For example, |
1.1 |
network.outbound.balance.threshold |
Maximum allowed extent of unbalance for network outbound usage. For example, |
1.1 |
replica.count.balance.threshold |
Maximum allowed extent of unbalance for replica distribution. For example, |
1.1 |
cpu.capacity.threshold |
Maximum percentage of the total |
0.7 |
disk.capacity.threshold |
Maximum percentage of the total |
0.8 |
network.inbound.capacity.threshold |
Maximum percentage of the total |
0.8 |
network.outbound.capacity.threshold |
Maximum percentage of the total |
0.8 |
cpu.low.utilization.threshold |
Threshold to define the utilization of CPU is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. Such a cluster is overprovisioned in terms of its CPU utilization. The threshold is in percentage |
0 |
disk.low.utilization.threshold |
Threshold to define the utilization of disk is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. Such a cluster is overprovisioned in terms of its disk utilization. The threshold is in percentage |
0 |
network.inbound.low.utilization.threshold |
Threshold to define the utilization of network inbound rate is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. Such a cluster is overprovisioned in terms of its network inbound rate. The threshold is in percentage |
0 |
network.outbound.low.utilization.threshold |
Threshold to define the utilization of network outbound rate is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. Such a cluster is overprovisioned in terms of its network outbound rate. The threshold is in percentage |
0 |
metric.anomaly.percentile.upper.threshold |
Upper percentile threshold for the metric anomaly detector to identify an increase in the metric values of a broker as a metric anomaly. The current metric value is compared against the historical value corresponding to given percentile in the metric history after the application of the upper margin |
90 |
metric.anomaly.percentile.lower.threshold |
Lower percentile threshold for the metric anomaly detector to identify a decrease in the metric values of a broker as a metric anomaly. The current metric value is compared against the historical value corresponding to given percentile in the metric history after the application of the lower margin |
10 |
proposal.expiration.ms |
Kafka Cruise Control will cache one of the best proposals among all the optimization proposal candidates it recently computed. This configuration defines when will the cached proposal will be invalidated and needs a recomputation. If |
60000 |
max.replicas.per.broker |
Maximum number of replicas allowed to reside on a broker. The analyzer will enforce a hard goal that the number of replicas on a broker cannot be higher than this config |
10000 |
num.proposal.precompute.threads |
Number of threads used to precompute the optimization proposal candidates. The more threads are used, the more memory and CPU resource will be used |
1 |
| num.concurrent.partition.movements.per.broker | Maximum number of partitions the executor moves into or out of a broker at the same time. For example, the default value of `10` allows at most 10 partitions to move into and 10 partitions to move out of each broker at any given time | 10 |
| max.num.cluster.partition.movements | Maximum number of allowed partition movements in the cluster. This global limit cannot be exceeded regardless of the per-broker replica movement concurrency. For example, the default value of `1250` allows at most 1250 partitions to be reassigned in the cluster at any given time | 1250 |
| num.concurrent.intra.broker.partition.movements | Maximum number of partitions the executor moves across disks within a broker at the same time. For example, the default value of `2` allows at most 2 partitions to move between the disks of each broker at any given time | 2 |
| num.concurrent.leader.movements | Maximum number of leader movements the executor performs as one batch. The limit exists mainly because a znode has a 1 MB size upper limit; batching also reduces the controller load | 1000 |
| execution.progress.check.interval.ms | Interval in milliseconds at which the executor checks the progress of an execution | 10000 |
| anomaly.notifier.class | Notifier class that triggers an alert when an anomaly is detected | com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier |
| metric.anomaly.finder.class | List of metric anomaly finder classes that check the current state and identify metric anomalies | com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder |
| metric.anomaly.analyzer.metrics | Metric IDs that the metric anomaly detector monitors for violations | BROKER_PRODUCE_LOCAL_TIME_MS_50TH, BROKER_PRODUCE_LOCAL_TIME_MS_999TH, BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH, BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH, BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH, BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH, BROKER_LOG_FLUSH_TIME_MS_50TH, BROKER_LOG_FLUSH_TIME_MS_999TH |
| self.healing.exclude.recently.demoted.brokers | Set to `true` to exclude recently demoted brokers from optimizations performed during self-healing | true |
| self.healing.exclude.recently.removed.brokers | Set to `true` to exclude recently removed brokers from optimizations performed during self-healing | true |
| failed.brokers.zk.path | Path in ZooKeeper for storing the failed broker list. It persists broker failure times in case Cruise Control fails and restarts while some brokers are down | /CruiseControlBrokerList |
| topic.config.provider.class | Provider class that reports the active configuration of topics | com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider |
| cluster.configs.file | Path to the cluster configuration file | config/clusterConfigs.json |
| completed.kafka.monitor.user.task.retention.time.ms | Maximum time in milliseconds to store the response and access details of a completed Kafka monitoring user task. If this setting is missing, the `completed.user.task.retention.time.ms` value is used | 86400000 |
| completed.cruise.control.monitor.user.task.retention.time.ms | Maximum time in milliseconds to store the response and access details of a completed Cruise Control monitoring user task. If this setting is missing, the `completed.user.task.retention.time.ms` value is used | 86400000 |
| completed.kafka.admin.user.task.retention.time.ms | Maximum time in milliseconds to store the response and access details of a completed Kafka administration user task. If this setting is missing, the `completed.user.task.retention.time.ms` value is used | 604800000 |
| completed.cruise.control.admin.user.task.retention.time.ms | Maximum time in milliseconds to store the response and access details of a completed Cruise Control administration user task. If this setting is missing, the `completed.user.task.retention.time.ms` value is used | 604800000 |
| completed.user.task.retention.time.ms | Maximum time in milliseconds to store the response and access details of a completed user task | 86400000 |
| demotion.history.retention.time.ms | Maximum time in milliseconds to retain the demotion history of brokers | 1209600000 |
| removal.history.retention.time.ms | Maximum time in milliseconds to retain the removal history of brokers | 1209600000 |
| max.cached.completed.kafka.monitor.user.tasks | Maximum number of completed Kafka monitoring user tasks for which the response and access details are cached. If this setting is missing, the `max.cached.completed.user.tasks` value is used | 20 |
| max.cached.completed.cruise.control.monitor.user.tasks | Maximum number of completed Cruise Control monitoring user tasks for which the response and access details are cached. If this setting is missing, the `max.cached.completed.user.tasks` value is used | 20 |
| max.cached.completed.kafka.admin.user.tasks | Maximum number of completed Kafka administration user tasks for which the response and access details are cached. If this setting is missing, the `max.cached.completed.user.tasks` value is used | 30 |
| max.cached.completed.cruise.control.admin.user.tasks | Maximum number of completed Cruise Control administration user tasks for which the response and access details are cached. If this setting is missing, the `max.cached.completed.user.tasks` value is used | 30 |
| max.cached.completed.user.tasks | Maximum number of completed user tasks for which the response and access details are cached | 25 |
| max.active.user.tasks | Maximum number of user tasks running concurrently in async endpoints across all users | 5 |
| webserver.api.urlprefix | REST API default URL prefix | /kafkacruisecontrol/* |
| webserver.ui.diskpath | Location where the Cruise Control frontend is deployed | ./cruise-control-ui/dist/ |
| webserver.ui.urlprefix | URL path the UI is served from | /* |
| webserver.request.maxBlockTimeMs | Time after which a request is converted to an asynchronous one | 10000 |
| webserver.session.maxExpiryTimeMs | Default session expiry period | 60000 |
| webserver.session.path | Default session path (for cookies) | / |
| webserver.accesslog.enabled | Set to `true` to enable the web server access log | true |
| two.step.verification.enabled | Set to `true` to enable two-step verification for processing POST requests | false |
| two.step.purgatory.retention.time.ms | Maximum time in milliseconds to retain the requests in two-step (verification) purgatory | 1209600000 |
| two.step.purgatory.max.requests | Maximum number of requests in two-step (verification) purgatory | 25 |
| vertx.enabled | Enables the use of Vert.x verticles | false |
| spnego.principal | Kerberos authentication principal for SPNEGO | HTTP/{{ groups['kafka.CRUISECONTROL'][0] \| d(omit) }}@{{ cluster.config.kerberos_client.realm }} |
| spnego.keytab.file | Keytab file for SPNEGO | {{ cluster.config.kerberos_client.keytab_dir }}/HTTP.service.keytab |
| additional | Parameters and their values entered in this field override the parameters specified in the ADCM user interface. This field also lets you set values for user parameters that are not displayed in the interface but are allowed in the `cruisecontrol.properties` configuration file | — |
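For illustration, extra entries for `cruisecontrol.properties` could be supplied through the `additional` field. The fragment below is only a sketch; the parameter choices and values are hypothetical examples, not recommendations:

```properties
# Hypothetical overrides passed via the "additional" field (values are examples only).
# Recompute cached optimization proposals after 2 minutes instead of the 60000 ms default.
proposal.expiration.ms=120000
# Allow at most 5 concurrent partition movements into or out of each broker.
num.concurrent.partition.movements.per.broker=5
# Keep completed user task details for 12 hours.
completed.user.task.retention.time.ms=43200000
```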
User template for configuring the JSON file that provides information about broker capacities.
Default value:
```json
{
  "brokerCapacities": [
    {
      "brokerId": "-1",
      "capacity": {
        "DISK": "20000",
        "CPU": "100",
        "NW_IN": "50000",
        "NW_OUT": "50000"
      },
      "doc": "This is the default capacity. Capacity unit used for disk is in MB, CPU is in percentage, network throughput is in KB."
    }
  ]
}
```
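In upstream Cruise Control, the same file also accepts per-broker entries: an entry whose `brokerId` matches a specific broker takes precedence over the `-1` default. A sketch with hypothetical broker IDs and capacities:

```json
{
  "brokerCapacities": [
    {
      "brokerId": "-1",
      "capacity": { "DISK": "20000", "CPU": "100", "NW_IN": "50000", "NW_OUT": "50000" },
      "doc": "Default capacity: disk in MB, CPU in percent, network throughput in KB."
    },
    {
      "brokerId": "0",
      "capacity": { "DISK": "500000", "CPU": "100", "NW_IN": "100000", "NW_OUT": "100000" },
      "doc": "Hypothetical override for broker 0 with a larger disk and faster network."
    }
  ]
}
```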
| Parameter | Description | Default value |
|---|---|---|
| property.filename | Path to the directory for Cruise Control logs | /var/log/cruise-control |
| rootLogger.level | Root logger logging level | INFO |
| logger.cruisecontrol.level | Cruise Control logging level | INFO |
| logger.detector.level | Anomaly detector logging level | INFO |
| logger.operationLogger.level | Execution logging level | INFO |
| logger.CruiseControlPublicAccessLogger.level | Public access Cruise Control logging level | INFO |
The template for the `log4j_properties` user file in Cruise Control is intended to specify custom logging parameters.
Default value:
```properties
#jinja2: lstrip_blocks:"True", trim_blocks:"True"
# {{ ansible_managed }}
{% set kafka_cruisecontrol_log4j = services.kafka.CRUISECONTROL.config.log4j_properties_configuration %}
rootLogger.level={{ kafka_cruisecontrol_log4j['rootLogger.level'] }}
property.filename={{ kafka_cruisecontrol_log4j['property.filename'] }}
appenders=console, kafkaCruiseControlAppender, operationAppender, requestAppender
appender.console.type=Console
appender.console.name=STDOUT
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=[%d] %p %m (%c)%n
appender.kafkaCruiseControlAppender.type=RollingFile
appender.kafkaCruiseControlAppender.name=kafkaCruiseControlFile
appender.kafkaCruiseControlAppender.fileName=$(unknown)/kafkacruisecontrol.log
appender.kafkaCruiseControlAppender.filePattern=$(unknown)/kafkacruisecontrol.log.%d{yyyy-MM-dd-HH}
appender.kafkaCruiseControlAppender.layout.type=PatternLayout
appender.kafkaCruiseControlAppender.layout.pattern=[%d] %p %m (%c)%n
appender.kafkaCruiseControlAppender.policies.type=Policies
appender.kafkaCruiseControlAppender.policies.time.type=TimeBasedTriggeringPolicy
appender.kafkaCruiseControlAppender.policies.time.interval=1
appender.operationAppender.type=RollingFile
appender.operationAppender.name=operationFile
appender.operationAppender.fileName=$(unknown)/kafkacruisecontrol-operation.log
appender.operationAppender.filePattern=$(unknown)/kafkacruisecontrol-operation.log.%d{yyyy-MM-dd}
appender.operationAppender.layout.type=PatternLayout
appender.operationAppender.layout.pattern=[%d] %p [%c] %m %n
appender.operationAppender.policies.type=Policies
appender.operationAppender.policies.time.type=TimeBasedTriggeringPolicy
appender.operationAppender.policies.time.interval=1
appender.requestAppender.type=RollingFile
appender.requestAppender.name=requestFile
appender.requestAppender.fileName=$(unknown)/kafkacruisecontrol-request.log
appender.requestAppender.filePattern=$(unknown)/kafkacruisecontrol-request.log.%d{yyyy-MM-dd-HH}
appender.requestAppender.layout.type=PatternLayout
appender.requestAppender.layout.pattern=[%d] %p %m (%c)%n
appender.requestAppender.policies.type=Policies
appender.requestAppender.policies.time.type=TimeBasedTriggeringPolicy
appender.requestAppender.policies.time.interval=1
# Loggers
logger.cruisecontrol.name=com.linkedin.kafka.cruisecontrol
logger.cruisecontrol.level={{ kafka_cruisecontrol_log4j['logger.cruisecontrol.level'] }}
logger.cruisecontrol.appenderRef.kafkaCruiseControlAppender.ref=kafkaCruiseControlFile
logger.detector.name=com.linkedin.kafka.cruisecontrol.detector
logger.detector.level={{ kafka_cruisecontrol_log4j['logger.detector.level'] }}
logger.detector.appenderRef.kafkaCruiseControlAppender.ref=kafkaCruiseControlFile
logger.operationLogger.name=operationLogger
logger.operationLogger.level={{ kafka_cruisecontrol_log4j['logger.operationLogger.level'] }}
logger.operationLogger.appenderRef.operationAppender.ref=operationFile
logger.CruiseControlPublicAccessLogger.name=CruiseControlPublicAccessLogger
logger.CruiseControlPublicAccessLogger.level={{ kafka_cruisecontrol_log4j['logger.CruiseControlPublicAccessLogger.level'] }}
logger.CruiseControlPublicAccessLogger.appenderRef.requestAppender.ref=requestFile
rootLogger.appenderRefs=console, kafkaCruiseControlAppender
# Write to Syslog
# rootLogger.appenderRef.console.ref=STDOUT
rootLogger.appenderRef.kafkaCruiseControlAppender.ref=kafkaCruiseControlFile
```
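As an example of how this template is rendered, setting `logger.detector.level` to `DEBUG` in ADCM would produce the following fragment of the resulting file (a sketch based on the template lines above):

```properties
# Rendered with logger.detector.level=DEBUG selected in ADCM.
logger.detector.name=com.linkedin.kafka.cruisecontrol.detector
logger.detector.level=DEBUG
logger.detector.appenderRef.kafkaCruiseControlAppender.ref=kafkaCruiseControlFile
```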
| Parameter | Description | Default value |
|---|---|---|
| ADSControl_user | ADS Control service user name | admin |
Kerberos API configuration menu for Cruise Control.
| Parameter | Description | Default value |
|---|---|---|
| HTTP | Principal for the HTTP connection | ADMIN |
| adscc | Principal of the ADS Control service user | ADMIN |
| Add property | Allows you to add a short principal name (without specifying the realm) | — |