Использование коннектора

Для чтения данных из кластера Kafka в кластер ADB необходимо воспользоваться внешней таблицей (WEB EXTERNAL TABLE), обращающейся к установленному consumer’у. Пример:

create external web table <table_name> (<column_name> <data_type> , ...)
EXECUTE '/usr/lib/adbkafka/adb-kafka-consumer -kafka-topic <topic_name> -zookeeper-hosts <zookeeper_hosts> 2>><log_file>_$GP_SEGMENT_ID.log' ON ALL
FORMAT 'CSV';

Директива ON ALL указывает, что из кластера Kafka данные запрашиваются всеми сегментами СУБД (максимальный уровень параллельности).

Important

Число партиций в топике Kafka должно быть равно или больше числа сегментов ADB, запрашивающих данные. В случае, если партиций в топике меньше, чем сегментов в кластере, необходимо скорректировать число сегментов при создании внешней таблицы. Например, ON 2

При необходимости указываются пользовательские значения разделителей и escape-символов. При этом доступны следующие опции consumer’а:

  • -consumer-group string – название общей группы всех consumer’ов ADB. В рамках одной группы используется read once – каждая строка читается ровно один раз (по умолчанию ADB_consumers);
  • -kafka-topic string – название топика Kafka (обязательно);
  • -key – флаг выдачи ключа каждой записи (опционально);
  • -timeout int – таймаут ожидания данных;
  • *-timestamp – флаг выдачи времени создания каждой записи (опционально);
  • -verbose – флаг расширенного логирования (опционально);
  • -zookeeper-hosts string – разделенные запятыми хост:порт узлов Zookeeper (по умолчанию localhost:2181).

Для записи используется producer и внешняя таблица на запись (WRITABLE EXTERNAL WEB TABLE), соответственно:

create writable external web table <table_name> (<column_name> <data_type> , ...)
EXECUTE '/usr/lib/adbkafka/adb-kafka-producer -kafka-brokers <brokers> -kafka-topic <topic> 2>>/tmp/<log_file>_$GP_SEGMENT_ID.log'
FORMAT 'CSV'
DISTRIBUTED BY (<distribution_key>);

Доступны следующие опции producer’а:

  • -batch int – размер батча при записи данных (по умолчанию 5000);
  • -kafka-brokers string – разделенные запятыми хост:порт Kafka брокеров (по умолчанию localhost:9092);
  • -kafka-topic string – имя топика Kafka;
  • -pause int – длительность паузы между записью батчей данных (по умолчанию 0).