Kafka to ADB Connector configuration

Objects required to work with the connector

To send data from Kafka to ADB via Kafka to ADB Connector, you should first create the following objects on the ADB cluster side:

  1. Server — encapsulates connection information that a foreign data wrapper uses to access an external data source.

  2. Foreign table — a table in ADB that defines the remote data structure. A foreign table has no storage in ADB, but can be used in queries just like a normal table.

To see how to use the listed objects to send data from ADS to ADB, refer to Kafka to ADB Connector usage examples.

IMPORTANT
  • For each Kafka cluster, it is sufficient to create one server. However, there may be several foreign tables if you need, for example, to read data from multiple Kafka topics.

  • Options that are listed below can be defined both on the server and foreign table sides. Table-level options take precedence.

  • Values of all options should be defined in the text format (i.e. in single quotes ').

  • Option names that contain # or . should be surrounded with double quotation marks ". Otherwise, quotes are not required for option names.

Server

To create a server, use the CREATE SERVER command. The basic command syntax is listed below:

CREATE SERVER <server_name> [ TYPE '<server_type>' ] [ VERSION '<server_version>' ]
    FOREIGN DATA WRAPPER <fdw_name>
    [ OPTIONS ( [ <option> '<value>' [, ... ]] ) ]

where:

  • <server_name> — a server name in ADB. Should be unique within the current ADB database.

  • <server_type> — an optional server type.

  • <server_version> — an optional server version.

  • <fdw_name> — a foreign data wrapper name. You should use the kadb_fdw foreign data wrapper, which is automatically created after the connector installation (see step 4 in Kafka to ADB Connector installation).

  • <option> — server options that define the connection details. The options that are available for Kafka to ADB Connector are listed in the Server options table. Note that these options can be defined both at the server level and at the foreign table level. Options marked as required should be specified at least in one definition (server or table). Table-level options take precedence.

  • <value> — option values.

NOTE
  • To create a server, you need the USAGE privilege on the kadb_fdw foreign data wrapper. The user who creates the server becomes its owner.

  • The full description of the CREATE SERVER command syntax is available in the Greenplum documentation.

  • To edit a server definition, use the ALTER SERVER command. To delete a server, use DROP SERVER.

Server options
Name Description Default Required

k_brokers

A comma-separated list of Kafka brokers, each of which is specified in the following format: <hostname>:<port> or <IP>:<port>.

Alias of the librdkafka property bootstrap.servers

 — 

Yes

k_topic

A Kafka topic identifier (see Storage concepts in Kafka)

 — 

Yes

k_consumer_group

A Kafka consumer group identifier.

Alias of the librdkafka property group.id

 — 

Yes

k_seg_batch

A maximum number of Kafka messages that can be retrieved by each ADB segment.

Note that when queries with LIMIT conditions are executed, messages are still requested from Kafka in batches. As a result, offsets in the kadb.offsets table are set to the latest offsets of retrieved messages for each partition even if the data of these messages is not present in the query result.

Since the k_seg_batch option limits the number of messages that can be read, there may be partitions from which no messages are retrieved by a single particular SELECT against a foreign table.

Positive integers are allowed

 — 

Yes

k_timeout_ms

A timeout for a message consumption request to Kafka (in milliseconds). Only messages available in Kafka during the period from the start of the SELECT query until k_timeout_ms elapses are retrieved and presented as the SELECT result. Each segment consumes messages from all partitions assigned to it simultaneously.

A SELECT query can finish sooner if the Kafka partitions contain enough messages (as soon as each segment reads k_seg_batch messages). The query duration is mostly affected by Kafka partitions that do not contain enough messages: the connector waits for new messages to appear in these partitions for k_timeout_ms milliseconds.

Note that the actual duration of each SELECT query can exceed the k_timeout_ms value.

At some stages of a SELECT query, it cannot be terminated until the k_timeout_ms interval expires.

Non-negative integers are allowed

 — 

Yes

format

Data serialization format. Possible values (case-insensitive): avro, csv, text (see Supported formats below)

 — 

Yes

k_latency_ms

A timeout for metadata requests to Kafka.

Set the value of this option to the maximum expected latency (across all ADB segments) of any metadata request to Kafka. The longest metadata request is made by the kadb.offsets_to_committed(<OID>) function — you can use it to determine the minimal k_latency_ms value.

Note that several metadata requests are made when a SELECT query against a foreign table is executed. If the timeout is exceeded, the query fails with an error.

Non-negative integers are allowed

2000

No

k_initial_offset

The initial offset that is used for partitions that have no corresponding entries in the kadb.offsets table yet. The option is used when the k_automatic_offsets flag is set and by offset management functions.

Non-negative integers are allowed

0

No

k_automatic_offsets

A flag. When it is set to true, the connector behaves as follows:

  • Immediately before each SELECT to a foreign table:

    • Add partitions present in Kafka and absent in the kadb.offsets table to a set of partitions to read data from.

    • Automatically increase an initial offset of any partition to the lowest (earliest) offset available in Kafka. A NOTICE occurs when such increase takes place.

  • Immediately after each SELECT to a foreign table:

    • Update the kadb.offsets table by adding partitions present in Kafka and absent in ADB (via INSERT).

If k_automatic_offsets=false, an ERROR is raised when the smallest offset of an existing message in Kafka is greater than the offset in the kadb.offsets table (for any partition).

Note that partitions present in Kafka and absent in the kadb.offsets table are not visible to users if a CURSOR is used: as mentioned above, data is inserted into kadb.offsets after the SELECT query completes, while CURSOR queries remain in progress.

After a successful SELECT query, offsets in the kadb.offsets table are updated regardless of the k_automatic_offsets value, to reflect the number of messages read from Kafka

true

No

avro_schema

An AVRO schema that should be used for deserialization. The schema should be specified in the JSON format.

Incoming messages are deserialized by Kafka to ADB Connector in one of the following ways:

  • If the avro_schema option is set, the user-provided schema is used for deserialization (incoming messages should still be in the AVRO OCF format).

  • Otherwise, a schema is extracted directly from incoming messages in the AVRO OCF format.

Note that a user-provided schema cannot be validated. If the actual and provided schemas do not match, deserialization fails with the following error: ERROR: invalid memory alloc request size. For this reason, the avro_schema option should be used only for performance reasons and after careful analysis

 — 

No

csv_quote

A character that is used as a quotation mark when CSV is parsed.

Use a single character represented by one byte in the current encoding

"

No

csv_delimeter

A character that is used as a delimiter when CSV is parsed.

Use a single character represented by one byte in the current encoding

,

No

csv_null

A string that represents NULL values in CSV

Empty string

No

csv_ignore_header

A flag that indicates whether to ignore (not parse) the first record of each CSV message, i.e. treat it as a header

false

No

csv_attribute_trim_whitespace

A flag that indicates whether to trim whitespace at the beginning and the end of each attribute (field) in CSV records

true

No

librdkafka properties

In addition to the options listed above, you can specify librdkafka properties when declaring a server or a foreign table.

The name of each librdkafka property should start with # and be enclosed in double quotation marks ". For example, "#bootstrap.servers", "#security.protocol", and so on.

librdkafka configuration properties take precedence over their aliases mentioned above.

The following librdkafka properties cannot be set:

  • enable.auto.commit;

  • enable.partition.eof;

  • plugin.library.paths;

  • interceptors;

  • all properties whose names end with _cb.
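
For illustration, a server definition combining the options above might look as follows. All names and values here (broker addresses, topic, consumer group, batch size, and the security protocol property) are placeholders, not recommended settings:

CREATE SERVER kafka_server
    FOREIGN DATA WRAPPER kadb_fdw
    OPTIONS (
        k_brokers 'broker1.example.com:9092,broker2.example.com:9092',
        k_topic 'my_topic',
        k_consumer_group 'adb_consumer_group',
        k_seg_batch '100',
        k_timeout_ms '1000',
        format 'csv',
        "#security.protocol" 'plaintext'
    );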

Foreign table

To create a foreign table, use the CREATE FOREIGN TABLE command. The basic command syntax is listed below:

CREATE FOREIGN TABLE [ IF NOT EXISTS ] <table_name> ( [
    <column_name> <data_type> [ COLLATE <collation> ] [ <column_constraint> [ ... ] ]
      [, ... ]
] )
    SERVER <server_name>
  [ OPTIONS ( <option> '<value>' [, ... ] ) ]

where:

  • <table_name> — a foreign table name in ADB.

  • <column_name> — a column name.

  • <data_type> — a column data type.

  • <collation> — a column collation.

  • <column_constraint> — a constraint defined at the column level. The name <constraint_name> is given optionally. The syntax:

    [ CONSTRAINT <constraint_name> ]
    { NOT NULL |
      NULL |
      DEFAULT <default_expr> }

    Possible constraints:

    • NOT NULL — specifies that the column is not allowed to contain null values.

    • NULL — specifies that the column is allowed to contain null values. It is the default behavior (unless NOT NULL is specified).

    • DEFAULT — sets the default column value equal to <default_expr>.

  • <server_name> — a server name.

  • <option> — foreign table options. For Kafka to ADB Connector, all options defined at the server level can be overwritten at the foreign table level (partially or fully). When you specify an option for a server and a foreign table at the same time, the table level takes precedence.

  • <value> — option values.

NOTE
  • To create a foreign table, you need the USAGE privilege on the corresponding server and on all column data types that are used in the table definition. The user who creates the foreign table becomes its owner.

  • The full description of the CREATE FOREIGN TABLE command syntax is available in the Greenplum documentation.

  • To edit a foreign table definition, use the ALTER FOREIGN TABLE command. To drop a foreign table, use DROP FOREIGN TABLE.
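
For illustration, a foreign table for the hypothetical kafka_server defined above might be declared as follows. The table, column, and topic names are placeholders; the table-level k_topic overrides the server-level value:

CREATE FOREIGN TABLE kafka_events (
    id INT,
    payload TEXT
)
SERVER kafka_server
OPTIONS (
    k_topic 'events_topic'
);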

kadb schema

During the Kafka to ADB Connector installation, the kadb schema is added to the default ADB database. This schema contains the kadb.offsets table and a set of functions that are used to synchronize offsets.

kadb.offsets table

The kadb.offsets table stores partition/offset mappings for each foreign table that has ever been created in the current ADB database. It provides the ability to store offsets outside of Kafka, on the consumer side. The table structure is listed below.

Column   Description
ftoid    A foreign table OID
prt      A Kafka partition identifier
off      An offset

An OID is used to identify each foreign table. To determine the OID of a foreign table, use the following query:

SELECT '<schema>.<table>'::regclass::oid;

where:

  • <schema> — a name of the schema, in which a foreign table is created. Can be omitted if a schema search path is properly configured.

  • <table> — a foreign table name.

On each SELECT query to a foreign table with the given OID, a request is issued to kadb.offsets. Then messages are read from Kafka starting with the offset specified in the kadb.offsets table for the selected table OID and partition. For example, if the offset for some partition is set to 15, the message with offset 15 will be read first from this Kafka partition.

A set of partitions and their offsets can be changed via common SQL queries to kadb.offsets. Additionally, you can use special functions for this purpose.

For new partitions that are not yet present in the kadb.offsets table, the initial offset is set to 0 by default. You can change that value via the k_initial_offset option.

After a successful SELECT query against a foreign table, offsets are updated in the kadb.offsets table automatically based on the data obtained from Kafka. For example, if the last message read from some Kafka partition has offset 25, the kadb.offsets.off column for the selected partition and foreign table will contain 26. The next time data is read from Kafka, 26 will be used as the initial offset.
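
For example, to re-read partition 0 of a hypothetical foreign table public.kafka_events from the beginning, you can update the stored offset directly:

UPDATE kadb.offsets
SET off = 0
WHERE ftoid = 'public.kafka_events'::regclass::oid
  AND prt = 0;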

CAUTION

Currently, the kadb.offsets table is not backed up via gpbackup since it is not considered a configuration table. When using Kafka to ADB Connector, you need to back up the offsets on your own.

Functions

Kafka to ADB Connector provides several functions to synchronize offsets in Kafka with the corresponding offsets in the kadb.offsets table. The functions are located in the kadb schema. When calling them, remember the following:

  • None of the functions provides transactional guarantees for Kafka: no assumptions can be made about what happens with offsets in Kafka before or after a function call, even if that call is combined with a SELECT query to a foreign table in the same transaction.

  • Some functions are not atomic: they do not create a "snapshot" of all Kafka partitions; the result is obtained from each partition independently, at slightly different points in time.

Functions for offset management
Function Parameters Description Atomic

kadb.commit_offsets(OID)

A foreign table OID

For a foreign table with the specified OID, commits offsets stored in kadb.offsets to Kafka

Yes

kadb.load_partitions(OID)

A foreign table OID

For a foreign table with the specified OID, loads partitions that exist in Kafka.

The function result contains the following fields:

  • ftoid — a foreign table OID;

  • prt — a partition identifier;

  • off — the k_initial_offset option value.

No

kadb.partitions_obtain(OID)

A foreign table OID

For a foreign table with the specified OID, adds partitions returned by the kadb.load_partitions(OID) function to kadb.offsets. Only new partitions are added; existing ones are left unchanged

No

kadb.partitions_clean(OID)

A foreign table OID

For a foreign table with the specified OID, deletes from the kadb.offsets table all entries for partitions that do not exist in Kafka

No

kadb.partitions_reset(OID)

A foreign table OID

For a foreign table with the specified OID, deletes all entries from the kadb.offsets table and adds entries returned by kadb.load_partitions(OID) instead

No

kadb.load_offsets_at_timestamp(OID, BIGINT)

  • A foreign table OID.

  • A timestamp — number of milliseconds since UNIX Epoch (UTC).

For a foreign table with the specified OID, loads from Kafka the earliest offsets whose timestamps are greater than or equal to the given timestamp. The result is returned only for partitions already present in the kadb.offsets table for the specified foreign table.

The function result contains the following fields:

  • ftoid — a foreign table OID;

  • prt — a partition identifier;

  • off — the offset value found in Kafka.

Yes

kadb.offsets_to_timestamp(OID, BIGINT)

  • A foreign table OID.

  • A timestamp — number of milliseconds since UNIX Epoch (UTC).

For a foreign table with the specified OID, changes offsets in the kadb.offsets table to the earliest offsets found in Kafka whose timestamps are greater than or equal to the given timestamp. Affects only partitions already present in kadb.offsets for the specified foreign table

Yes

kadb.load_offsets_earliest(OID)

Foreign table OID

For a foreign table with the specified OID, loads the earliest offsets from Kafka. The result is returned only for partitions already present in kadb.offsets for the specified foreign table.

The function result contains the following fields:

  • ftoid — a foreign table OID;

  • prt — a partition identifier;

  • off — the offset value found in Kafka.

No

kadb.offsets_to_earliest(OID)

Foreign table OID

For a foreign table with the specified OID, changes offsets in the kadb.offsets table to the earliest offsets found in Kafka. Affects only partitions already present in kadb.offsets for the specified foreign table

No

kadb.load_offsets_latest(OID)

Foreign table OID

For a foreign table with the specified OID, loads the latest offsets from Kafka. The result is returned only for partitions already present in kadb.offsets for the specified foreign table.

The function result contains the following fields:

  • ftoid — a foreign table OID;

  • prt — a partition identifier;

  • off — the offset value found in Kafka.

No

kadb.offsets_to_latest(OID)

Foreign table OID

For a foreign table with the specified OID, changes offsets in the kadb.offsets table to the latest offsets found in Kafka. Affects only partitions already present in kadb.offsets for the specified foreign table.

As a result, subsequent SELECT queries to the selected foreign table will return messages added to the corresponding Kafka partitions after the kadb.offsets_to_latest(OID) function call

No

kadb.load_offsets_committed(OID)

Foreign table OID

For a foreign table with the specified OID, loads the latest committed offsets from Kafka. The result is returned only for partitions already present in kadb.offsets for the specified foreign table.

The function result contains the following fields:

  • ftoid — a foreign table OID;

  • prt — a partition identifier;

  • off — the offset value found in Kafka.

Yes

kadb.offsets_to_committed(OID)

Foreign table OID

For a foreign table with the specified OID, changes offsets in the kadb.offsets table to the latest committed offsets found in Kafka. Affects only partitions already present in kadb.offsets for the specified foreign table

Yes
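
For example, to add partitions that exist in Kafka to kadb.offsets and then commit the stored offsets back to Kafka for a hypothetical foreign table public.kafka_events, you can run:

SELECT kadb.partitions_obtain('public.kafka_events'::regclass::oid);
SELECT kadb.commit_offsets('public.kafka_events'::regclass::oid);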

Supported formats

Kafka to ADB Connector supports deserialization of messages that were created in one of the following formats:

  • AVRO OCF;

  • CSV;

  • text.

A deserialization method should be explicitly specified via the format option.

NOTE

Regardless of the format selected, only values of Kafka messages are deserialized. Message keys are ignored.

AVRO OCF

Kafka to ADB Connector supports the AVRO OCF serialization format with some limitations:

  • AVRO schemas should contain only primitive data types. Complex types are not supported. However, there are two exceptions:

    • a union of any primitive type with null (nested unions, i.e. a union of a union and null, are not supported);

    • the fixed type, which is processed in the same way as bytes.

  • Logical types are supported.

  • The foreign table definition in ADB should match the actual AVRO schema. It is also important that the order of table columns matches the order of schema fields.

The table below shows possible options for converting AVRO data types into PostgreSQL types in Kafka to ADB Connector.

Supported data type mappings

AVRO data type            PostgreSQL data type
string                    TEXT, BPCHAR, VARCHAR
string                    Custom PostgreSQL data type (e.g. MONEY); the conversion is the same as for user-provided textual input
null                      Any PostgreSQL data type in a column without NOT NULL constraints
boolean                   BOOLEAN
int                       INTEGER
long                      BIGINT
float                     REAL
double                    DOUBLE PRECISION
bytes, fixed              BYTEA
decimal                   NUMERIC
date                      DATE
time-millis, time-micros  TIME
timestamp-millis          TIMESTAMP(<N>), where <N> is 1, 2, or 3
timestamp-micros          TIMESTAMP; TIMESTAMP(<N>), where <N> >= 4
duration                  INTERVAL

The following are examples of AVRO schemas that are valid for Kafka to ADB Connector.

Examples of valid AVRO schemas
{
  "name":"doc",
  "type":"record",
  "fields":[
    {
      "name":"id",
      "type":"int"
    },
    {
      "name":"text",
      "type":[
        "string",
        "null"
      ]
    },
    {
      "name":"issued_on",
      "type":"int",
      "logicalType":"date"
    }
  ]
}
{
  "name":"doc",
  "type":"record",
  "fields":[
    {
      "name":"d",
      "type":"int",
      "logicalType":"date"
    },
    {
      "name":"t_ms",
      "type":"int",
      "logicalType":"time-millis"
    },
    {
      "name":"t_us",
      "type":"long",
      "logicalType":"time-micros"
    },
    {
      "name":"ts_ms",
      "type":"long",
      "logicalType":"timestamp-millis"
    },
    {
      "name":"ts_us",
      "type":"long",
      "logicalType":"timestamp-micros"
    },
    {
      "name":"dur",
      "type":{
        "name":"dur_fixed",
        "type":"fixed",
        "size":12,
        "logicalType":"duration"
      }
    },
    {
      "name":"dec_1",
      "type":{
        "name":"dec_2_fixed",
        "type":"fixed",
        "size":6,
        "logicalType":"decimal"
      }
    },
    {
      "name":"dec_2",
      "type":{
        "name":"dec_2_fixed",
        "type":"bytes",
        "logicalType":"decimal",
        "precision":14,
        "scale":4
      }
    }
  ]
}
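
For illustration, a foreign table that matches the first schema above might be declared as follows (the table, server, and topic names are placeholders); note that the column order follows the order of the schema fields:

CREATE FOREIGN TABLE avro_docs (
    id INTEGER,
    text TEXT,
    issued_on DATE
)
SERVER kafka_server
OPTIONS (
    format 'avro',
    k_topic 'docs_topic'
);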

CSV

CSV format support is provided in Kafka to ADB Connector by libcsv. As a result, all CSV conventions used by that library apply. The CSV specification is defined in RFC 4180.

Taking the above into account, Kafka to ADB Connector uses the following CSV parsing rules:

  • CSV fields (attributes) are separated by a character that is specified via the csv_delimeter option.

  • CSV rows (records) are separated by a newline character sequence.

  • CSV fields can be enclosed into quotes specified via the csv_quote option.

  • CSV fields that contain a delimiter, a quote, or a newline character should be quoted.

  • Each quotation mark that is used inside of CSV fields should be escaped with a preceding quote character.

  • Empty fields are parsed as NULL values.

  • Empty rows are skipped.

  • Leading and trailing whitespace is removed from non-quoted fields (if the csv_attribute_trim_whitespace option is set).

CSV values can be converted to any PostgreSQL data type. The conversion is the same as for user-provided textual input.
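
For instance, to parse semicolon-delimited CSV messages that start with a header record, the CSV options might be set at the table level as follows (all names and values are placeholders):

CREATE FOREIGN TABLE csv_imports (
    id INT,
    name TEXT,
    amount NUMERIC
)
SERVER kafka_server
OPTIONS (
    format 'csv',
    csv_delimeter ';',
    csv_ignore_header 'true'
);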

Text

The text format is used to deserialize data presented in Kafka messages as raw text. When you select this format, the connector processes messages as follows:

  • Each message read from Kafka is treated as a single tuple (row) with one attribute (column) in a foreign table.

  • The data is parsed by PostgreSQL as user-provided textual data.

  • Empty Kafka messages (with 0 length) are parsed as NULL values.

CAUTION

In order to use the text format for data deserialization, define exactly one column in the foreign table (of any PostgreSQL data type).
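
A minimal sketch of such a single-column table (the table, column, server, and topic names are placeholders):

CREATE FOREIGN TABLE raw_messages (
    payload TEXT
)
SERVER kafka_server
OPTIONS (
    format 'text',
    k_topic 'raw_topic'
);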

Implementation details

Below are the features of the Kafka to ADB Connector implementation that significantly affect its usage.

Concurrent SELECT and global deadlock detector

As mentioned above, Kafka to ADB Connector refers to kadb.offsets on each SELECT query against foreign tables. The kadb.offsets table has the DISTRIBUTED REPLICATED distribution policy in ADB. Both INSERT and UPDATE queries are issued against that table. Thus, Greenplum limitations on concurrent transactions can affect SELECT queries to foreign tables.

The way concurrent (parallel) transactions are processed in Greenplum is defined by the current state of the global deadlock detector.

When the global deadlock detector is disabled, each UPDATE query requires ExclusiveLock, which actually locks the entire table being updated. In Kafka to ADB Connector, this means that concurrent SELECT queries to different foreign tables are not possible. Such SELECT queries are executed sequentially, one at a time.

To allow multiple concurrent SELECT queries against different foreign tables, enable the global deadlock detector. With the detector enabled, each UPDATE query requires RowExclusiveLock, thus permitting multiple concurrent UPDATE queries to the kadb.offsets table, and hence multiple SELECT queries against foreign tables.

To enable the global deadlock detector, set the gp_enable_global_deadlock_detector GUC value to on:

$ gpconfig -c gp_enable_global_deadlock_detector -v on

Concurrent SELECT queries against a single foreign table are not possible. In some cases, such queries may succeed and even return correct results (the same for all concurrent queries). However, this is not guaranteed.

Distribution of partitions across segments

Each SELECT query against a foreign table considers only partitions that are present in the kadb.offsets table. The kadb.offsets contents can be modified before SELECT execution — automatically if the k_automatic_offsets option is set or manually by means of functions.

Partitions are distributed among ADB segments according to the following rules:

  • Distribution is performed evenly across all segments. The number of partitions assigned to a segment does not differ by more than one with respect to other segments.

  • The order of partitions (as returned by Kafka) and the order of ADB segments to which they are assigned match.

For example, in an ADB cluster that consists of 3 segments, partitions [0, 2, 3, 4, 1] returned from Kafka will be distributed as follows.

Segment   Partitions
1         [0, 2]
2         [3, 4]
3         [1]

Query termination

Each query that involves librdkafka functionality (an offset management function call or a SELECT query against a foreign table) can be terminated by users before it completes. A query is guaranteed to terminate in less than one second after all ADB segments receive the termination signal.

However, active connections to Kafka are not closed immediately after query termination. Under normal conditions, connections are closed within the time specified in the k_latency_ms option. But when Kafka does not respond (e.g. if Kerberos authentication fails), a connection may remain active for an indefinite period of time.

To ensure all resources are released, close the ADB session within which the query was sent.

Duration of SELECT queries

If a SELECT query against a foreign table is not interrupted, its maximum duration can be estimated as follows:

The k_latency_ms and k_timeout_ms options affect the SELECT duration significantly. Thus, these options should be set properly.
