ksqlDB configuration parameters

To configure the service, use the following configuration parameters in ADCM.

NOTE
  • Some of the parameters become visible in the ADCM UI after the Advanced flag has been set.

  • The parameters that are set in the Custom group override the existing parameters, even if they are read-only.

Main

 

Parameter | Description | Default value
--- | --- | ---
Listener port | ksqlDB server listener port. Specified as listeners in the ksql-server.properties file | 8088
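
In ksql-server.properties this parameter is rendered as the listeners property. A minimal sketch, assuming the server binds to all interfaces:

listeners=http://0.0.0.0:8088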

ksql-env.sh

 

Parameter | Description | Default value
--- | --- | ---
JMX_PORT | Port on which the ksqlDB server exposes JMX metrics | 10099
LOG_DIR | Directory for storing logs | /var/log/ksql
KSQL_HEAP_OPTS | Heap size allocated to the ksqlDB server process | -Xmx3g
KSQL_JVM_PERFORMANCE_OPTS | JVM performance tuning options | -server -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+ExplicitGCInvokesConcurrent -XX:NewRatio=1 -Djava.awt.headless=true
CLASSPATH | Setting for the Java Virtual Machine or Java compiler that specifies the location of custom classes and packages | /usr/lib/ksql/libs/*
KSQL_CLASSPATH | Path to the Java deployment of the ksqlDB server and related Java classes | ${CLASSPATH}
KSQL_OPTS | Environment variable that specifies configuration settings for the ksqlDB server. Properties set with KSQL_OPTS take precedence over those specified in the ksqlDB configuration file | -Djava.security.auth.login.config=/etc/ksqldb/jaas_config.conf
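
For illustration, a ksql-env.sh rendered with the default values above might contain the following lines (a sketch, not the exact file that ADCM generates):

export JMX_PORT=10099
export LOG_DIR=/var/log/ksql
export KSQL_HEAP_OPTS="-Xmx3g"
export KSQL_OPTS="-Djava.security.auth.login.config=/etc/ksqldb/jaas_config.conf"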

Basic Auth properties

 

Parameter | Description | Default value
--- | --- | ---
authentication.method | Authentication method | BASIC
authentication.roles | Defines a comma-separated list of user roles. To log in to the ksqlDB server, the authenticated user must belong to at least one of these roles. For more information, see Basic authentication | admin
authentication.realm | Corresponds to a section in the jaas_config.conf file that defines how the server authenticates users; must be passed as a parameter to the JVM during server startup | KsqlServer-Props
Path to password.properties | Path to the password.properties file | /etc/ksqldb/password.properties
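
Together, these parameters map to entries like the following. The password.properties line is a sketch of the Jetty PropertyFileLoginModule format (username: password,role); the credentials shown are hypothetical:

# ksql-server.properties
authentication.method=BASIC
authentication.roles=admin
authentication.realm=KsqlServer-Props

# /etc/ksqldb/password.properties
admin: admin-secret,admin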

server.properties

 

Parameter | Description | Default value
--- | --- | ---
application.id | Application ID | ksql-server
ksql.internal.topic.replicas | Replication factor for the ksqlDB server's internal topics | 1
ksql.streams.state.dir | Storage directory for stateful operations | /usr/lib/ksql/state
ksql.streams.replication.factor | Replication factor for the underlying internal topics of Kafka Streams | 1
ksql.streams.topic.min.insync.replicas | Minimum number of in-sync replicas that must acknowledge a write for it to be considered successful | 2
ksql.streams.num.standby.replicas | Number of standby replicas for stateful operations | 1
ksql.streams.producer.acks | Number of brokers that must acknowledge receipt of a message before it is considered a successful write | all
ksql.streams.producer.delivery.timeout.ms | Batch expiry time (in ms) | 2147483647
ksql.streams.producer.max.block.ms | Maximum time the producer is allowed to block (in ms) | 9223372036854775000
ssl.endpoint.identification.algorithm | Endpoint identification algorithm for server validation | —
ssl.keystore.location | Used for HTTPS. Location of the keystore file to use for SSL | —
ssl.keystore.type | File format of the keystore file | —
ssl.keystore.password | Used for HTTPS. The store password for the keystore file | —
ssl.key.password | Used for HTTPS. The password of the private key in the keystore file | —
ssl.truststore.location | Location of the truststore file | —
ssl.truststore.type | File format of the truststore file | —
ssl.truststore.password | Used for HTTPS. The store password for the truststore file | —
ksql.schema.registry.ssl.keystore.location | Location of the SSL keystore file | —
ksql.schema.registry.ssl.keystore.password | Password to access the keystore | —
ksql.schema.registry.ssl.key.password | Password of the key contained in the keystore | —
ksql.schema.registry.ssl.keystore.type | File format of the keystore | —
ksql.schema.registry.ssl.truststore.location | Location of the SSL truststore file | —
ksql.schema.registry.ssl.truststore.password | Password to access the truststore | —
ksql.schema.registry.ssl.truststore.type | File format of the truststore | —
Add key, value | Parameters and values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set all user parameters that are not displayed in the interface but are allowed in the server.properties configuration file | —
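
As an example, enabling HTTPS involves filling in the SSL parameters above. A hedged sketch in which all paths and passwords are placeholders:

listeners=https://0.0.0.0:8088
ssl.keystore.location=/etc/ksqldb/ksql.keystore.jks
ssl.keystore.password=keystore-secret
ssl.key.password=key-secret
ssl.truststore.location=/etc/ksqldb/ksql.truststore.jks
ssl.truststore.password=truststore-secret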

connect.properties

 

Parameter | Description | Default value
--- | --- | ---
group.id | Unique identifier for the set of Connect workers | ksql-connect-cluster
key.converter | Converter class for key Connect data. Converters specify the format of data in Kafka and how to translate it into Connect data | org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url | Schema Registry URL used by the key converter | http://localhost:8081
value.converter | Converter class for value Connect data | io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url | Schema Registry URL used by the value converter | http://localhost:8081
config.storage.topic | Name of the internal topic for storing connector configurations | ksql-connect-configs
offset.storage.topic | Name of the internal topic for storing connector offsets | ksql-connect-offsets
status.storage.topic | Name of the internal topic for storing connector and task statuses | ksql-connect-statuses
config.storage.replication.factor | Replication factor for config.storage.topic | 1
offset.storage.replication.factor | Replication factor for offset.storage.topic | 1
status.storage.replication.factor | Replication factor for status.storage.topic | 1
internal.key.converter | Converter class for internal keys of Connect records | org.apache.kafka.connect.json.JsonConverter
internal.value.converter | Converter class for internal values of Connect records | org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable | Whether to include schemas with internal Connect data | false
Add key, value | Parameters and values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set all user parameters that are not displayed in the interface but are allowed in the connect.properties configuration file | —
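
Assembled from the default values above, the resulting connect.properties fragment looks roughly like this (a sketch):

group.id=ksql-connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=ksql-connect-configs
offset.storage.topic=ksql-connect-offsets
status.storage.topic=ksql-connect-statuses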

datagen.properties

 

Parameter | Description | Default value
--- | --- | ---
interceptor.classes | List of producer interceptor classes. If you are not currently using any interceptors, add this item to the Java Properties object used to create a new producer | io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
Add key, value | Parameters and values entered in this field override the parameters specified in the ADCM user interface. This field also allows you to set all user parameters that are not displayed in the interface but are allowed in the datagen.properties configuration file | —
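
Rendered into datagen.properties, the default yields a single line:

interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor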

JAAS template

 

    The jaas.conf user file template is intended for specifying user data for connecting clients of other services to the current service (paths to keytab files, the useTicketCache parameter, and others). For more information, see Configure a custom jaas.conf.

    Default value:

{% if cluster.config.basic_auth_default_config is not none %}
{{ services.ksql.config.basic_auth_properties_content['authentication.realm'] }} {
  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
  file="{{ ksql_home_path }}/config/password.properties"
  debug="false";
};
{% endif %}
{% if cluster.config.kerberos_client and cluster.config.kerberos_client.enable_kerberos %}
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    keyTab="{{ cluster.config.kerberos_client.keytab_dir }}/ksql-server.service.keytab"
    principal="ksql-server/{{ ansible_fqdn }}@{{ cluster.config.kerberos_client.realm }}";
};
{%- elif cluster.config.sasl_plain_auth_default_config is not none %}
    {%- set credential = cluster.config.sasl_plain_auth_default_config.sasl_plain_users_data %}
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="ksql-server"
  password="{{ credential['ksql-server'] }}";
};
{% endif %}
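
For orientation, with basic authentication enabled and Kerberos disabled, the template above might render to a jaas.conf like this (the installation path standing in for ksql_home_path is an assumption):

KsqlServer-Props {
  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
  file="/usr/lib/ksql/config/password.properties"
  debug="false";
};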

 

ksqlDB Server component configuration parameters:

log4j properties configuration

 

Parameter | Description | Default value
--- | --- | ---
log4j.rootLogger | Logging level of the root logger | INFO
log4j.logger.org.reflections | Logging level of the Reflections library | ERROR
log4j.logger.org.apache.kafka.streams | Logging level of Kafka Streams | INFO
log4j.logger.kafka | Change to adjust the general broker logging level (output to server.log and stdout). See also log4j.logger.org.apache.kafka | WARN
log4j.logger.org.apache.zookeeper | Change to adjust the ZooKeeper client logging level | WARN
log4j.logger.org.apache.kafka | Change to adjust the general broker logging level (output to server.log and stdout). See also log4j.logger.kafka | WARN
log4j.logger.org.I0Itec.zkclient | Change to adjust the ZooKeeper client logging level | WARN

log4j.logger.io.confluent.ksql.rest.server.resources.KsqlResource

 

Parameter | Description | Default value
--- | --- | ---
log4j.logger.io.confluent.ksql.rest.server.resources.KsqlResource | Set to stop ksqlDB from logging out each request it receives | WARN

log4j.logger.io.confluent.ksql.util.KsqlConfig

 

Parameter | Description | Default value
--- | --- | ---
log4j.logger.io.confluent.ksql.util.KsqlConfig | Set to avoid the logs being spammed with KsqlConfig values | WARN

log4j_properties_template

 

    Template for customizing the log4j logging library.

    Default value:

# Maintained by ADCM
{% set ksql_server_log4j_properties_configuration = services.ksql.SERVER.config.log4j_properties_configuration %}

log4j.rootLogger={{ ksql_server_log4j_properties_configuration['log4j.rootLogger'] }}, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.appender.streams=org.apache.log4j.ConsoleAppender
log4j.appender.streams.layout=org.apache.log4j.PatternLayout
log4j.appender.streams.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.reflections={{ ksql_server_log4j_properties_configuration['log4j.logger.org.reflections'] }}, stdout

{% if services.ksql.SERVER.config.log4j_logger_io_confluent_ksql_rest_server_resources_KsqlResource['log4j.logger.io.confluent.ksql.rest.server.resources.KsqlResource'] is defined %}
log4j.logger.io.confluent.ksql.rest.server.resources.KsqlResource={{ services.ksql.SERVER.config.log4j_logger_io_confluent_ksql_rest_server_resources_KsqlResource['log4j.logger.io.confluent.ksql.rest.server.resources.KsqlResource'] }}
{% endif %}
{% if services.ksql.SERVER.config.log4j_logger_io_confluent_ksql_util_KsqlConfig['log4j.logger.io.confluent.ksql.util.KsqlConfig'] is defined %}
log4j.logger.io.confluent.ksql.util.KsqlConfig={{ services.ksql.SERVER.config.log4j_logger_io_confluent_ksql_util_KsqlConfig['log4j.logger.io.confluent.ksql.util.KsqlConfig'] }}
{% endif %}

log4j.logger.org.apache.kafka.streams={{ ksql_server_log4j_properties_configuration['log4j.logger.org.apache.kafka.streams'] }}, streams
log4j.additivity.org.apache.kafka.streams=false

log4j.logger.kafka={{ ksql_server_log4j_properties_configuration['log4j.logger.kafka'] }}, stdout
log4j.logger.org.apache.zookeeper={{ ksql_server_log4j_properties_configuration['log4j.logger.org.apache.zookeeper'] }}, stdout
log4j.logger.org.apache.kafka={{ ksql_server_log4j_properties_configuration['log4j.logger.org.apache.kafka'] }}, stdout
log4j.logger.org.I0Itec.zkclient={{ ksql_server_log4j_properties_configuration['log4j.logger.org.I0Itec.zkclient'] }}, stdout
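
Rendered with the default logging levels from the tables above (and with both optional loggers unset), the resulting log4j.properties starts roughly like this (a sketch):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.reflections=ERROR, stdout
log4j.logger.org.apache.kafka.streams=INFO, streams
log4j.logger.kafka=WARN, stdout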

log4j_rolling_properties_template

 

    Template for customizing the log4j_rolling_properties logging file.

    Default value:

# Maintained by ADCM
{% set broker_port = (services.kafka.config.Main.listeners | regex_replace('.*:(\\d+)$', '\\1')) %}
{% set broker_hosts_with_port = services.kafka.config.bootstrap_servers_without_protocol %}
log4j.rootLogger=INFO, main

# appenders
log4j.appender.main=org.apache.log4j.RollingFileAppender
log4j.appender.main.File=${ksql.log.dir}/ksql.log
log4j.appender.main.layout=org.apache.log4j.PatternLayout
log4j.appender.main.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.main.MaxFileSize=10MB
log4j.appender.main.MaxBackupIndex=5
log4j.appender.main.append=true

log4j.appender.streams=org.apache.log4j.RollingFileAppender
log4j.appender.streams.File=${ksql.log.dir}/ksql-streams.log
log4j.appender.streams.layout=org.apache.log4j.PatternLayout
log4j.appender.streams.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.appender.kafka=org.apache.log4j.RollingFileAppender
log4j.appender.kafka.File=${ksql.log.dir}/ksql-kafka.log
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.kafka.MaxFileSize=10MB
log4j.appender.kafka.MaxBackupIndex=5
log4j.appender.kafka.append=true

log4j.appender.kafka_appender=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout
log4j.appender.kafka_appender.BrokerList=
{%- for host_with_port in broker_hosts_with_port.split(',') -%}
    {% if loop.index > 1 %},{% endif -%}
    {{ ('ssl' in cluster.multi_state) | ternary('https', 'http') }}://{{ host_with_port -}}
{% endfor %}

log4j.appender.kafka_appender.Topic=default_ksql_processing_log
log4j.appender.kafka_appender.SyncSend=true
log4j.appender.kafka_appender.IgnoreExceptions=false


{% if cluster.edition == 'enterprise' %}
{% set sasl_protocol = services.kafka.config['listeners_option']['sasl_protocol'] | d('none') %}
{% set ssl_enable = services.kafka.config['listeners_option']['ssl_enable'] | d(False) %}
log4j.appender.kafka_appender.SecurityProtocol={{ sasl_protocol | kafka_protocol(ssl_enable) }}
log4j.appender.kafka_appender.SaslMechanism={{ sasl_protocol | normalize_sasl_protocol }}

{% if sasl_protocol | normalize_sasl_protocol == 'PLAIN' %}
log4j.appender.kafka_appender.clientJaasConf=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username=ksql-server \
    password="{{ cluster.config.sasl_plain_auth_default_config.sasl_plain_users_data['ksql-server'] }}";
{% endif %}

{% if sasl_protocol | normalize_sasl_protocol == 'GSSAPI' %}
log4j.appender.kafka_appender.SaslKerberosServiceName=kafka
log4j.appender.kafka_appender.clientJaasConf=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="{{ cluster.config.kerberos_client.keytab_dir }}/ksql-server.service.keytab" \
    principal="ksql-server/{{ ansible_fqdn }}@{{ cluster.config.kerberos_client.realm }}" \
    serviceName="kafka";
{% endif %}

{% if ssl_enable %}
log4j.appender.kafka_appender.SslTruststoreLocation={{ services.kafka.config.server_properties_content['ssl.truststore.location'] }}
log4j.appender.kafka_appender.SslTruststorePassword={{ services.kafka.config.server_properties_content['ssl.truststore.password'] }}
{% endif %}
{% endif %}
# loggers

log4j.logger.org.reflections=ERROR, main

# Uncomment the following line to stop ksqlDB from logging out each request it receives:
#log4j.logger.io.confluent.ksql.rest.server.resources.KsqlResource=WARN

# And this one to avoid the logs being spammed with KsqlConfig values.
# Though these can be useful for debugging / investigations.
#log4j.logger.io.confluent.ksql.util.KsqlConfig=WARN

## ksqlDB Processing logs:
log4j.logger.processing=WARN, kafka_appender
log4j.additivity.processing=false

## Kafka Streams logs:
log4j.logger.org.apache.kafka.streams=INFO, streams
log4j.additivity.org.apache.kafka.streams=false

## Kafka Clients logs:
log4j.logger.org.apache.kafka.clients=INFO, clients
log4j.additivity.org.apache.kafka.clients=false

## Kafka Connect logs:
log4j.logger.org.apache.kafka.connect=INFO, connect
log4j.additivity.org.apache.kafka.connect=false

## Other Kafka logs:
log4j.logger.kafka=WARN, kafka
log4j.additivity.kafka=false

log4j.logger.org.apache.zookeeper=WARN, kafka
log4j.additivity.org.apache.zookeeper=false

log4j.logger.org.apache.kafka=WARN, kafka
log4j.additivity.org.apache.kafka=false

log4j.logger.org.I0Itec.zkclient=WARN, kafka
log4j.additivity.org.I0Itec.zkclient=false

# To achieve high throughput on pull queries, avoid logging every request from Jetty
log4j.logger.io.confluent.rest-utils.requests=WARN

ksql_migrations_log4j_properties_template

 

    Template for customizing the ksql_migrations_log4j_properties logging file.

    Default value:

# Root logger -- disable all non-migrations-tool logging
log4j.rootLogger=OFF

# Migrations tool logger
log4j.logger.io.confluent.ksql.tools.migrations=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%t] %-5p %c %x - %m%n

log4j_file_properties_template

 

    Template for customizing the log4j_file_properties logging file.

    Default value:

#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License.  You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations under the License.
#

# For the general syntax of property based configuration files see
# the documentation of org.apache.log4j.PropertyConfigurator.

log4j.rootLogger=WARN, default.file

log4j.appender.default.file=io.confluent.ksql.util.TimestampLogFileAppender
log4j.appender.default.file.ImmediateFlush=true
log4j.appender.default.file.append=false

log4j.appender.default.file.file=${ksql.log.dir}/ksql-cli/cli-%timestamp.log
log4j.appender.default.file.layout=org.apache.log4j.PatternLayout
log4j.appender.default.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j_silent_properties_template

 

    Template for customizing the log4j_silent_properties logging file.

    Default value:

#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License.  You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations under the License.
#

log4j.rootLogger=OFF