Использование коннектора¶
Для чтения данных из кластера Kafka в кластер ADB необходимо воспользоваться внешней таблицей (WEB EXTERNAL TABLE), обращающейся к установленному consumer’у. Пример:
create external web table <table_name> (<column_name> <data_type> , ...) EXECUTE '/usr/lib/adbkafka/adb-kafka-consumer -kafka-topic <topic_name> -zookeeper-hosts <zookeeper_hosts> 2>><log_file>_$GP_SEGMENT_ID.log' ON ALL FORMAT 'CSV';
Директива ON ALL указывает, что из кластера Kafka данные запрашиваются всеми сегментами СУБД (максимальный уровень параллельности).
Important
Число партиций в топике Kafka должно быть равно или больше числа сегментов ADB, запрашивающих данные. В случае, если партиций в топике меньше, чем сегментов в кластере, необходимо скорректировать число сегментов при создании внешней таблицы. Например, ON 2
При необходимости указываются пользовательские значения разделителей и escape-символов. При этом доступны следующие опции consumer’а:
-consumer-group string
– название общей группы всех consumer’ов ADB. В рамках одной группы используется read once – каждая строка читается ровно один раз (по умолчанию ADB_consumers);-kafka-topic string
– название топика Kafka (обязательно);-key
– флаг выдачи ключа каждой записи (опционально);-timeout int
– таймаут ожидания данных;*-timestamp
– флаг выдачи времени создания каждой записи (опционально);-verbose
– флаг расширенного логирования (опционально);-zookeeper-hosts string
– разделенные запятыми хост:порт узлов Zookeeper (по умолчанию localhost:2181).
Для записи используется producer и внешняя таблица на запись (WRITABLE EXTERNAL WEB TABLE), соответственно:
create writable external web table <table_name> (<column_name> <data_type> , ...) EXECUTE '/usr/lib/adbkafka/adb-kafka-producer -kafka-brokers <brokers> -kafka-topic <topic> 2>>/tmp/<log_file>_$GP_SEGMENT_ID.log' FORMAT 'CSV' DISTRIBUTED BY (<distribution_key>);
Доступны следующие опции producer’а:
-batch int
– размер батча при записи данных (по умолчанию 5000);-kafka-brokers string
– разделенные запятыми хост:порт Kafka брокеров (по умолчанию localhost:9092);-kafka-topic string
– имя топика Kafka;-pause int
– длительность паузы между записью батчей данных (по умолчанию 0).