Запись данных¶
Запись данных в ADTM обеспечивает консистентное обновление (вставка, обновление, удаление) объектов в Системе.
К записи данных предъявляются следующие ограничения и допущения:
- Дельты загружаются строго последовательно;
- Данные из сообщений одной дельты загружаются параллельно;
- Дельты со стороны ETL ИС Поставщика выгружаются строго последовательно.
Запись данных имеет следующий процесс:
- Горячие записи загружаются в таблицу данных;
- Старые записи из таблицы данных перемещаются в таблицу истории.
Массивно-параллельная загрузка дельты¶
Массивно-параллельная загрузка дельты в ADTM обеспечивает массивно-параллельное консистентное обновление (вставка, обновление, удаление) объектов в Системе.
Массивно-параллельная загрузка дельты выполняет следующие назначения:
- Создание внешних UPLOAD таблиц загрузки;
- Определение начала и окончания загрузки дельты;
- Загрузка данных дельты через внешние UPLOAD таблицы загрузки в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории.
К массивно-параллельной загрузке дельты предъявляются следующие ограничения и допущения:
- Не предполагается нумерация входящих сообщений Kafka;
- Входящие сообщения Kafka для каждого удаленного объекта содержат признак
sys_op(int)=1
; - Гарантируется, что данные из прочитанных (коммит чтения) сообщений Kafka загружены в Систему;
- Окончание потока сообщений Kafka определяется по заданному в конфигурации таймауту ожидания сообщений kafka (максимально допустимый интервал времени ожидания сообщений kafka в рамках потока).
Входные данные:
- JDBC подключение к Системе;
- Топик Kafka, содержащий загружаемые данные.
Выходные данные SUCCESS:
- Загружены данные в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории;
- Выполнен коммит чтения для входящих сообщений Kafka;
- Увеличен номер последней загруженной дельты в Системе.
Выходные данные FAIL:
- Частично загружены данные в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории;
- Выполнен коммит чтения для части входящих сообщения Kafka, которые были загружены в СУБД хранилища данных;
- Не изменен номер последней загруженной дельты в Системе;
- Сформировано сообщение об ошибке в ответ на входящий JDBC запрос;
- Зафиксирована ошибка загрузки дельты.
Загрузка данных в ADB¶
Загрузка данных в ADB в ПО ADTM обеспечивает массивно-параллельную загрузку данных из Kafka в физические таблицы ADB.
Загрузка данных в ADB выполняет следующие назначения:
- Загрузка горячих записей из топика Kafka в таблицу данных;
- Перенос старых записей из таблицы данных в таблицу истории в рамках транзакции.
Входные данные:
- Номер последней успешно загруженной дельты
DELTA_OK
; - Номер загружаемой дельты
DELTA_HOT
; - Kafka topicY, содержащий данные для загрузки;
- JDBC подключение к ADB;
- readable external table (tblExt), читающая данные из Kafka topicY;
- Таблица staging (tbl_staging);
- Таблица данных (tbl_actual);
- Таблица истории (tbl_history).
Выходные данные:
- Таблица данных содержит только актуальные данные (для момента времени
DELTA_HOT
); - Таблица истории содержит старые записи (для момента времени
DELTA_HOT
); - Таблица staging пуста.
К загрузке данных в ADB предъявляются следующие ограничения и допущения:
- В таблице staging могут оказаться дублированные записи;
- Данные в Kafka topicY содержат признак
SysOp=1
для каждого удаленного объекта; - Перенос данных (удаление/вставка) выполняется в рамках каждого сегмента (не происходит перемещения данных между сегментами);
- Старая запись может иметь только два состояния:
- полностью перенесена (отсутствует в таблице данных, присутствует в таблице истории с требуемыми значениями системных полей);
- не перенесена (присутствует в таблице данных, отсутствует в таблице истории).
Загрузка данных в ADQM¶
Загрузка данных в ADQM в ПО ADTM обеспечивает массивно-параллельную загрузку данных из Kafka в физические таблицы ADQM.
Загрузка данных в ADQM выполняет следующие назначения:
- Загрузка горячих записей из заданного топика Kafka в таблицу данных;
- Обновление состояния старых записей в таблице данных;
- Обеспечение консистентной выборки в любой момент времени;
- Перенос старых записей в отдельное физическое хранилище (диск, том).
К загрузке данных в ADQM предъявляются следующие ограничения и допущения:
- Отсутствует таблица истории. Перенос старых записей в специальное хранилище осуществляется функционалом СУБД TTL;
- Момент определения записей подлежащих переносу можно настраивать;
- В момент загрузки дельты выборка может выполняться дольше (используется ключевое слово FINAL);
- Запросы, начатые до момента начала загрузки дельты, не затрагивают записи, созданные в процессе загрузки дельты, в том числе записи сторно;
- Повторные записи (полное совпадение полей) дедуплицируются при завершении загрузки дельты;
- В таблице данных отсутствует признак удаления
sys_op
; - Предусмотрена работа в кластере.
Входные данные:
- Номер актуальной дельты –
@delta_ok
; - Номер загружаемой дельты –
@delta_hot
; - Имя топика –
topicY
; - Константа
@ttl_wait_sec
– количество секунд после фактического устаревания записи (вставка записи) до готовности к переносу старых записей на другое хранилище; - Формат данных (AVRO);
- ADQM:
- таблица данных с указанием выражения TTL – tbl_actual (CollapsingMergeTree):
[ columns ]
;sys_from
(Int64);sys_to
(Int64);close_date
(DateTime64) – дата, с момента которой отсчитывается готовность к переносу на другое хранилище;sign
(Int8) – признак отмены записи (сторно).
Загрузка данных в ADG¶
К загрузке данных в ADG предъявляются следующие ограничения и допущения:
- Движок таблиц memtx;
- Требуется точное совпадение состава, типа и порядка полей во входящих сообщениях и структуре таблиц staging;
- Допустим перенос данных в историческую таблицу и таблицу данных в процессе пополнения таблицы staging;
- Перенос данных из таблицы staging в таблицу данных и перенос старых записей в таблицу истории должен происходить в рамках одной транзакции.
Входные данные:
@delta_hot
– номер загружаемой дельты;- Входящие сообщения в топике Kafka topicY;
- Кластер ADG:
- таблица staging
tbl1_staging
:
- (поля логической таблицы);
sys_op
(int);bucket_id
(unsigned);
- таблица данных
tbl1_actual
:
- (поля логической таблицы);
sys_from
(int) – последнее поле PK;sys_to
(int);sys_op
(int);bucket_id
(unsigned);
- таблица истории
tbl1_history
:
- (поля логической таблицы);
sys_from
(int) – последнее поле PK;sys_to
(int);sys_op
(int);bucket_id
(unsigned).
Выходные данные:
- Таблица данных содержит только актуальные данные (для момента времени
@delta_hot
); - Таблица истории содержит старые записи (для момента времени
@delta_hot
); - Таблица staging пуста.
Запись дельты (репликации)¶
Запись данных (репликация) в ADTM обеспечивает репликацию в части массивно-параллельной загрузки данных в реплику.
Запись данных (репликация) выполняет следующие назначения:
- Принимает от Агента ПОДД (Подсистема обеспечения доступа к данным) входящий поток сообщений
delta.in
, содержащий дельту и системные данные дельты (ẟ
– номер дельты,ẟ-time
– дата-время дельты); - Определяет состав и целостность полученной дельты;
- Выполняет массивно-параллельную загрузку данных;
- Устанавливает номер и дату-время загруженной дельты, равные номеру и дате-времени дельты оригинала;
- Формирует ответ
delta.in.rs
для Агента ПОДД.
К записи данных (репликации) предъявляются следующие ограничения и допущения:
- Загрузка дельт происходит строго последовательно – новая дельта после подтверждения загрузки предыдущей (
delta.in.rs
); - Загрузка дельты происходит в потоковом режиме – загрузка дельты в реплику начинается после получения первого сообщения
delta.in
и заканчивается в момент, когда загружены данные из всех сообщений дельты; delta.in
может содержать дубликаты сообщений;- Дата-время одинакового номера дельты оригинала и реплики совпадают;
- Логическая схема данных реплики должна соответствовать загружаемым данным (мета-данные загружаемых данных определяются по логической схеме данных реплики);
- ПОДД поддерживает протокол обмена в формате
delta.rq
,delta.rs
,delta.in
; - ПОДД является драйвером репликации.
Входные данные:
- DM.JDBC подключение;
- Поток сообщений
delta.in
, каждое из которых содержит:
ẟ
– номер дельты;ẟ-time
– дата-время дельты;- sql-запрос подписки;
- порядковый номер таблицы (номер потока);
- общее количество таблиц (количество потоков);
- порядковый номер сообщения в рамках таблицы (в рамках потока);
- признак последнего сообщения в рамках таблицы (в рамках потока);
- [ подмножество данных таблицы ].
Выходные данные:
- Загруженная в реплику дельта (
ẟ
,ẟ-time
); - Исходящее сообщение
delta.in.rs
.