Запись данных

Запись данных в ADTM обеспечивает консистентное обновление (вставка, обновление, удаление) объектов в Системе.

К записи данных предъявляются следующие ограничения и допущения:

  • Дельты загружаются строго последовательно;
  • Данные из сообщений одной дельты загружаются параллельно;
  • Дельты со стороны ETL ИС Поставщика выгружаются строго последовательно.

Запись данных имеет следующий процесс:

  • Горячие записи загружаются в таблицу данных;
  • Старые записи из таблицы данных перемещаются в таблицу истории.

Массивно-параллельная загрузка дельты

Массивно-параллельная загрузка дельты в ADTM обеспечивает массивно-параллельное консистентное обновление (вставка, обновление, удаление) объектов в Системе.

Массивно-параллельная загрузка дельты выполняет следующие назначения:

  • Создание внешних UPLOAD таблиц загрузки;
  • Определение начала и окончания загрузки дельты;
  • Загрузка данных дельты через внешние UPLOAD таблицы загрузки в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории.

К массивно-параллельной загрузке дельты предъявляются следующие ограничения и допущения:

  • Не предполагается нумерация входящих сообщений Kafka;
  • Входящие сообщения Kafka для каждого удаленного объекта содержат признак sys_op(int)=1;
  • Гарантируется, что данные из прочитанных (коммит чтения) сообщений Kafka загружены в Систему;
  • Окончание потока сообщений Kafka определяется по заданному в конфигурации таймауту ожидания сообщений kafka (максимально допустимый интервал времени ожидания сообщений kafka в рамках потока).

Входные данные:

  • JDBC подключение к Системе;
  • Топик Kafka, содержащий загружаемые данные.

Выходные данные SUCCESS:

  • Загружены данные в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории;
  • Выполнен коммит чтения для входящих сообщений Kafka;
  • Увеличен номер последней загруженной дельты в Системе.

Выходные данные FAIL:

  • Частично загружены данные в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории;
  • Выполнен коммит чтения для части входящих сообщения Kafka, которые были загружены в СУБД хранилища данных;
  • Не изменен номер последней загруженной дельты в Системе;
  • Сформировано сообщение об ошибке в ответ на входящий JDBC запрос;
  • Зафиксирована ошибка загрузки дельты.

Загрузка данных в ADB

Загрузка данных в ADB в ПО ADTM обеспечивает массивно-параллельную загрузку данных из Kafka в физические таблицы ADB.

Загрузка данных в ADB выполняет следующие назначения:

  • Загрузка горячих записей из топика Kafka в таблицу данных;
  • Перенос старых записей из таблицы данных в таблицу истории в рамках транзакции.

Входные данные:

  • Номер последней успешно загруженной дельты DELTA_OK;
  • Номер загружаемой дельты DELTA_HOT;
  • Kafka topicY, содержащий данные для загрузки;
  • JDBC подключение к ADB;
  • readable external table (tblExt), читающая данные из Kafka topicY;
  • Таблица staging (tbl_staging);
  • Таблица данных (tbl_actual);
  • Таблица истории (tbl_history).

Выходные данные:

  • Таблица данных содержит только актуальные данные (для момента времени DELTA_HOT);
  • Таблица истории содержит старые записи (для момента времени DELTA_HOT);
  • Таблица staging пуста.

К загрузке данных в ADB предъявляются следующие ограничения и допущения:

  • В таблице staging могут оказаться дублированные записи;
  • Данные в Kafka topicY содержат признак SysOp=1 для каждого удаленного объекта;
  • Перенос данных (удаление/вставка) выполняется в рамках каждого сегмента (не происходит перемещения данных между сегментами);
  • Старая запись может иметь только два состояния:
  • полностью перенесена (отсутствует в таблице данных, присутствует в таблице истории с требуемыми значениями системных полей);
  • не перенесена (присутствует в таблице данных, отсутствует в таблице истории).

Загрузка данных в ADQM

Загрузка данных в ADQM в ПО ADTM обеспечивает массивно-параллельную загрузку данных из Kafka в физические таблицы ADQM.

Загрузка данных в ADQM выполняет следующие назначения:

  • Загрузка горячих записей из заданного топика Kafka в таблицу данных;
  • Обновление состояния старых записей в таблице данных;
  • Обеспечение консистентной выборки в любой момент времени;
  • Перенос старых записей в отдельное физическое хранилище (диск, том).

К загрузке данных в ADQM предъявляются следующие ограничения и допущения:

  • Отсутствует таблица истории. Перенос старых записей в специальное хранилище осуществляется функционалом СУБД TTL;
  • Момент определения записей подлежащих переносу можно настраивать;
  • В момент загрузки дельты выборка может выполняться дольше (используется ключевое слово FINAL);
  • Запросы, начатые до момента начала загрузки дельты, не затрагивают записи, созданные в процессе загрузки дельты, в том числе записи сторно;
  • Повторные записи (полное совпадение полей) дедуплицируются при завершении загрузки дельты;
  • В таблице данных отсутствует признак удаления sys_op;
  • Предусмотрена работа в кластере.

Входные данные:

  • Номер актуальной дельты – @delta_ok;
  • Номер загружаемой дельты – @delta_hot;
  • Имя топика – topicY;
  • Константа @ttl_wait_sec – количество секунд после фактического устаревания записи (вставка записи) до готовности к переносу старых записей на другое хранилище;
  • Формат данных (AVRO);
  • ADQM:
  • таблица данных с указанием выражения TTL – tbl_actual (CollapsingMergeTree):
  • [ columns ];
  • sys_from (Int64);
  • sys_to (Int64);
  • close_date (DateTime64) – дата, с момента которой отсчитывается готовность к переносу на другое хранилище;
  • sign (Int8) – признак отмены записи (сторно).

Загрузка данных в ADG

К загрузке данных в ADG предъявляются следующие ограничения и допущения:

  • Движок таблиц memtx;
  • Требуется точное совпадение состава, типа и порядка полей во входящих сообщениях и структуре таблиц staging;
  • Допустим перенос данных в историческую таблицу и таблицу данных в процессе пополнения таблицы staging;
  • Перенос данных из таблицы staging в таблицу данных и перенос старых записей в таблицу истории должен происходить в рамках одной транзакции.

Входные данные:

  • @delta_hot – номер загружаемой дельты;
  • Входящие сообщения в топике Kafka topicY;
  • Кластер ADG:
  • таблица staging tbl1_staging:
  • (поля логической таблицы);
  • sys_op (int);
  • bucket_id (unsigned);
  • таблица данных tbl1_actual:
  • (поля логической таблицы);
  • sys_from (int) – последнее поле PK;
  • sys_to (int);
  • sys_op (int);
  • bucket_id (unsigned);
  • таблица истории tbl1_history:
  • (поля логической таблицы);
  • sys_from (int) – последнее поле PK;
  • sys_to (int);
  • sys_op (int);
  • bucket_id (unsigned).

Выходные данные:

  • Таблица данных содержит только актуальные данные (для момента времени @delta_hot);
  • Таблица истории содержит старые записи (для момента времени @delta_hot);
  • Таблица staging пуста.

Запись дельты (репликации)

Запись данных (репликация) в ADTM обеспечивает репликацию в части массивно-параллельной загрузки данных в реплику.

Запись данных (репликация) выполняет следующие назначения:

  • Принимает от Агента ПОДД (Подсистема обеспечения доступа к данным) входящий поток сообщений delta.in, содержащий дельту и системные данные дельты ( – номер дельты, ẟ-time – дата-время дельты);
  • Определяет состав и целостность полученной дельты;
  • Выполняет массивно-параллельную загрузку данных;
  • Устанавливает номер и дату-время загруженной дельты, равные номеру и дате-времени дельты оригинала;
  • Формирует ответ delta.in.rs для Агента ПОДД.

К записи данных (репликации) предъявляются следующие ограничения и допущения:

  • Загрузка дельт происходит строго последовательно – новая дельта после подтверждения загрузки предыдущей (delta.in.rs);
  • Загрузка дельты происходит в потоковом режиме – загрузка дельты в реплику начинается после получения первого сообщения delta.in и заканчивается в момент, когда загружены данные из всех сообщений дельты;
  • delta.in может содержать дубликаты сообщений;
  • Дата-время одинакового номера дельты оригинала и реплики совпадают;
  • Логическая схема данных реплики должна соответствовать загружаемым данным (мета-данные загружаемых данных определяются по логической схеме данных реплики);
  • ПОДД поддерживает протокол обмена в формате delta.rq, delta.rs, delta.in;
  • ПОДД является драйвером репликации.

Входные данные:

  • DM.JDBC подключение;
  • Поток сообщений delta.in, каждое из которых содержит:
  • – номер дельты;
  • ẟ-time – дата-время дельты;
  • sql-запрос подписки;
  • порядковый номер таблицы (номер потока);
  • общее количество таблиц (количество потоков);
  • порядковый номер сообщения в рамках таблицы (в рамках потока);
  • признак последнего сообщения в рамках таблицы (в рамках потока);
  • [ подмножество данных таблицы ].

Выходные данные:

  • Загруженная в реплику дельта (, ẟ-time);
  • Исходящее сообщение delta.in.rs.