Примеры использования ADB to Kafka Connector
В статье показаны примеры передачи данных из таблицы ADB в топик Kafka (в составе ADS) через ADB to Kafka Connector. При выполнении примера соблюдены следующие условия:
-
Кластер ADS установлен согласно руководству Online-установка на следующих хостах: bds-ads1, bds-ads2, bds-ads3.
-
Кластер ADB установлен согласно руководству Online-установка на следующих хостах: bds-mdw, bds-sdw1, bds-sdw2.
-
В кластере ADB добавлены и установлены сервисы PXF и ADB to Kafka.
-
База данных с именем
adb
существует в ADB. -
Все ноды кластера ADB имеют доступ к порту
9092
на всех нодах кластера ADS, через которые планируется производить загрузку данных.
ПРИМЕЧАНИЕ
Поскольку ADB to Kafka Connector использует для сериализации данных формат AVRO OCF, выполнить десериализацию при помощи kafka-avro-console-consumer невозможно. Во всех примерах, приведенных ниже, для чтения пришедших со стороны ADB данных используется скрипт kafka-console-consumer.sh, возвращающий схему данных и сами данные в HEX-представлении. Десериализацию можно выполнить с помощью коннектора Kafka to ADB (см. Примеры использования Kafka to ADB Connector). |
Протокол PLAINTEXT (по умолчанию)
Без использования опции SERVER
-
Подключитесь к любому хосту кластера ADS, на котором установлен сервис Kafka. Перейдите в корневую директорию и создайте тестовый топик, используя следующую команду:
$ bin/kafka-topics.sh --create --topic topic_adb_to_kafka --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092
Результат:
Created topic topic_adb_to_kafka.
РЕКОМЕНДАЦИЯ-
Дополнительную информацию о создании и чтении топиков в Kafka можно получить в статье документации ADS Начало работы c Kafka.
-
Создание топика является необязательным шагом. Если на шаге 3 будет указан несуществующий топик, он будет создан автоматически на стороне ADS при условии, что значение опции TOPIC_AUTO_CREATE_FLAG не изменено на
false
.
-
-
Подключитесь к базе данных
adb
на master-хосте кластера ADB под пользователемgpadmin
(например, через psql):$ sudo su - gpadmin
$ psql adb
-
Создайте внешнюю таблицу со следующей структурой в ADB:
CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka (a INT, b TEXT) LOCATION ( 'pxf://topic_adb_to_kafka?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
-
Добавьте тестовые данные во внешнюю таблицу:
INSERT INTO ext_adb_to_kafka VALUES (1,'test1'), (2,'test2'), (3,'test3'), (4,'test4'), (5,'test5');
-
Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и запустите скрипт bin/kafka-console-consumer.sh:
$ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true
В выводе команды присутствует информация о 5 прочитанных сообщениях (с номером смещения в очереди
Offset
):Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E Offset:1 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4 Offset:2 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr Offset:3 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13 Offset:4 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809 ^CProcessed a total of 5 messages
С использованием опции SERVER
-
Выполните шаги 1-6, указанные в разделе Использование опции SERVER статьи Настройка ADB to Kafka Connector. В качестве имени сервера
<server_name>
используйтеkafka_test
. -
На стороне ADB создайте внешнюю таблицу
ext_adb_to_kafka2
. В ней, в отличие от таблицы из предыдущего примера, необходимо заполнить опциюSERVER
:CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka2 (a INT, b TEXT) LOCATION ( 'pxf://topic_adb_to_kafka?PROFILE=kafka&SERVER=kafka_test' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
-
Добавьте тестовые данные во внешнюю таблицу
ext_adb_to_kafka2
:INSERT INTO ext_adb_to_kafka2 VALUES (1,'test1');
-
На стороне ADS-кластера убедитесь в поступлении нового сообщения в топик
topic_adb_to_kafka
:$ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true
Результат содержит 6 сообщений:
Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E Offset:1 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4 Offset:2 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr Offset:3 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13 Offset:4 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809 Offset:5 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs\x02\x12\x02\x02\x02\x0Atest1%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs ^CProcessed a total of 6 messages
Протокол SASL_PLAINTEXT
С использованием механизма PLAIN
-
Подключитесь к любому хосту кластера ADS, на котором установлен сервис Kafka, и выполните настройку аутентификации по протоколу SASL_PLAINTEXT для кластера ADS в соответствии со статьей SASL PLAINTEXT.
-
Убедитесь в корректной настройке аутентификации для сервиса Kafka, выполнив шаги из раздела Проверка включения аутентификации SASL PLAINTEXT.
-
Создайте пользователя с именем
adb-to-kafka
и паролем123
на стороне ADS, следуя шагам раздела Создание пользователя. -
В соответствии с инструкциями раздела Аутентификация пользователя в среде Kafka обновите содержимое файла /etc/kafka/conf/client.properties следующим образом:
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN # Uncomment and set necessary username/password sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="adb-to-kafka" \ password="123";
-
Подключитесь к базе данных
adb
на master-хосте кластера ADB под пользователемgpadmin
(например, через psql):$ sudo su - gpadmin
$ psql adb
-
Создайте внешнюю таблицу со следующей структурой в ADB:
CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_sasl (a INT, b TEXT) LOCATION ( 'pxf://topic_adb_to_kafka_sasl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=PLAIN&SASL_USER=adb-to-kafka&SASL_USER_PASSWORD=123' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
-
Добавьте тестовые данные во внешнюю таблицу
ext_adb_to_kafka_sasl
:INSERT INTO ext_adb_to_kafka_sasl VALUES (1,'test1');
-
Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и запустите скрипт bin/kafka-console-consumer.sh.
ПРИМЕЧАНИЕВ отличие от представленных выше примеров, при использовании протокола SASL_PLAINTEXT требуется дополнительно заполнить опцию
--consumer.config
. Укажите в ней путь к файлу client.properties, описанному выше на шаге 4.$ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_sasl --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /etc/kafka/conf/client.properties
В выводе команды успешно отображается одно прочитанное сообщение:
Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81\x02\x12\x02\x02\x02\x0Atest1c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81 ^CProcessed a total of 1 messages
С использованием механизма GSSAPI (Kerberos)
ВНИМАНИЕ
Если ранее была выполнена настройка аутентификации по протоколу SASL_PLAINTEXT для кластера ADS в соответствии со статьей SASL PLAINTEXT (например, при запуске предыдущего примера), необходимо предварительно отключить ее перед керберизацией ADS. Для этого переведите в неактивное состояние переключатель SASL_PLAINTEXT Authentication на конфигурационной странице кластера ADS и сохраните изменения, нажав на кнопку Save. |
-
Выполните настройку MIT Kerberos KDC на отдельном хосте (в нашем примере bds-mit-kerberos.ru-central1.internal) и проведите керберизацию кластера ADS, следуя шагам руководства MIT Kerberos. В качестве имени realm используйте
ADS-KAFKA.LOCAL
. -
Убедитесь в корректной настройке аутентификации для сервиса Kafka, выполнив шаги из раздела Проверка установленного Kerberos SASL.
-
На хосте, где развернут MIT Kerberos KDC, создайте новый принципал
adb-to-kafka
, который будет использоваться коннектором ADB to Kafka. В качестве пароля укажите123
:$ sudo kadmin.local -q "add_principal -pw 123 adb-to-kafka@ADS-KAFKA.LOCAL"
Результат:
Principal "adb-to-kafka@ADS-KAFKA.LOCAL" created.
-
Следуя инструкциям раздела Создание файла конфигурации .properties для пользователя, создайте файл /tmp/client.properties на стороне ADS.
-
Выполните шаги, описанные в разделе Создание JAAS-файла для пользователя.
-
Выполните установку необходимых пакетов на мастере и сегмент-хостах кластера ADB:
$ sudo yum install -y krb5-libs krb5-server krb5-workstation openldap-clients cyrus-sasl-gssapi
-
На мастере и каждом сегмент-хосте кластера ADB отредактируйте содержимое файла /etc/krb5.conf аналогично тому, как это сделано при настройке MIT Kerberos KDC:
$ sudo vi /etc/krb5.conf
Содержимое файла:
[logging] default = FILE:/var/log/krb5libs.log kdc = FILE:/var/log/krb5kdc.log admin_server = FILE:/var/log/kadmind.log [libdefaults] default_realm = ADS-KAFKA.LOCAL kdc_timesync = 1 ticket_lifetime = 24h [realms] ADS-KAFKA.LOCAL = { admin_server = bds-mit-kerberos.ru-central1.internal kdc = bds-mit-kerberos.ru-central1.internal }
-
На мастере и каждом сегмент-хосте кластера ADB отредактируйте содержимое файла /var/kerberos/krb5kdc/kdc.conf аналогично тому, как это сделано при настройке MIT Kerberos KDC:
$ sudo vi /var/kerberos/krb5kdc/kdc.conf
Содержимое файла:
[kdcdefaults] kdc_ports = 88 kdc_tcp_ports = 88 default_realm = ADS-KAFKA.LOCAL [realms] ADS-KAFKA.LOCAL = { #master_key_type = aes256-cts acl_file = /var/kerberos/krb5kdc/kadm5.acl dict_file = /usr/share/dict/words admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal }
-
На мастер-хосте ADB создайте директорию для хранения keytab-файла и назначьте для него владельца и необходимые разрешения:
$ sudo mkdir /var/lib/pxf/conf/kerberos $ sudo chmod 744 /var/lib/pxf/conf/kerberos $ sudo chown gpadmin:gpadmin /var/lib/pxf/conf/kerberos
-
Подключитесь к мастер-хосту ADB под пользователем
gpadmin
:$ sudo su - gpadmin
-
На мастер-хосте ADB запустите интерактивную утилиту
ktutil
:$ ktutil
-
В открывшейся консоли после приглашения
ktutil:
поочередно введите следущие команды. После ввода каждой команды требуется указать пароль принципалаadb-to-kafka
(в нашем примере123
):add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes128-cts-hmac-sha1-96 add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes256-cts-hmac-sha1-96 add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e arcfour-hmac add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia256-cts-cmac add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia128-cts-cmac
-
В качестве последней команды отправьте запрос на сохранение keytab-файла с введенными данными и нажмите
q
для выхода изktutil
:write_kt /var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab
-
Для копирования созданного файла adb-to-kafka.service.keytab на сегмент-хосты ADB выполните последовательно следующие команды на мастер-хосте:
-
синхронизация PXF:
$ pxf cluster sync
-
рестарт PXF:
$ pxf cluster restart
-
-
Убедитесь в работоспособности принципала
adb-to-kafka
, последовательно выполнив следующие команды. Обратите внимание, что после командыkinit
требуется указать пароль принципала (в нашем примере123
):$ kinit adb-to-kafka@ADS-KAFKA.LOCAL
$ klist
Результат говорит об успешной выдаче тикета:
Ticket cache: FILE:/tmp/krb5cc_2042 Default principal: adb-to-kafka@ADS-KAFKA.LOCAL Valid starting Expires Service principal 12/14/2023 18:13:16 12/15/2023 18:13:16 krbtgt/ADS-KAFKA.LOCAL@ADS-KAFKA.LOCAL
-
Подключитесь к базе данных
adb
на master-хосте кластера ADB (например, через psql):$ psql adb
-
Создайте внешнюю таблицу со следующей структурой в ADB:
CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_kerberos (a INT, b TEXT) LOCATION ( 'pxf://topic_adb_to_kafka_kerberos?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
-
Добавьте тестовые данные во внешнюю таблицу
ext_adb_to_kafka_kerberos
:INSERT INTO ext_adb_to_kafka_kerberos VALUES (1,'kerberos');
-
Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и последовательно выполните следующие команды. После ввода команды
kinit
требуется ввести пароль принципалаadb-to-kafka
(в нашем примере123
):$ kinit adb-to-kafka@ADS-KAFKA.LOCAL $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas" $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_kerberos --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client.properties
Результат содержит одно прочитанное сообщение:
Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3\x02\x18\x02\x02\x02\x10kerberos\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3 ^C[2023-12-14 18:18:35,250] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin) Processed a total of 1 messages
ПРИМЕЧАНИЕ
В опции |
Протокол SASL_SSL с использованием механизма GSSAPI (Kerberos)
ПРИМЕЧАНИЕ
Действия с сертификатами, описанные ниже на шагах 2-8, могут быть автоматизированы при помощи bash-скрипта (см. generate.sh в статье Защита канала по протоколу SSL), однако использование скрипта не рекомендуется для продуктовой среды. |
-
Выполните шаги 1-15 из приведенного выше примера С использованием механизма GSSAPI (Kerberos) (если они не были пройдены ранее). Убедитесь в наличии действующего тикета для принципала
adb-to-kafka
на стороне ADB. -
Сгенерируйте пару ключей по алгоритму RSA и создайте хранилище ключей keystore.jks на каждом хосте кластеров ADB и ADS:
$ keytool -genkeypair -noprompt -keyalg RSA -ext san=dns:<hostname> -alias <hostname> -dname "CN=<hostname>, OU=AD, O=AD, L=MSK, S=MO, C=RU" -keystore /tmp/keystore.jks -storepass bigdata -keypass bigdata -validity 360 -keysize 2048
где
<hostname>
— FQDN хоста, на котором запускается команда. -
Экспортируйте SSL-сертификаты, запустив следующую команду на каждом хосте ADB и ADS:
$ keytool -exportcert -file /tmp/<hostname>.crt -keystore /tmp/keystore.jks -storepass bigdata -alias <hostname> -rfc
где
<hostname>
— FQDN хоста, на котором запускается команда.Результат может выглядеть следующим образом (для хоста с FQDN
bds-mdw.ru-central1.internal
):Certificate stored in file </tmp/bds-mdw.ru-central1.internal.crt>
-
Скопируйте созданные сертификаты с каждого хоста ADB и ADS на локальную машину. Ниже показан пример команды для Windows:
$ pscp <user>@<hostname>:/tmp/<hostname>.crt /<local_folder>/
где:
-
<user>
— имя для подключения к хосту ADB/ADS по SSH; -
<hostname>
— FQDN хоста ADB/ADS; -
<local_folder>
— директория, созданная на локальной машине для сохранения всех сертификатов.
Например, для хоста ADB
bds-mdw.ru-central1.internal
команда может выглядеть следующим образом:$ pscp dasha@bds-mdw:/tmp/bds-mdw.ru-central1.internal.crt /cert/
В результате все сертификаты должны быть сохранены в одном месте:
Directory: C:\cert Mode LastWriteTime Length Name ---- ------------- ------ ---- -a---- 12/25/2023 10:58 AM 1275 bds-ads1.ru-central1.internal.crt -a---- 12/25/2023 10:58 AM 1275 bds-ads2.ru-central1.internal.crt -a---- 12/25/2023 10:59 AM 1275 bds-ads3.ru-central1.internal.crt -a---- 12/25/2023 10:58 AM 1271 bds-mdw.ru-central1.internal.crt -a---- 12/25/2023 10:58 AM 1275 bds-sdw1.ru-central1.internal.crt -a---- 12/25/2023 10:58 AM 1275 bds-sdw2.ru-central1.internal.crt
-
-
Перенесите все сертификаты с локальной машины в директорию /tmp каждого хоста ADB и ADS, последовательно запустив следующую команду для всех хостов:
$ pscp /<local_folder>/*.crt <user>@<hostname>:/tmp/
Например, для хоста ADB
bds-mdw.ru-central1.internal
команда может выглядеть следующим образом:$ pscp /cert/*.crt dasha@bds-mdw:/tmp/
-
Выполните импорт всех сертификатов в truststore.jks и хранилище CA-сертификатов ca-bundle.crt на каждом хосте ADB/ADS с помощью следующих команд:
$ keytool -importcert -noprompt -alias <hostname> -file /tmp/<hostname>.crt -keystore /tmp/truststore.jks -storepass bigdata $ sudo bash -c "cat /tmp/<hostname>.crt >> /etc/pki/tls/certs/ca-bundle.crt"
где
<hostname>
— FQDN хоста ADB/ADS. В нашем примере команды необходимо запустить 6 раз на каждом из 6 хостов ADB/ADS (для каждого из сертификатов). -
Импортируйте trustore.jks в Java CA store, запустив следующую команду на каждом хосте ADB/ADS:
$ sudo keytool -importkeystore -noprompt -srckeystore /tmp/truststore.jks -destkeystore /etc/pki/java/cacerts -deststorepass changeit -srcstorepass bigdata
Результат:
Importing keystore /tmp/truststore.jks to /etc/pki/java/cacerts... Entry for alias bds-sdw1.ru-central1.internal successfully imported. Entry for alias bds-mdw.ru-central1.internal successfully imported. Entry for alias bds-ads1.ru-central1.internal successfully imported. Entry for alias bds-sdw2.ru-central1.internal successfully imported. Entry for alias bds-ads2.ru-central1.internal successfully imported. Entry for alias bds-ads3.ru-central1.internal successfully imported. Import command completed: 6 entries successfully imported, 0 entries failed or cancelled
-
Создайте и импортируйте сертификаты OpenSSL, запустив следующие команды на каждом хосте ADB/ADS:
$ sudo openssl req -new -newkey rsa:4096 -days 365 -nodes -x509 -subj "/C=RU/ST=Denial/L=MSK/O=AD/CN=<hostname>" -keyout /etc/ssl/host_cert.key -out /etc/ssl/certs/host_cert.cert $ sudo bash -c "cat /etc/ssl/certs/host_cert.cert >> /etc/pki/tls/certs/ca-bundle.crt"
где
<hostname>
— FQDN хоста, на котором запускается команда.Результат первой команды:
Generating a 4096 bit RSA private key .....................................................++ ...............................................................++ writing new private key to '/etc/ssl/host_cert.key'
-
Выполните включение SSL в кластере ADS в соответствии с разделом Включение SSL на кластере ADS при помощи ADCM.
-
Убедитесь, что включение SSL для Kafka выполнено успешно, в соответствии с инструкциями раздела Проверка настроек SSL.
-
На стороне кластера ADS создайте файл /tmp/client_ssl.properties:
$ sudo vi /tmp/client_ssl.properties
Укажите следующее содержимое файла:
security.protocol=SASL_SSL sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka
-
Подключитесь к базе данных
adb
на master-хосте кластера ADB под пользователемgpadmin
(например, через psql):$ sudo su - gpadmin
$ psql adb
-
Создайте внешнюю таблицу со следующей структурой в ADB:
CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_ssl (a INT, b TEXT) LOCATION ( 'pxf://topic_ssl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka&TRUSTSTORE_LOCATION=/tmp/truststore.jks&TRUSTSTORE_PASSWORD=bigdata' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
-
Добавьте тестовые данные во внешнюю таблицу
ext_adb_to_kafka_ssl
:INSERT INTO ext_adb_to_kafka_ssl VALUES (1,'ssl-test-data');
-
Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и последовательно выполните следующие команды. После ввода команды
kinit
требуется ввести пароль принципалаadb-to-kafka
(в нашем примере123
):$ kinit adb-to-kafka@ADS-KAFKA.LOCAL $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas" $ bin/kafka-console-consumer.sh --topic topic_ssl --from-beginning --bootstrap-server bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client_ssl.properties
Результат содержит одно прочитанное сообщение:
Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>?\x02"\x02\x02\x02\x1Assl-test-data 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>? ^C[2023-12-26 08:30:14,242] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin) Processed a total of 1 messages
ПРИМЕЧАНИЕ
В опции |