Использование коннектора ========================== Для чтения данных из кластера **Kafka** в кластер **ADB** необходимо воспользоваться внешней таблицей (*WEB EXTERNAL TABLE*), обращающейся к установленному consumer'у. Пример: :: create external web table ( , ...) EXECUTE '/usr/lib/adbkafka/adb-kafka-consumer -kafka-topic -zookeeper-hosts 2>>_$GP_SEGMENT_ID.log' ON ALL FORMAT 'CSV'; Директива *ON ALL* указывает, что из кластера **Kafka** данные запрашиваются всеми сегментами СУБД (максимальный уровень параллельности). .. important:: Число партиций в топике Kafka должно быть равно или больше числа сегментов ADB, запрашивающих данные. В случае, если партиций в топике меньше, чем сегментов в кластере, необходимо скорректировать число сегментов при создании внешней таблицы. Например, ``ON 2`` При необходимости указываются пользовательские значения разделителей и escape-символов. При этом доступны следующие опции consumer'а: + ``-consumer-group string`` -- название общей группы всех consumer'ов ADB. В рамках одной группы используется *read once* -- каждая строка читается ровно один раз (по умолчанию *ADB_consumers*); + ``-kafka-topic string`` -- название топика Kafka (обязательно); + ``-key`` -- флаг выдачи ключа каждой записи (опционально); + ``-timeout int`` -- таймаут ожидания данных; + ``*-timestamp`` -- флаг выдачи времени создания каждой записи (опционально); + ``-verbose`` -- флаг расширенного логирования (опционально); + ``-zookeeper-hosts string`` -- разделенные запятыми *хост:порт* узлов Zookeeper (по умолчанию *localhost:2181*). Для записи используется producer и внешняя таблица на запись (*WRITABLE EXTERNAL WEB TABLE*), соответственно: :: create writable external web table ( , ...) EXECUTE '/usr/lib/adbkafka/adb-kafka-producer -kafka-brokers -kafka-topic 2>>/tmp/_$GP_SEGMENT_ID.log' FORMAT 'CSV' DISTRIBUTED BY (); Доступны следующие опции producer'а: + ``-batch int`` -- размер батча при записи данных (по умолчанию *5000*); + ``-kafka-brokers string`` -- разделенные запятыми *хост:порт* Kafka брокеров (по умолчанию *localhost:9092*); + ``-kafka-topic string`` -- имя топика Kafka; + ``-pause int`` -- длительность паузы между записью батчей данных (по умолчанию *0*).