Использование коннектора¶
Для чтения данных из кластера Kafka в кластер ADB необходимо воспользоваться внешней таблицей (WEB EXTERNAL TABLE), обращающейся к установленному consumer’у. Пример:
create external web table <table_name> (<column_name> <data_type> , ...) EXECUTE '/usr/lib/adbkafka/adb-kafka-consumer -kafka-topic <topic_name> -zookeeper-hosts <zookeeper_hosts> 2>><log_file>_$GP_SEGMENT_ID.log' ON ALL FORMAT 'CSV';
Директива ON ALL указывает, что из кластера Kafka данные запрашиваются всеми сегментами СУБД (максимальный уровень параллельности).
Important
Число партиций в топике Kafka должно быть равно или больше числа сегментов ADB, запрашивающих данные. В случае, если партиций в топике меньше, чем сегментов в кластере, необходимо скорректировать число сегментов при создании внешней таблицы. Например, ON 2
При необходимости указываются пользовательские значения разделителей и escape-символов. При этом доступны следующие опции consumer’а:
- -consumer-group string – название общей группы всех consumer’ов ADB. В рамках одной группы используется read once – каждая строка читается ровно один раз (по умолчанию ADB_consumers);
- -kafka-topic string – название топика Kafka (обязательно);
- -key – флаг выдачи ключа каждой записи (опционально);
- -timeout int – таймаут ожидания данных;
- -timestamp – флаг выдачи времени создания каждой записи (опционально);
- -verbose – флаг расширенного логирования (опционально);
- -zookeeper-hosts string – разделенные запятыми хост:порт узлов Zookeeper (по умолчачнию localhost:2181).
Для записи используется producer и внешняя таблица на запись (WRITABLE EXTERNAL WEB TABLE), соответственно:
create writable external web table <table_name> (<column_name> <data_type> , ...) EXECUTE '/usr/lib/adbkafka/adb-kafka-producer -kafka-brokers <brokers> -kafka-topic <topic> 2>>/tmp/<log_file>_$GP_SEGMENT_ID.log' FORMAT 'CSV' DISTRIBUTED BY (<distribution_key>);
Доступны следующие опции producer’а:
- -batch int – размер батча при заиси данных (по умолчачнию 5000);
- -kafka-brokers string – разделенные запятыми хост:порт Kafka брокеров (по умолчачнию localhost:9092);
- -kafka-topic string – имя топика Kafka;
- -pause int – длительность паузы между записью батчей данных (по умолчачнию 0).