Запись данных =============== Запись данных в **ADTM** обеспечивает консистентное обновление (вставка, обновление, удаление) объектов в Системе. К записи данных предъявляются следующие ограничения и допущения: + Дельты загружаются строго последовательно; + Данные из сообщений одной дельты загружаются параллельно; + Дельты со стороны ETL ИС Поставщика выгружаются строго последовательно. Запись данных имеет следующий процесс: + Горячие записи загружаются в таблицу данных; + Старые записи из таблицы данных перемещаются в таблицу истории. Массивно-параллельная загрузка дельты --------------------------------------- Массивно-параллельная загрузка дельты в **ADTM** обеспечивает массивно-параллельное консистентное обновление (вставка, обновление, удаление) объектов в Системе. Массивно-параллельная загрузка дельты выполняет следующие назначения: + Создание внешних UPLOAD таблиц загрузки; + Определение начала и окончания загрузки дельты; + Загрузка данных дельты через внешние UPLOAD таблицы загрузки в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории. К массивно-параллельной загрузке дельты предъявляются следующие ограничения и допущения: + Не предполагается нумерация входящих сообщений Kafka; + Входящие сообщения Kafka для каждого удаленного объекта содержат признак ``sys_op(int)=1``; + Гарантируется, что данные из прочитанных (коммит чтения) сообщений Kafka загружены в Систему; + Окончание потока сообщений Kafka определяется по заданному в конфигурации таймауту ожидания сообщений kafka (максимально допустимый интервал времени ожидания сообщений kafka в рамках потока). Входные данные: + JDBC подключение к Системе; + Топик Kafka, содержащий загружаемые данные. Выходные данные *SUCCESS*: + Загружены данные в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории; + Выполнен коммит чтения для входящих сообщений Kafka; + Увеличен номер последней загруженной дельты в Системе. Выходные данные *FAIL*: + Частично загружены данные в физические таблицы хранилища данных, включая перенос старых записей из таблицы данных в таблицу истории; + Выполнен коммит чтения для части входящих сообщения Kafka, которые были загружены в СУБД хранилища данных; + Не изменен номер последней загруженной дельты в Системе; + Сформировано сообщение об ошибке в ответ на входящий JDBC запрос; + Зафиксирована ошибка загрузки дельты. Загрузка данных в ADB ---------------------- Загрузка данных в **ADB** в ПО **ADTM** обеспечивает массивно-параллельную загрузку данных из **Kafka** в физические таблицы **ADB**. Загрузка данных в **ADB** выполняет следующие назначения: + Загрузка горячих записей из топика Kafka в таблицу данных; + Перенос старых записей из таблицы данных в таблицу истории в рамках транзакции. Входные данные: + Номер последней успешно загруженной дельты ``DELTA_OK``; + Номер загружаемой дельты ``DELTA_HOT``; + Kafka topicY, содержащий данные для загрузки; + JDBC подключение к ADB; + readable external table (tblExt), читающая данные из Kafka topicY; + Таблица staging (*tbl_staging*); + Таблица данных (*tbl_actual*); + Таблица истории (*tbl_history*). Выходные данные: + Таблица данных содержит только актуальные данные (для момента времени ``DELTA_HOT``); + Таблица истории содержит старые записи (для момента времени ``DELTA_HOT``); + Таблица staging пуста. К загрузке данных в **ADB** предъявляются следующие ограничения и допущения: + В таблице staging могут оказаться дублированные записи; + Данные в Kafka topicY содержат признак ``SysOp=1`` для каждого удаленного объекта; + Перенос данных (удаление/вставка) выполняется в рамках каждого сегмента (не происходит перемещения данных между сегментами); + Старая запись может иметь только два состояния: + полностью перенесена (отсутствует в таблице данных, присутствует в таблице истории с требуемыми значениями системных полей); + не перенесена (присутствует в таблице данных, отсутствует в таблице истории). Загрузка данных в ADQM ----------------------- Загрузка данных в **ADQM** в ПО **ADTM** обеспечивает массивно-параллельную загрузку данных из **Kafka** в физические таблицы **ADQM**. Загрузка данных в **ADQM** выполняет следующие назначения: + Загрузка горячих записей из заданного топика Kafka в таблицу данных; + Обновление состояния старых записей в таблице данных; + Обеспечение консистентной выборки в любой момент времени; + Перенос старых записей в отдельное физическое хранилище (диск, том). К загрузке данных в **ADQM** предъявляются следующие ограничения и допущения: + Отсутствует таблица истории. Перенос старых записей в специальное хранилище осуществляется функционалом СУБД TTL; + Момент определения записей подлежащих переносу можно настраивать; + В момент загрузки дельты выборка может выполняться дольше (используется ключевое слово *FINAL*); + Запросы, начатые до момента начала загрузки дельты, не затрагивают записи, созданные в процессе загрузки дельты, в том числе записи сторно; + Повторные записи (полное совпадение полей) дедуплицируются при завершении загрузки дельты; + В таблице данных отсутствует признак удаления ``sys_op``; + Предусмотрена работа в кластере. Входные данные: + Номер актуальной дельты -- ``@delta_ok``; + Номер загружаемой дельты -- ``@delta_hot``; + Имя топика -- ``topicY``; + Константа ``@ttl_wait_sec`` -- количество секунд после фактического устаревания записи (вставка записи) до готовности к переносу старых записей на другое хранилище; + Формат данных (AVRO); + ADQM: + таблица данных с указанием выражения TTL – tbl_actual (CollapsingMergeTree): + ``[ columns ]``; + ``sys_from`` (Int64); + ``sys_to`` (Int64); + ``close_date`` (DateTime64) -- дата, с момента которой отсчитывается готовность к переносу на другое хранилище; + ``sign`` (Int8) -- признак отмены записи (сторно). Загрузка данных в ADG ---------------------- К загрузке данных в **ADG** предъявляются следующие ограничения и допущения: + Движок таблиц memtx; + Требуется точное совпадение состава, типа и порядка полей во входящих сообщениях и структуре таблиц staging; + Допустим перенос данных в историческую таблицу и таблицу данных в процессе пополнения таблицы staging; + Перенос данных из таблицы staging в таблицу данных и перенос старых записей в таблицу истории должен происходить в рамках одной транзакции. Входные данные: + ``@delta_hot`` -- номер загружаемой дельты; + Входящие сообщения в топике Kafka topicY; + Кластер ADG: + таблица staging ``tbl1_staging``: + (поля логической таблицы); + ``sys_op`` (int); + ``bucket_id`` (unsigned); + таблица данных ``tbl1_actual``: + (поля логической таблицы); + ``sys_from`` (int) -- последнее поле PK; + ``sys_to`` (int); + ``sys_op`` (int); + ``bucket_id`` (unsigned); + таблица истории ``tbl1_history``: + (поля логической таблицы); + ``sys_from`` (int) -- последнее поле PK; + ``sys_to`` (int); + ``sys_op`` (int); + ``bucket_id`` (unsigned). Выходные данные: + Таблица данных содержит только актуальные данные (для момента времени ``@delta_hot``); + Таблица истории содержит старые записи (для момента времени ``@delta_hot``); + Таблица staging пуста. Запись дельты (репликации) --------------------------- Запись данных (репликация) в **ADTM** обеспечивает репликацию в части массивно-параллельной загрузки данных в реплику. Запись данных (репликация) выполняет следующие назначения: + Принимает от Агента ПОДД (Подсистема обеспечения доступа к данным) входящий поток сообщений ``delta.in``, содержащий дельту и системные данные дельты (``ẟ`` -- номер дельты, ``ẟ-time`` -- дата-время дельты); + Определяет состав и целостность полученной дельты; + Выполняет массивно-параллельную загрузку данных; + Устанавливает номер и дату-время загруженной дельты, равные номеру и дате-времени дельты оригинала; + Формирует ответ ``delta.in.rs`` для Агента ПОДД. К записи данных (репликации) предъявляются следующие ограничения и допущения: + Загрузка дельт происходит строго последовательно -- новая дельта после подтверждения загрузки предыдущей (``delta.in.rs``); + Загрузка дельты происходит в потоковом режиме -- загрузка дельты в реплику начинается после получения первого сообщения ``delta.in`` и заканчивается в момент, когда загружены данные из всех сообщений дельты; + ``delta.in`` может содержать дубликаты сообщений; + Дата-время одинакового номера дельты оригинала и реплики совпадают; + Логическая схема данных реплики должна соответствовать загружаемым данным (мета-данные загружаемых данных определяются по логической схеме данных реплики); + ПОДД поддерживает протокол обмена в формате ``delta.rq``, ``delta.rs``, ``delta.in``; + ПОДД является драйвером репликации. Входные данные: + DM.JDBC подключение; + Поток сообщений ``delta.in``, каждое из которых содержит: + ``ẟ`` -- номер дельты; + ``ẟ-time`` -- дата-время дельты; + sql-запрос подписки; + порядковый номер таблицы (номер потока); + общее количество таблиц (количество потоков); + порядковый номер сообщения в рамках таблицы (в рамках потока); + признак последнего сообщения в рамках таблицы (в рамках потока); + [ подмножество данных таблицы ]. Выходные данные: + Загруженная в реплику дельта (``ẟ``, ``ẟ-time``); + Исходящее сообщение ``delta.in.rs``.