Пример интеграции

  1. Создание таблицы для тестовых данных:
create table kafka_test_data (
id int,
text1 text,
text2 text,
some_number real,
dttm timestamp with time zone
) with (appendonly=true, orientation=column, compresstype=zstd, compresslevel=1)
distributed by (id);

create table kafka_test_data_input (like kafka_test_data) with (appendonly=true, orientation=column, compresstype=zstd, compresslevel=1)
distributed by (id);
  1. Генерация тестовых данных:
insert into kafka_test_data select gen, 'Some text #' || gen::text, 'Some another text #' || gen::text, gen/(gen+1)::real, timeofday()::timestamp from generate_series(1, 1000000) gen;
  1. Создание топика в Kafka с необходимым числом партиций (выполняется на сервере Kafka):
bin/kafka-topics.sh --create --zookeeper 10.0.214.15:2181 --replication-factor 1 --partitions 30 --topic mytopic7
  1. Создание WRITABLE-внешней партиции:
create writable external web table kafka_write_example (like kafka_test_data)
EXECUTE '/usr/lib/adbkafka/adb-kafka-producer -kafka-brokers 10.0.214.20:9092,10.0.214.21:9092 -kafka-topic mytopic7 -batch 100000 -pause 0 2>>/tmp/adb_kafka_producer$(date "+%Y-%m-%d")_$GP_SEGMENT_ID.log'
FORMAT 'CSV'
DISTRIBUTED BY (id);
  1. Перенос данных в Kafka:
insert into kafka_write_example select * from kafka_test_data;
  1. Создание READABLE-внешней таблицы:
create external web table kafka_read_example (like kafka_test_data)
EXECUTE '/usr/lib/adbkafka/adb-kafka-consumer -kafka-topic mytopic7 -zookeeper-hosts 10.0.214.15:2181 -consumer-group adb7 -timeout 20000 2>>/tmp/adb_kafka_consumer$(date "+%Y-%m-%d")_$GP_SEGMENT_ID.log' ON ALL
FORMAT 'CSV';
  1. Чтение данных:
select count(*) from kafka_read_example;