Примеры использования ADB to Kafka Connector
В статье показаны примеры передачи данных из таблицы ADB в топик Kafka (в составе ADS) через ADB to Kafka Connector. При выполнении примера соблюдены следующие условия:
-
Кластер ADS установлен согласно руководству Online-установка на следующих хостах:
bds-ads1,bds-ads2,bds-ads3. -
Кластер ADB установлен согласно руководству Online-установка на следующих хостах:
bds-mdw,bds-sdw1,bds-sdw2. -
В кластере ADB добавлены и установлены сервисы PXF и ADB to Kafka.
-
База данных с именем
adbсуществует в ADB. -
Все ноды кластера ADB имеют доступ к порту
9092на всех нодах кластера ADS, через которые планируется производить загрузку данных.
|
ПРИМЕЧАНИЕ
Поскольку ADB to Kafka Connector использует для сериализации данных формат AVRO OCF, выполнить десериализацию при помощи kafka-avro-console-consumer невозможно. Во всех примерах, приведенных ниже, для чтения пришедших со стороны ADB данных используется скрипт kafka-console-consumer.sh, возвращающий схему данных и сами данные в HEX-представлении. Десериализацию можно выполнить с помощью коннектора Kafka to ADB (см. Примеры использования Kafka to ADB Connector). |
Протокол PLAINTEXT (по умолчанию)
Без использования опции SERVER
-
Подключитесь к любому хосту кластера ADS, на котором установлен сервис Kafka. Перейдите в корневую директорию и создайте тестовый топик, используя следующую команду:
$ bin/kafka-topics.sh --create --topic topic_adb_to_kafka --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092Результат:
Created topic topic_adb_to_kafka.
РЕКОМЕНДАЦИЯ-
Дополнительную информацию о создании и чтении топиков в Kafka можно получить в статье документации ADS Начало работы c Kafka.
-
Создание топика является необязательным шагом. Если на шаге 3 будет указан несуществующий топик, он будет создан автоматически на стороне ADS при условии, что значение опции TOPIC_AUTO_CREATE_FLAG не изменено на
false.
-
-
Подключитесь к базе данных
adbна мастер-хосте кластера ADB под пользователемgpadmin(например, черезpsql):$ sudo su - gpadmin$ psql adb -
Создайте внешнюю таблицу со следующей структурой в ADB:
CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka (a INT, b TEXT) LOCATION ( 'pxf://topic_adb_to_kafka?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); -
Добавьте тестовые данные во внешнюю таблицу:
INSERT INTO ext_adb_to_kafka VALUES (1,'test1'), (2,'test2'), (3,'test3'), (4,'test4'), (5,'test5'); -
Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и запустите скрипт bin/kafka-console-consumer.sh:
$ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=trueВ выводе команды присутствует информация о 5 прочитанных сообщениях (с номером смещения в очереди
Offset):Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E Offset:1 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4 Offset:2 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr Offset:3 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13 Offset:4 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809 ^CProcessed a total of 5 messages
С использованием опции SERVER
-
Выполните шаги 1—6, указанные в разделе Использование опции SERVER статьи Настройка ADB to Kafka Connector. В качестве имени сервера
<server_name>используйтеkafka_test. -
На стороне ADB создайте внешнюю таблицу
ext_adb_to_kafka2. В ней, в отличие от таблицы из предыдущего примера, необходимо заполнить опциюSERVER:CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka2 (a INT, b TEXT) LOCATION ( 'pxf://topic_adb_to_kafka?PROFILE=kafka&SERVER=kafka_test' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); -
Добавьте тестовые данные во внешнюю таблицу
ext_adb_to_kafka2:INSERT INTO ext_adb_to_kafka2 VALUES (1,'test1'); -
На стороне ADS-кластера убедитесь в поступлении нового сообщения в топик
topic_adb_to_kafka:$ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=trueРезультат содержит 6 сообщений:
Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E Offset:1 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4 Offset:2 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr Offset:3 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13 Offset:4 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809 Offset:5 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs\x02\x12\x02\x02\x02\x0Atest1%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs ^CProcessed a total of 6 messages
Протокол SASL_PLAINTEXT
С использованием механизма PLAIN
-
Подключитесь к любому хосту кластера ADS, на котором установлен сервис Kafka, и выполните настройку аутентификации по протоколу SASL_PLAINTEXT для кластера ADS в соответствии со статьей SASL PLAINTEXT.
-
Убедитесь в корректной настройке аутентификации для сервиса Kafka, выполнив шаги из раздела Проверка включения аутентификации SASL PLAINTEXT.
-
Создайте пользователя с именем
adb-to-kafkaи паролем123на стороне ADS, следуя шагам раздела Создание пользователя. -
В соответствии с инструкциями раздела Аутентификация пользователя в среде Kafka обновите содержимое файла /etc/kafka/conf/client.properties следующим образом:
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN # Uncomment and set necessary username/password sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="adb-to-kafka" \ password="123"; -
Подключитесь к базе данных
adbна мастер-хосте кластера ADB под пользователемgpadmin(например, черезpsql):$ sudo su - gpadmin$ psql adb -
Создайте внешнюю таблицу со следующей структурой в ADB:
CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_sasl (a INT, b TEXT) LOCATION ( 'pxf://topic_adb_to_kafka_sasl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=PLAIN&SASL_USER=adb-to-kafka&SASL_USER_PASSWORD=123' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); -
Добавьте тестовые данные во внешнюю таблицу
ext_adb_to_kafka_sasl:INSERT INTO ext_adb_to_kafka_sasl VALUES (1,'test1'); -
Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и запустите скрипт bin/kafka-console-consumer.sh.
ПРИМЕЧАНИЕВ отличие от представленных выше примеров, при использовании протокола SASL_PLAINTEXT требуется дополнительно заполнить опцию
--consumer.config. Укажите в ней путь к файлу client.properties, описанному выше на шаге 4.$ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_sasl --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /etc/kafka/conf/client.propertiesВ выводе команды успешно отображается одно прочитанное сообщение:
Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81\x02\x12\x02\x02\x02\x0Atest1c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81 ^CProcessed a total of 1 messages
С использованием механизма GSSAPI (Kerberos)
|
ВНИМАНИЕ
Если ранее была выполнена настройка аутентификации по протоколу SASL_PLAINTEXT для кластера ADS в соответствии со статьей SASL PLAINTEXT (например, при запуске предыдущего примера), необходимо предварительно отключить ее перед керберизацией ADS. Для этого переведите в неактивное состояние переключатель SASL_PLAINTEXT Authentication на конфигурационной странице кластера ADS и сохраните изменения, нажав на кнопку Save. |
-
Выполните настройку MIT Kerberos KDC на отдельном хосте (в приведенном примере
bds-mit-kerberos.ru-central1.internal) и проведите керберизацию кластера ADS, следуя шагам руководства MIT Kerberos. В качестве имени realm используйтеADS-KAFKA.LOCAL. -
Убедитесь в корректной настройке аутентификации для сервиса Kafka, выполнив шаги из раздела Проверка установленного Kerberos SASL.
-
На хосте, где развернут MIT Kerberos KDC, создайте новый принципал
adb-to-kafka, который будет использоваться коннектором ADB to Kafka. В качестве пароля укажите123:$ sudo kadmin.local -q "add_principal -pw 123 adb-to-kafka@ADS-KAFKA.LOCAL"Результат:
Principal "adb-to-kafka@ADS-KAFKA.LOCAL" created.
-
Следуя инструкциям раздела Создание файла конфигурации .properties для пользователя, создайте файл /tmp/client.properties на стороне ADS.
-
Выполните шаги, описанные в разделе Создание JAAS-файла для пользователя.
-
Выполните установку необходимых пакетов на мастере и сегмент-хостах кластера ADB:
$ sudo yum install -y krb5-libs krb5-server krb5-workstation openldap-clients cyrus-sasl-gssapi -
На мастере и каждом сегмент-хосте кластера ADB отредактируйте содержимое файла /etc/krb5.conf аналогично тому, как это сделано при настройке MIT Kerberos KDC:
$ sudo vi /etc/krb5.confСодержимое файла:
[logging] default = FILE:/var/log/krb5libs.log kdc = FILE:/var/log/krb5kdc.log admin_server = FILE:/var/log/kadmind.log [libdefaults] default_realm = ADS-KAFKA.LOCAL kdc_timesync = 1 ticket_lifetime = 24h [realms] ADS-KAFKA.LOCAL = { admin_server = bds-mit-kerberos.ru-central1.internal kdc = bds-mit-kerberos.ru-central1.internal } -
На мастере и каждом сегмент-хосте кластера ADB отредактируйте содержимое файла /var/kerberos/krb5kdc/kdc.conf аналогично тому, как это сделано при настройке MIT Kerberos KDC:
$ sudo vi /var/kerberos/krb5kdc/kdc.confСодержимое файла:
[kdcdefaults] kdc_ports = 88 kdc_tcp_ports = 88 default_realm = ADS-KAFKA.LOCAL [realms] ADS-KAFKA.LOCAL = { #master_key_type = aes256-cts acl_file = /var/kerberos/krb5kdc/kadm5.acl dict_file = /usr/share/dict/words admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal } -
На мастер-хосте ADB создайте директорию для хранения keytab-файла и назначьте для него владельца и необходимые разрешения:
$ sudo mkdir /var/lib/pxf/conf/kerberos $ sudo chmod 744 /var/lib/pxf/conf/kerberos $ sudo chown gpadmin:gpadmin /var/lib/pxf/conf/kerberos -
Подключитесь к мастер-хосту ADB под пользователем
gpadmin:$ sudo su - gpadmin -
На мастер-хосте ADB запустите интерактивную утилиту
ktutil:$ ktutil -
В открывшейся консоли после приглашения
ktutil:поочередно введите следущие команды. После ввода каждой команды требуется указать пароль принципалаadb-to-kafka(в приведенном примере123):add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes128-cts-hmac-sha1-96 add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes256-cts-hmac-sha1-96 add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e arcfour-hmac add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia256-cts-cmac add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia128-cts-cmac -
В качестве последней команды отправьте запрос на сохранение keytab-файла с введенными данными и нажмите
qдля выхода изktutil:write_kt /var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab -
Для копирования созданного файла adb-to-kafka.service.keytab на сегмент-хосты ADB выполните последовательно следующие команды на мастер-хосте:
-
синхронизация PXF:
$ pxf cluster sync -
рестарт PXF:
$ pxf cluster restart
-
-
Убедитесь в работоспособности принципала
adb-to-kafka, последовательно выполнив следующие команды. Обратите внимание, что после командыkinitтребуется указать пароль принципала (в приведенном примере123):$ kinit adb-to-kafka@ADS-KAFKA.LOCAL$ klistРезультат говорит об успешной выдаче тикета:
Ticket cache: FILE:/tmp/krb5cc_2042 Default principal: adb-to-kafka@ADS-KAFKA.LOCAL Valid starting Expires Service principal 12/14/2023 18:13:16 12/15/2023 18:13:16 krbtgt/ADS-KAFKA.LOCAL@ADS-KAFKA.LOCAL
-
Подключитесь к базе данных
adbна мастер-хосте кластера ADB (например, черезpsql):$ psql adb -
Создайте внешнюю таблицу со следующей структурой в ADB:
CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_kerberos (a INT, b TEXT) LOCATION ( 'pxf://topic_adb_to_kafka_kerberos?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); -
Добавьте тестовые данные во внешнюю таблицу
ext_adb_to_kafka_kerberos:INSERT INTO ext_adb_to_kafka_kerberos VALUES (1,'kerberos'); -
Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и последовательно выполните следующие команды. После ввода команды
kinitтребуется ввести пароль принципалаadb-to-kafka(в приведенном примере123):$ kinit adb-to-kafka@ADS-KAFKA.LOCAL $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas" $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_kerberos --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client.propertiesРезультат содержит одно прочитанное сообщение:
Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3\x02\x18\x02\x02\x02\x10kerberos\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3 ^C[2023-12-14 18:18:35,250] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin) Processed a total of 1 messages
|
ПРИМЕЧАНИЕ
В опции |
Протокол SASL_SSL с использованием механизма GSSAPI (Kerberos)
|
ПРИМЕЧАНИЕ
Действия с сертификатами, описанные ниже на шагах 2—8, могут быть автоматизированы при помощи bash-скрипта (см. generate.sh в статье Защита канала по протоколу SSL), однако использование скрипта не рекомендуется для продуктовой среды. |
-
Выполните шаги 1—15 из приведенного выше примера С использованием механизма GSSAPI (Kerberos) (если они не были пройдены ранее). Убедитесь в наличии действующего тикета для принципала
adb-to-kafkaна стороне ADB. -
Сгенерируйте пару ключей по алгоритму RSA и создайте хранилище ключей keystore.jks на каждом хосте кластеров ADB и ADS:
$ keytool -genkeypair -noprompt -keyalg RSA -ext san=dns:<hostname> -alias <hostname> -dname "CN=<hostname>, OU=AD, O=AD, L=MSK, S=MO, C=RU" -keystore /tmp/keystore.jks -storepass bigdata -keypass bigdata -validity 360 -keysize 2048где
<hostname>— FQDN хоста, на котором запускается команда. -
Экспортируйте SSL-сертификаты, запустив следующую команду на каждом хосте ADB и ADS:
$ keytool -exportcert -file /tmp/<hostname>.crt -keystore /tmp/keystore.jks -storepass bigdata -alias <hostname> -rfcгде
<hostname>— FQDN хоста, на котором запускается команда.Результат может выглядеть следующим образом (для хоста с FQDN
bds-mdw.ru-central1.internal):Certificate stored in file </tmp/bds-mdw.ru-central1.internal.crt>
-
Скопируйте созданные сертификаты с каждого хоста ADB и ADS на локальную машину. Ниже показан пример команды для Windows:
$ pscp <user>@<hostname>:/tmp/<hostname>.crt /<local_folder>/где:
-
<user>— имя для подключения к хосту ADB/ADS по SSH; -
<hostname>— FQDN хоста ADB/ADS; -
<local_folder>— директория, созданная на локальной машине для сохранения всех сертификатов.
Например, для хоста ADB
bds-mdw.ru-central1.internalкоманда может выглядеть следующим образом:$ pscp dasha@bds-mdw:/tmp/bds-mdw.ru-central1.internal.crt /cert/В результате все сертификаты должны быть сохранены в одном месте:
Directory: C:\cert Mode LastWriteTime Length Name ---- ------------- ------ ---- -a---- 12/25/2023 10:58 AM 1275 bds-ads1.ru-central1.internal.crt -a---- 12/25/2023 10:58 AM 1275 bds-ads2.ru-central1.internal.crt -a---- 12/25/2023 10:59 AM 1275 bds-ads3.ru-central1.internal.crt -a---- 12/25/2023 10:58 AM 1271 bds-mdw.ru-central1.internal.crt -a---- 12/25/2023 10:58 AM 1275 bds-sdw1.ru-central1.internal.crt -a---- 12/25/2023 10:58 AM 1275 bds-sdw2.ru-central1.internal.crt
-
-
Перенесите все сертификаты с локальной машины в директорию /tmp каждого хоста ADB и ADS, последовательно запустив следующую команду для всех хостов:
$ pscp /<local_folder>/*.crt <user>@<hostname>:/tmp/Например, для хоста ADB
bds-mdw.ru-central1.internalкоманда может выглядеть следующим образом:$ pscp /cert/*.crt dasha@bds-mdw:/tmp/ -
Выполните импорт всех сертификатов в truststore.jks и хранилище CA-сертификатов ca-bundle.crt на каждом хосте ADB/ADS с помощью следующих команд:
$ keytool -importcert -noprompt -alias <hostname> -file /tmp/<hostname>.crt -keystore /tmp/truststore.jks -storepass bigdata $ sudo bash -c "cat /tmp/<hostname>.crt >> /etc/pki/tls/certs/ca-bundle.crt"где
<hostname>— FQDN хоста ADB/ADS. В приведенном примере команды необходимо запустить 6 раз на каждом из 6 хостов ADB/ADS (для каждого из сертификатов). -
Импортируйте trustore.jks в Java CA store, запустив следующую команду на каждом хосте ADB/ADS:
$ sudo keytool -importkeystore -noprompt -srckeystore /tmp/truststore.jks -destkeystore /etc/pki/java/cacerts -deststorepass changeit -srcstorepass bigdataРезультат:
Importing keystore /tmp/truststore.jks to /etc/pki/java/cacerts... Entry for alias bds-sdw1.ru-central1.internal successfully imported. Entry for alias bds-mdw.ru-central1.internal successfully imported. Entry for alias bds-ads1.ru-central1.internal successfully imported. Entry for alias bds-sdw2.ru-central1.internal successfully imported. Entry for alias bds-ads2.ru-central1.internal successfully imported. Entry for alias bds-ads3.ru-central1.internal successfully imported. Import command completed: 6 entries successfully imported, 0 entries failed or cancelled
-
Создайте и импортируйте сертификаты OpenSSL, запустив следующие команды на каждом хосте ADB/ADS:
$ sudo openssl req -new -newkey rsa:4096 -days 365 -nodes -x509 -subj "/C=RU/ST=Denial/L=MSK/O=AD/CN=<hostname>" -keyout /etc/ssl/host_cert.key -out /etc/ssl/certs/host_cert.cert $ sudo bash -c "cat /etc/ssl/certs/host_cert.cert >> /etc/pki/tls/certs/ca-bundle.crt"где
<hostname>— FQDN хоста, на котором запускается команда.Результат первой команды:
Generating a 4096 bit RSA private key .....................................................++ ...............................................................++ writing new private key to '/etc/ssl/host_cert.key'
-
Выполните включение SSL в кластере ADS в соответствии с разделом Включение SSL на кластере ADS при помощи ADCM.
-
Убедитесь, что включение SSL для Kafka выполнено успешно, в соответствии с инструкциями раздела Проверка настроек SSL.
-
На стороне кластера ADS создайте файл /tmp/client_ssl.properties:
$ sudo vi /tmp/client_ssl.propertiesУкажите следующее содержимое файла:
security.protocol=SASL_SSL sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka -
Подключитесь к базе данных
adbна мастер-хосте кластера ADB под пользователемgpadmin(например, черезpsql):$ sudo su - gpadmin$ psql adb -
Создайте внешнюю таблицу со следующей структурой в ADB:
CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_ssl (a INT, b TEXT) LOCATION ( 'pxf://topic_ssl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka&TRUSTSTORE_LOCATION=/tmp/truststore.jks&TRUSTSTORE_PASSWORD=bigdata' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); -
Добавьте тестовые данные во внешнюю таблицу
ext_adb_to_kafka_ssl:INSERT INTO ext_adb_to_kafka_ssl VALUES (1,'ssl-test-data'); -
Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и последовательно выполните следующие команды. После ввода команды
kinitтребуется ввести пароль принципалаadb-to-kafka(в приведенном примере123):$ kinit adb-to-kafka@ADS-KAFKA.LOCAL $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas" $ bin/kafka-console-consumer.sh --topic topic_ssl --from-beginning --bootstrap-server bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client_ssl.propertiesРезультат содержит одно прочитанное сообщение:
Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>?\x02"\x02\x02\x02\x1Assl-test-data 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>? ^C[2023-12-26 08:30:14,242] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin) Processed a total of 1 messages
|
ПРИМЕЧАНИЕ
В опции |