Примеры использования ADB to Kafka Connector

В статье показаны примеры передачи данных из таблицы ADB в топик Kafka (в составе ADS) через ADB to Kafka Connector. При выполнении примера соблюдены следующие условия:

  • Кластер ADS установлен согласно руководству Online-установка на следующих хостах: bds-ads1, bds-ads2, bds-ads3.

  • Кластер ADB установлен согласно руководству Online-установка на следующих хостах: bds-mdw, bds-sdw1, bds-sdw2.

  • В кластере ADB добавлены и установлены сервисы PXF и ADB to Kafka.

  • База данных с именем adb существует в ADB.

  • Все ноды кластера ADB имеют доступ к порту 9092 на всех нодах кластера ADS, через которые планируется производить загрузку данных.

ПРИМЕЧАНИЕ

Поскольку ADB to Kafka Connector использует для сериализации данных формат AVRO OCF, выполнить десериализацию при помощи kafka-avro-console-consumer невозможно. Во всех примерах, приведенных ниже, для чтения пришедших со стороны ADB данных используется скрипт kafka-console-consumer.sh, возвращающий схему данных и сами данные в HEX-представлении. Десериализацию можно выполнить с помощью коннектора Kafka to ADB (см. Примеры использования Kafka to ADB Connector).

Протокол PLAINTEXT (по умолчанию)

Без использования опции SERVER

  1. Подключитесь к любому хосту кластера ADS, на котором установлен сервис Kafka. Перейдите в корневую директорию и создайте тестовый топик, используя следующую команду:

    $ bin/kafka-topics.sh --create --topic topic_adb_to_kafka --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092

    Результат:

    Created topic topic_adb_to_kafka.
    РЕКОМЕНДАЦИЯ
    • Дополнительную информацию о создании и чтении топиков в Kafka можно получить в статье документации ADS Начало работы c Kafka.

    • Создание топика является необязательным шагом. Если на шаге 3 будет указан несуществующий топик, он будет создан автоматически на стороне ADS при условии, что значение опции TOPIC_AUTO_CREATE_FLAG не изменено на false.

  2. Подключитесь к базе данных adb на master-хосте кластера ADB под пользователем gpadmin (например, через psql):

    $ sudo su - gpadmin
    $ psql adb
  3. Создайте внешнюю таблицу со следующей структурой в ADB:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka (a INT, b TEXT)
    LOCATION (
        'pxf://topic_adb_to_kafka?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  4. Добавьте тестовые данные во внешнюю таблицу:

    INSERT INTO ext_adb_to_kafka VALUES
    (1,'test1'),
    (2,'test2'),
    (3,'test3'),
    (4,'test4'),
    (5,'test5');
  5. Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и запустите скрипт bin/kafka-console-consumer.sh:

    $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true

    В выводе команды присутствует информация о 5 прочитанных сообщениях (с номером смещения в очереди Offset):

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E
    Offset:1        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4
    Offset:2        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr
    Offset:3        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13
    Offset:4        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809
    ^CProcessed a total of 5 messages

С использованием опции SERVER

  1. Выполните шаги 1-6, указанные в разделе Использование опции SERVER статьи Настройка ADB to Kafka Connector. В качестве имени сервера <server_name> используйте kafka_test.

  2. На стороне ADB создайте внешнюю таблицу ext_adb_to_kafka2. В ней, в отличие от таблицы из предыдущего примера, необходимо заполнить опцию SERVER:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka2 (a INT, b TEXT)
    LOCATION (
        'pxf://topic_adb_to_kafka?PROFILE=kafka&SERVER=kafka_test'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  3. Добавьте тестовые данные во внешнюю таблицу ext_adb_to_kafka2:

    INSERT INTO ext_adb_to_kafka2 VALUES
    (1,'test1');
  4. На стороне ADS-кластера убедитесь в поступлении нового сообщения в топик topic_adb_to_kafka:

    $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true

    Результат содержит 6 сообщений:

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E
    Offset:1        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4
    Offset:2        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr
    Offset:3        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13
    Offset:4        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809
    Offset:5        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs\x02\x12\x02\x02\x02\x0Atest1%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs
    ^CProcessed a total of 6 messages

Протокол SASL_PLAINTEXT

С использованием механизма PLAIN

  1. Подключитесь к любому хосту кластера ADS, на котором установлен сервис Kafka, и выполните настройку аутентификации по протоколу SASL_PLAINTEXT для кластера ADS в соответствии со статьей SASL PLAINTEXT.

  2. Убедитесь в корректной настройке аутентификации для сервиса Kafka, выполнив шаги из раздела Проверка включения аутентификации SASL PLAINTEXT.

  3. Создайте пользователя с именем adb-to-kafka и паролем 123 на стороне ADS, следуя шагам раздела Создание пользователя.

  4. В соответствии с инструкциями раздела Аутентификация пользователя в среде Kafka обновите содержимое файла /etc/kafka/conf/client.properties следующим образом:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    # Uncomment and set necessary username/password
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="adb-to-kafka" \
      password="123";
  5. Подключитесь к базе данных adb на master-хосте кластера ADB под пользователем gpadmin (например, через psql):

    $ sudo su - gpadmin
    $ psql adb
  6. Создайте внешнюю таблицу со следующей структурой в ADB:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_sasl (a INT, b TEXT)
    LOCATION (
        'pxf://topic_adb_to_kafka_sasl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=PLAIN&SASL_USER=adb-to-kafka&SASL_USER_PASSWORD=123'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  7. Добавьте тестовые данные во внешнюю таблицу ext_adb_to_kafka_sasl:

    INSERT INTO ext_adb_to_kafka_sasl VALUES
    (1,'test1');
  8. Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и запустите скрипт bin/kafka-console-consumer.sh.

    ПРИМЕЧАНИЕ

    В отличие от представленных выше примеров, при использовании протокола SASL_PLAINTEXT требуется дополнительно заполнить опцию --consumer.config. Укажите в ней путь к файлу client.properties, описанному выше на шаге 4.

    $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_sasl --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /etc/kafka/conf/client.properties

    В выводе команды успешно отображается одно прочитанное сообщение:

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81\x02\x12\x02\x02\x02\x0Atest1c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81
    ^CProcessed a total of 1 messages
Ошибки аутентификации

 

В тех случаях, когда логин или пароль пользователя не указаны при объявлении внешней таблицы ADB либо указаны неверно, при попытке добавления данных в таблицу возвращается ошибка следующего вида:

ERROR:  PXF server error : Cannot acquire list of topics for server: bds-ads1:9092,bds-ads2:9092,bds-ads3:9092!  (seg5 10.92.40.70:10001 pid=15516)
HINT:  Check the PXF logs located in the '/var/lib/pxf/logs' directory on host 'bds-sdw2' or 'set client_min_messages=LOG' for additional details.

Чтобы выяснить исходную ошибку, можно просмотреть лог /var/lib/pxf/logs/pxf-service.log на указанном сегмент-хосте под пользователем gpadmin:

$ sudo su - gpadmin
$ tail /var/lib/pxf/logs/pxf-service.log

Результат говорит о неудачной попытке аутентификации:

Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password

С использованием механизма GSSAPI (Kerberos)

ВНИМАНИЕ

Если ранее была выполнена настройка аутентификации по протоколу SASL_PLAINTEXT для кластера ADS в соответствии со статьей SASL PLAINTEXT (например, при запуске предыдущего примера), необходимо предварительно отключить ее перед керберизацией ADS. Для этого переведите в неактивное состояние переключатель SASL_PLAINTEXT Authentication на конфигурационной странице кластера ADS и сохраните изменения, нажав на кнопку Save.

  1. Выполните настройку MIT Kerberos KDC на отдельном хосте (в нашем примере bds-mit-kerberos.ru-central1.internal) и проведите керберизацию кластера ADS, следуя шагам руководства MIT Kerberos. В качестве имени realm используйте ADS-KAFKA.LOCAL.

  2. Убедитесь в корректной настройке аутентификации для сервиса Kafka, выполнив шаги из раздела Проверка установленного Kerberos SASL.

  3. На хосте, где развернут MIT Kerberos KDC, создайте новый принципал adb-to-kafka, который будет использоваться коннектором ADB to Kafka. В качестве пароля укажите 123:

    $ sudo kadmin.local -q "add_principal -pw 123 adb-to-kafka@ADS-KAFKA.LOCAL"

    Результат:

    Principal "adb-to-kafka@ADS-KAFKA.LOCAL" created.
  4. Следуя инструкциям раздела Создание файла конфигурации .properties для пользователя, создайте файл /tmp/client.properties на стороне ADS.

  5. Выполните шаги, описанные в разделе Создание JAAS-файла для пользователя.

  6. Выполните установку необходимых пакетов на мастере и сегмент-хостах кластера ADB:

    $ sudo yum install -y krb5-libs krb5-server krb5-workstation openldap-clients cyrus-sasl-gssapi
  7. На мастере и каждом сегмент-хосте кластера ADB отредактируйте содержимое файла /etc/krb5.conf аналогично тому, как это сделано при настройке MIT Kerberos KDC:

    $ sudo vi /etc/krb5.conf

    Содержимое файла:

    [logging]
     default = FILE:/var/log/krb5libs.log
     kdc = FILE:/var/log/krb5kdc.log
     admin_server = FILE:/var/log/kadmind.log
    
    [libdefaults]
     default_realm = ADS-KAFKA.LOCAL
     kdc_timesync = 1
     ticket_lifetime = 24h
    
    [realms]
     ADS-KAFKA.LOCAL = {
     admin_server = bds-mit-kerberos.ru-central1.internal
     kdc = bds-mit-kerberos.ru-central1.internal
     }
  8. На мастере и каждом сегмент-хосте кластера ADB отредактируйте содержимое файла /var/kerberos/krb5kdc/kdc.conf аналогично тому, как это сделано при настройке MIT Kerberos KDC:

    $ sudo vi /var/kerberos/krb5kdc/kdc.conf

    Содержимое файла:

    [kdcdefaults]
     kdc_ports = 88
     kdc_tcp_ports = 88
     default_realm = ADS-KAFKA.LOCAL
    
    [realms]
     ADS-KAFKA.LOCAL = {
      #master_key_type = aes256-cts
      acl_file = /var/kerberos/krb5kdc/kadm5.acl
      dict_file = /usr/share/dict/words
      admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
      supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
     }
  9. На мастер-хосте ADB создайте директорию для хранения keytab-файла и назначьте для него владельца и необходимые разрешения:

    $ sudo mkdir /var/lib/pxf/conf/kerberos
    $ sudo chmod 744 /var/lib/pxf/conf/kerberos
    $ sudo chown gpadmin:gpadmin /var/lib/pxf/conf/kerberos
  10. Подключитесь к мастер-хосту ADB под пользователем gpadmin:

    $ sudo su - gpadmin
  11. На мастер-хосте ADB запустите интерактивную утилиту ktutil:

    $ ktutil
  12. В открывшейся консоли после приглашения ktutil: поочередно введите следущие команды. После ввода каждой команды требуется указать пароль принципала adb-to-kafka (в нашем примере 123):

    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes128-cts-hmac-sha1-96
    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes256-cts-hmac-sha1-96
    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e arcfour-hmac
    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia256-cts-cmac
    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1  -e camellia128-cts-cmac
  13. В качестве последней команды отправьте запрос на сохранение keytab-файла с введенными данными и нажмите q для выхода из ktutil:

    write_kt /var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab
  14. Для копирования созданного файла adb-to-kafka.service.keytab на сегмент-хосты ADB выполните последовательно следующие команды на мастер-хосте:

    • синхронизация PXF:

      $ pxf cluster sync
    • рестарт PXF:

      $ pxf cluster restart
  15. Убедитесь в работоспособности принципала adb-to-kafka, последовательно выполнив следующие команды. Обратите внимание, что после команды kinit требуется указать пароль принципала (в нашем примере 123):

    $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
    $ klist

    Результат говорит об успешной выдаче тикета:

    Ticket cache: FILE:/tmp/krb5cc_2042
    Default principal: adb-to-kafka@ADS-KAFKA.LOCAL
    
    Valid starting       Expires              Service principal
    12/14/2023 18:13:16  12/15/2023 18:13:16  krbtgt/ADS-KAFKA.LOCAL@ADS-KAFKA.LOCAL
  16. Подключитесь к базе данных adb на master-хосте кластера ADB (например, через psql):

    $ psql adb
  17. Создайте внешнюю таблицу со следующей структурой в ADB:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_kerberos (a INT, b TEXT)
    LOCATION (
        'pxf://topic_adb_to_kafka_kerberos?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  18. Добавьте тестовые данные во внешнюю таблицу ext_adb_to_kafka_kerberos:

    INSERT INTO ext_adb_to_kafka_kerberos VALUES
    (1,'kerberos');
  19. Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и последовательно выполните следующие команды. После ввода команды kinit требуется ввести пароль принципала adb-to-kafka (в нашем примере 123):

    $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas"
    $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_kerberos --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client.properties

    Результат содержит одно прочитанное сообщение:

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3\x02\x18\x02\x02\x02\x10kerberos\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3
    ^C[2023-12-14 18:18:35,250] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
    Processed a total of 1 messages
ПРИМЕЧАНИЕ

В опции --consumer.config необходимо указать путь к файлу client.properties, описанному выше на шаге 4.

Ошибки аутентификации

 

В тех случаях, когда принципал или путь к keytab-файлу не указаны при объявлении внешней таблицы ADB либо указаны неверно, при попытке добавления данных в таблицу возвращается ошибка следующего вида:

ERROR:  PXF server error : Failed to create new KafkaAdminClient  (seg4 10.92.40.70:10000 pid=25863)
HINT:  Check the PXF logs located in the '/var/lib/pxf/logs' directory on host 'bds-sdw2' or 'set client_min_messages=LOG' for additional details.

Детали можно найти в логах PXF.

Протокол SASL_SSL с использованием механизма GSSAPI (Kerberos)

ПРИМЕЧАНИЕ

Действия с сертификатами, описанные ниже на шагах 2-8, могут быть автоматизированы при помощи bash-скрипта (см. generate.sh в статье Защита канала по протоколу SSL), однако использование скрипта не рекомендуется для продуктовой среды.

  1. Выполните шаги 1-15 из приведенного выше примера С использованием механизма GSSAPI (Kerberos) (если они не были пройдены ранее). Убедитесь в наличии действующего тикета для принципала adb-to-kafka на стороне ADB.

  2. Сгенерируйте пару ключей по алгоритму RSA и создайте хранилище ключей keystore.jks на каждом хосте кластеров ADB и ADS:

    $ keytool -genkeypair -noprompt -keyalg RSA -ext san=dns:<hostname> -alias <hostname> -dname "CN=<hostname>, OU=AD, O=AD, L=MSK, S=MO, C=RU" -keystore /tmp/keystore.jks -storepass bigdata -keypass bigdata -validity 360 -keysize 2048

    где <hostname> — FQDN хоста, на котором запускается команда.

  3. Экспортируйте SSL-сертификаты, запустив следующую команду на каждом хосте ADB и ADS:

    $ keytool -exportcert -file /tmp/<hostname>.crt -keystore /tmp/keystore.jks -storepass bigdata -alias <hostname> -rfc

    где <hostname> — FQDN хоста, на котором запускается команда.

    Результат может выглядеть следующим образом (для хоста с FQDN bds-mdw.ru-central1.internal):

    Certificate stored in file </tmp/bds-mdw.ru-central1.internal.crt>
  4. Скопируйте созданные сертификаты с каждого хоста ADB и ADS на локальную машину. Ниже показан пример команды для Windows:

    $ pscp <user>@<hostname>:/tmp/<hostname>.crt /<local_folder>/

    где:

    • <user> — имя для подключения к хосту ADB/ADS по SSH;

    • <hostname> — FQDN хоста ADB/ADS;

    • <local_folder> — директория, созданная на локальной машине для сохранения всех сертификатов.

    Например, для хоста ADB bds-mdw.ru-central1.internal команда может выглядеть следующим образом:

    $ pscp dasha@bds-mdw:/tmp/bds-mdw.ru-central1.internal.crt /cert/

    В результате все сертификаты должны быть сохранены в одном месте:

        Directory: C:\cert
    
    
    Mode                 LastWriteTime         Length Name
    ----                 -------------         ------ ----
    -a----        12/25/2023  10:58 AM           1275 bds-ads1.ru-central1.internal.crt
    -a----        12/25/2023  10:58 AM           1275 bds-ads2.ru-central1.internal.crt
    -a----        12/25/2023  10:59 AM           1275 bds-ads3.ru-central1.internal.crt
    -a----        12/25/2023  10:58 AM           1271 bds-mdw.ru-central1.internal.crt
    -a----        12/25/2023  10:58 AM           1275 bds-sdw1.ru-central1.internal.crt
    -a----        12/25/2023  10:58 AM           1275 bds-sdw2.ru-central1.internal.crt
  5. Перенесите все сертификаты с локальной машины в директорию /tmp каждого хоста ADB и ADS, последовательно запустив следующую команду для всех хостов:

    $ pscp /<local_folder>/*.crt <user>@<hostname>:/tmp/

    Например, для хоста ADB bds-mdw.ru-central1.internal команда может выглядеть следующим образом:

    $ pscp /cert/*.crt dasha@bds-mdw:/tmp/
  6. Выполните импорт всех сертификатов в truststore.jks и хранилище CA-сертификатов ca-bundle.crt на каждом хосте ADB/ADS с помощью следующих команд:

    $ keytool -importcert -noprompt -alias <hostname> -file /tmp/<hostname>.crt -keystore /tmp/truststore.jks -storepass bigdata
    $ sudo bash -c "cat /tmp/<hostname>.crt >> /etc/pki/tls/certs/ca-bundle.crt"

    где <hostname> — FQDN хоста ADB/ADS. В нашем примере команды необходимо запустить 6 раз на каждом из 6 хостов ADB/ADS (для каждого из сертификатов).

  7. Импортируйте trustore.jks в Java CA store, запустив следующую команду на каждом хосте ADB/ADS:

    $ sudo keytool -importkeystore -noprompt -srckeystore /tmp/truststore.jks -destkeystore /etc/pki/java/cacerts -deststorepass changeit -srcstorepass bigdata

    Результат:

    Importing keystore /tmp/truststore.jks to /etc/pki/java/cacerts...
    Entry for alias bds-sdw1.ru-central1.internal successfully imported.
    Entry for alias bds-mdw.ru-central1.internal successfully imported.
    Entry for alias bds-ads1.ru-central1.internal successfully imported.
    Entry for alias bds-sdw2.ru-central1.internal successfully imported.
    Entry for alias bds-ads2.ru-central1.internal successfully imported.
    Entry for alias bds-ads3.ru-central1.internal successfully imported.
    Import command completed:  6 entries successfully imported, 0 entries failed or cancelled
  8. Создайте и импортируйте сертификаты OpenSSL, запустив следующие команды на каждом хосте ADB/ADS:

    $ sudo openssl req -new -newkey rsa:4096 -days 365 -nodes -x509 -subj "/C=RU/ST=Denial/L=MSK/O=AD/CN=<hostname>" -keyout /etc/ssl/host_cert.key  -out /etc/ssl/certs/host_cert.cert
    $ sudo bash -c "cat /etc/ssl/certs/host_cert.cert >> /etc/pki/tls/certs/ca-bundle.crt"

    где <hostname> — FQDN хоста, на котором запускается команда.

    Результат первой команды:

    Generating a 4096 bit RSA private key
    .....................................................++
    ...............................................................++
    writing new private key to '/etc/ssl/host_cert.key'
  9. Выполните включение SSL в кластере ADS в соответствии с разделом Включение SSL на кластере ADS при помощи ADCM.

  10. Убедитесь, что включение SSL для Kafka выполнено успешно, в соответствии с инструкциями раздела Проверка настроек SSL.

  11. На стороне кластера ADS создайте файл /tmp/client_ssl.properties:

    $ sudo vi /tmp/client_ssl.properties

    Укажите следующее содержимое файла:

    security.protocol=SASL_SSL
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
  12. Подключитесь к базе данных adb на master-хосте кластера ADB под пользователем gpadmin (например, через psql):

    $ sudo su - gpadmin
    $ psql adb
  13. Создайте внешнюю таблицу со следующей структурой в ADB:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_ssl (a INT, b TEXT)
    LOCATION (
        'pxf://topic_ssl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka&TRUSTSTORE_LOCATION=/tmp/truststore.jks&TRUSTSTORE_PASSWORD=bigdata'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

    В отличие от предыдущих примеров, при использовании SASL_SSL в параметре BOOTSTRAP_SERVERS необходимо указать FQDN хостов ADS. В противном случае вы получите следующую ошибку при попытке добавления данных в таблицу:

    ERROR:  PXF server error : Cannot acquire list of topics for server: bds-ads1:9092,bds-ads2:9092,bds-ads3:9092!  (seg1 10.92.40.81:10001 pid=10510)
    HINT:  Check the PXF logs located in the '/var/lib/pxf/logs' directory on host 'bds-sdw1' or 'set client_min_messages=LOG' for additional details.

    Также FQDN необходимо указать при чтении сообщений на стороне Kafka (см. шаг 15 ниже).

  14. Добавьте тестовые данные во внешнюю таблицу ext_adb_to_kafka_ssl:

    INSERT INTO ext_adb_to_kafka_ssl VALUES
    (1,'ssl-test-data');
  15. Убедитесь, что данные успешно получены на стороне кластера ADS. Для этого на хосте, где установлен сервис Kafka, перейдите в корневую директорию и последовательно выполните следующие команды. После ввода команды kinit требуется ввести пароль принципала adb-to-kafka (в нашем примере 123):

    $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas"
    $ bin/kafka-console-consumer.sh --topic topic_ssl --from-beginning --bootstrap-server bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092 -value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client_ssl.properties

    Результат содержит одно прочитанное сообщение:

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>?\x02"\x02\x02\x02\x1Assl-test-data 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>?
    ^C[2023-12-26 08:30:14,242] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
    Processed a total of 1 messages
ПРИМЕЧАНИЕ

В опции --consumer.config необходимо указать путь к файлу client_ssl.properties, описанному выше на шаге 11.

Ошибки аутентификации

 

В тех случаях, когда при объявлении внешней таблицы ADB параметры для подключения указаны неверно, при попытке добавления данных в таблицу возвращается ошибка следующего вида:

ERROR:  PXF server error : Failed to create new KafkaAdminClient  (seg0 10.92.40.81:10000 pid=10513)
HINT:  Check the PXF logs located in the '/var/lib/pxf/logs' directory on host 'bds-sdw1' or 'set client_min_messages=LOG' for additional details.

Детали можно найти в логах PXF. Например, следующая запись говорит о неверном пароле TRUSTSTORE_PASSWORD:

Caused by: java.security.UnrecoverableKeyException: Password verification failed
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней