Kafka to ADB Connector configuration
Objects to work with the connector
To send data from Kafka to ADB via Kafka to ADB Connector, you should first create the following objects on the ADB cluster side:
- Server — encapsulates connection information that a foreign data wrapper uses to access an external data source.
- Foreign table — a table in ADB that defines the remote data structure. A foreign table has no storage in ADB, but can be used in queries just like a normal table.
To see how to use the listed objects to send data from ADS to ADB, refer to Kafka to ADB Connector usage examples.
Server
To create a server, use the CREATE SERVER command. The basic command syntax is listed below:
CREATE SERVER <server_name> [ TYPE '<server_type>' ] [ VERSION '<server_version>' ]
FOREIGN DATA WRAPPER <fdw_name>
[ OPTIONS ( [ <option> '<value>' [, ... ]] ) ]
where:
- <server_name> — a server name in ADB. Should be unique within the current ADB database.
- <server_type> — an optional server type.
- <server_version> — an optional server version.
- <fdw_name> — a foreign data wrapper name. You should use the kadb_fdw foreign data wrapper, which is automatically created after the connector installation (see step 4 in Kafka to ADB Connector installation).
- <option> — server options that define the connection details. The options that are available for Kafka to ADB Connector are listed in the Server options table. Note that these options can be defined both at the server level and at the foreign table level. Options marked as required should be specified in at least one definition (server or table). Table-level options take precedence.
- <value> — option values.
Name | Description | Default | Required
---|---|---|---
k_brokers | A comma-separated list of Kafka brokers, each of which is specified in the <host>:<port> format. Alias of the corresponding librdkafka property | — | Yes
k_topic | A Kafka topic identifier (see Storage concepts in Kafka) | — | Yes
k_consumer_group | A Kafka consumer group identifier. Alias of the corresponding librdkafka property | — | Yes
k_seg_batch | A maximum number of Kafka messages that can be retrieved by each ADB segment. Positive integers are allowed | — | Yes
k_timeout_ms | A timeout for a message consumption request to Kafka (in milliseconds). Only messages that are available in Kafka within this period from the start of the consumption request are read. Non-negative integers are allowed | — | Yes
format | A deserialization method (message format). See Supported formats | — | Yes
k_latency_ms | A timeout for metadata requests to Kafka (in milliseconds). Set the value of this option to the maximum expected latency (across all ADB segments) of any metadata request to Kafka. Non-negative integers are allowed | 2000 | No
k_initial_offset | The initial offset that is used for partitions that have no corresponding entries in the kadb.offsets table yet. Non-negative integers are allowed | 0 | No
k_automatic_offsets | A flag that indicates whether the contents of the kadb.offsets table are synchronized with Kafka automatically before each SELECT query. Note that partitions present in Kafka and absent in the kadb.offsets table are not visible to users if this synchronization is disabled | true | No
avro_schema | An AVRO schema (in JSON format) that should be used for deserialization instead of the schema embedded in incoming messages. Note that a user-provided schema cannot be validated: if the actual and the provided schemas do not correspond, deserialization fails with an error | — | No
csv_quote | A character that is used as a quotation mark when CSV is parsed. Use a single character represented by one byte in the current encoding | " | No
csv_delimeter | A character that is used as a delimiter when CSV is parsed. Use a single character represented by one byte in the current encoding | , | No
csv_null | A string that represents NULL values when CSV is parsed | Empty string | No
csv_ignore_header | A flag that indicates whether to ignore (do not parse) the first record of each CSV message, i.e. consider it as a header | false | —
csv_attribute_trim_whitespace | A flag that indicates whether to trim whitespace at the beginning and the end of each attribute (field) in CSV records | true | —
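For illustration, a server definition might look like the following sketch. The server name, broker addresses, topic, consumer group, and option values are placeholders, and the format value csv is assumed here to select CSV deserialization (see Supported formats):

CREATE SERVER kafka_server
    FOREIGN DATA WRAPPER kadb_fdw
    OPTIONS (
        k_brokers 'broker-1:9092,broker-2:9092',  -- placeholder broker list
        k_topic 'my_topic',                        -- placeholder topic
        k_consumer_group 'my_consumer_group',      -- placeholder consumer group
        k_seg_batch '100',
        k_timeout_ms '1000',
        format 'csv'                               -- assumed value for CSV deserialization
    );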
Foreign table
To create a foreign table, use the CREATE FOREIGN TABLE command. The basic command syntax is listed below:
CREATE FOREIGN TABLE [ IF NOT EXISTS ] <table_name> ( [
<column_name> <data_type> [ COLLATE <collation> ] [ <column_constraint> [ ... ] ]
[, ... ]
] )
SERVER <server_name>
[ OPTIONS ( <option> '<value>' [, ... ] ) ]
where:
- <table_name> — a foreign table name in ADB.
- <column_name> — a column name.
- <data_type> — a column data type.
- <collation> — a column collation.
- <column_constraint> — a constraint defined at the column level. The name <constraint_name> is optional. The syntax: [ CONSTRAINT <constraint_name> ] { NOT NULL | NULL | DEFAULT <default_expr> }
  Possible constraints:
  - NOT NULL — specifies that the column is not allowed to contain null values.
  - NULL — specifies that the column is allowed to contain null values. This is the default behavior (unless NOT NULL is specified).
  - DEFAULT — sets the default column value to <default_expr>.
- <server_name> — a server name.
- <option> — foreign table options. For Kafka to ADB Connector, all options defined at the server level can be overridden at the foreign table level (partially or fully). When you specify an option for a server and a foreign table at the same time, the table level takes precedence.
- <value> — option values.
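As an illustration, a foreign table that consumes CSV messages through the server defined above might be declared as follows (the schema, table, and column names are placeholders):

CREATE FOREIGN TABLE my_schema.kafka_events (
    id INTEGER,
    payload TEXT
)
SERVER kafka_server
OPTIONS (csv_ignore_header 'true');  -- table-level option overriding the server definition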
kadb schema
During the Kafka to ADB Connector installation, the kadb schema is added to the default ADB database. This schema contains the kadb.offsets table and a set of functions that are used to synchronize offsets.
kadb.offsets table
The kadb.offsets table stores partition/offset mappings for each foreign table that has ever been created in the current ADB database. The kadb.offsets table provides the ability to store offsets outside of Kafka — on the consumer side. The kadb.offsets table structure is listed below.
Column | Description
---|---
ftoid | A foreign table OID
prt | A Kafka partition identifier
off | An offset
An OID is used to identify each foreign table. To determine a foreign table OID, use the following query:
SELECT '<schema>.<table>'::regclass::oid;
where:
- <schema> — the name of the schema in which the foreign table is created. Can be omitted if the schema search path is properly configured.
- <table> — a foreign table name.
On each SELECT query to a foreign table with the given OID, a request is issued to kadb.offsets. Then messages are read from Kafka starting with the offset specified in the kadb.offsets table for the selected table OID and partition. For example, if the offset for some partition is set to 15, the message with offset 15 is read first from this Kafka partition.
A set of partitions and their offsets can be changed via common SQL queries to kadb.offsets. Additionally, you can use special functions for this purpose.
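For instance, a single partition can be rewound with an ordinary UPDATE statement (a minimal sketch; the foreign table name is a placeholder):

-- Rewind partition 3 of the foreign table to offset 0
UPDATE kadb.offsets
SET off = 0
WHERE ftoid = 'my_schema.kafka_events'::regclass::oid
  AND prt = 3;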
For new partitions that are not yet present in the kadb.offsets table, the initial offset is set to 0 by default. You can change that value via the k_initial_offset option.
After a successful SELECT query against a foreign table, offsets are updated in the kadb.offsets table automatically based on the data obtained from Kafka. For example, if the last message read from some Kafka partition has offset 25, the kadb.offsets.off column for the selected partition and foreign table will contain 26. The next time data is read from Kafka, 26 will be used as the initial offset.
Functions
Kafka to ADB Connector supports several functions to synchronize offsets in Kafka and the corresponding offsets in the kadb.offsets table. The functions are located in the kadb schema. When calling the functions, remember the following:
- None of the functions provides transaction guarantees for Kafka — no assumptions can be made about what happens with offsets in Kafka before or after a function is called, even if that call is combined with a SELECT query to a foreign table in the same transaction.
- Some functions are not atomic — they do not create a "snapshot" of all Kafka partitions; the result is obtained from each partition independently at slightly different points in time.
Function | Parameters | Description | Atomic
---|---|---|---
kadb.commit_offsets(OID) | A foreign table OID | For a foreign table with the specified OID, commits the offsets stored in the kadb.offsets table to Kafka | Yes
kadb.load_partitions(OID) | A foreign table OID | For a foreign table with the specified OID, loads the list of partitions that exist in Kafka | No
kadb.partitions_obtain(OID) | A foreign table OID | For a foreign table with the specified OID, adds the partitions that exist in Kafka but are absent in the kadb.offsets table to the kadb.offsets table | No
kadb.partitions_clean(OID) | A foreign table OID | For a foreign table with the specified OID, deletes all entries about partitions that do not exist in Kafka from the kadb.offsets table | No
kadb.partitions_reset(OID) | A foreign table OID | For a foreign table with the specified OID, deletes all entries from the kadb.offsets table | No
kadb.load_offsets_at_timestamp(OID, BIGINT) | A foreign table OID; a timestamp | For a foreign table with the specified OID, loads from Kafka the earliest offsets whose timestamps are greater than or equal to the given timestamp. The result is returned only for partitions already present in the kadb.offsets table | Yes
kadb.offsets_to_timestamp(OID, BIGINT) | A foreign table OID; a timestamp | For a foreign table with the specified OID, changes offsets in the kadb.offsets table to the earliest Kafka offsets whose timestamps are greater than or equal to the given timestamp | Yes
kadb.load_offsets_earliest(OID) | A foreign table OID | For a foreign table with the specified OID, loads the earliest offsets from Kafka. The result is returned only for partitions already present in the kadb.offsets table | No
kadb.offsets_to_earliest(OID) | A foreign table OID | For a foreign table with the specified OID, changes offsets in the kadb.offsets table to the earliest offsets available in Kafka | No
kadb.load_offsets_latest(OID) | A foreign table OID | For a foreign table with the specified OID, loads the latest offsets from Kafka. The result is returned only for partitions already present in the kadb.offsets table | No
kadb.offsets_to_latest(OID) | A foreign table OID | For a foreign table with the specified OID, changes offsets in the kadb.offsets table to the latest offsets available in Kafka. As a result, subsequent SELECT queries read only messages that arrive after the offsets are changed | No
kadb.load_offsets_committed(OID) | A foreign table OID | For a foreign table with the specified OID, loads the latest committed offsets from Kafka. The result is returned only for partitions already present in the kadb.offsets table | Yes
kadb.offsets_to_committed(OID) | A foreign table OID | For a foreign table with the specified OID, changes offsets in the kadb.offsets table to the offsets committed in Kafka | Yes
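A minimal usage sketch (the foreign table name is a placeholder): register the partitions that exist in Kafka, read the new messages, and then commit the resulting offsets back to Kafka:

-- Add Kafka partitions that are missing from kadb.offsets
SELECT kadb.partitions_obtain('my_schema.kafka_events'::regclass::oid);
-- Read messages starting from the stored offsets
SELECT * FROM my_schema.kafka_events;
-- Commit the updated offsets to Kafka
SELECT kadb.commit_offsets('my_schema.kafka_events'::regclass::oid);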
Supported formats
Kafka to ADB Connector supports deserialization of messages that were created in one of the following formats: AVRO OCF, CSV, and text (see the subsections below). A deserialization method should be explicitly specified via the format option.
NOTE
Regardless of the format selected, only values of Kafka messages are deserialized. Message keys are ignored.
AVRO OCF
Kafka to ADB Connector supports the AVRO OCF serialization format with some limitations:
- AVRO schemas should contain only primitive data types. Complex types are not supported. However, there are two exceptions:
  - union of any primitive type with null (except for such unions with themselves, i.e. unions of union and null);
  - the fixed type, which is processed in the same way as bytes.
- Logical types are supported.
- The foreign table definition in ADB should match the actual AVRO schema. It is also important that the order of table columns matches the order of schema fields.
The table below shows possible options for converting AVRO data types into PostgreSQL types in Kafka to ADB Connector.
AVRO data type | PostgreSQL data type
---|---
string | TEXT, BPCHAR, VARCHAR
string | A custom PostgreSQL data type
null | Any PostgreSQL data type in a column without the NOT NULL constraint
boolean | BOOLEAN
int | INTEGER
long | BIGINT
float | REAL
double | DOUBLE PRECISION
bytes, fixed | BYTEA
decimal | NUMERIC
date | DATE
time-millis, time-micros | TIME
timestamp-millis | TIMESTAMP(<N>)
timestamp-micros | TIMESTAMP; TIMESTAMP(<N>)
duration | INTERVAL
The following are examples of AVRO schemas that are valid for Kafka to ADB Connector.
{
"name":"doc",
"type":"record",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"text",
"type":[
"string",
"null"
]
},
{
"name":"issued_on",
"type":"int",
"logicalType":"date"
}
]
}
{
"name":"doc",
"type":"record",
"fields":[
{
"name":"d",
"type":"int",
"logicalType":"date"
},
{
"name":"t_ms",
"type":"int",
"logicalType":"time-millis"
},
{
"name":"t_us",
"type":"long",
"logicalType":"time-micros"
},
{
"name":"ts_ms",
"type":"long",
"logicalType":"timestamp-millis"
},
{
"name":"ts_us",
"type":"long",
"logicalType":"timestamp-micros"
},
{
"name":"dur",
"type":{
"name":"dur_fixed",
"type":"fixed",
"size":12,
"logicalType":"duration"
}
},
{
"name":"dec_1",
"type":{
"name":"dec_2_fixed",
"type":"fixed",
"size":6,
"logicalType":"decimal"
}
},
{
"name":"dec_2",
"type":{
"name":"dec_2_fixed",
"type":"bytes",
"logicalType":"decimal",
"precision":14,
"scale":4
}
}
]
}
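For illustration, a foreign table matching the first schema above might be declared as follows. The column order matches the order of the schema fields, as required; the schema, table, and server names are placeholders, and the format value avro is assumed to select AVRO OCF deserialization:

CREATE FOREIGN TABLE my_schema.docs (
    id INTEGER,        -- "id": int
    text TEXT,         -- "text": union of string and null
    issued_on DATE     -- "issued_on": int with the date logical type
)
SERVER kafka_server
OPTIONS (format 'avro');  -- assumed value for AVRO OCF deserialization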
CSV
CSV format support in Kafka to ADB Connector is provided by libcsv. As a result, all conventions about CSV used by that library apply. The CSV specification is defined in RFC 4180.
Taking the above into account, Kafka to ADB Connector uses the following CSV parsing rules:
- CSV fields (attributes) are separated by a character that is specified via the csv_delimeter option.
- CSV rows (records) are separated by a newline character sequence.
- CSV fields can be enclosed in quotes specified via the csv_quote option.
- CSV fields that contain a delimiter, a quote, or a newline character should be quoted.
- Each quotation mark used inside a CSV field should be escaped with a preceding quote character.
- Empty fields are parsed as NULL values.
- Empty rows are skipped.
- Leading and trailing whitespace is removed from non-quoted fields (if the csv_attribute_trim_whitespace option is set).
CSV values can be converted to any PostgreSQL data type. The conversion is the same as for psql textual input.
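For example, a Kafka message containing the CSV record 1,"Hello, world",2024-01-01 could be consumed through a table like the following sketch (the names are placeholders, and the format value csv is assumed to select CSV deserialization):

CREATE FOREIGN TABLE my_schema.csv_events (
    id INTEGER,
    note TEXT,
    created DATE
)
SERVER kafka_server
OPTIONS (
    format 'csv',          -- assumed value for CSV deserialization
    csv_delimeter ',',
    csv_quote '"'
);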
Text
The text format is used to deserialize data presented in Kafka messages as raw text. When you select this format, messages are processed by the connector as follows:
- Each message read from Kafka is treated as one column (attribute) of a single tuple (row) in a foreign table.
- The data is parsed by PostgreSQL as user-provided textual data.
- Empty Kafka messages (with 0 length) are parsed as NULL values.
CAUTION
In order to use the text format, the foreign table should contain a single column of a textual data type, since each Kafka message is read as a single attribute.
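A minimal sketch for the text format (the table and server names are placeholders; the format value text is assumed to select raw-text deserialization):

CREATE FOREIGN TABLE my_schema.raw_messages (
    payload TEXT  -- one Kafka message becomes one row with a single attribute
)
SERVER kafka_server
OPTIONS (format 'text');  -- assumed value for raw-text deserialization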
Implementation details
Below are the features of the Kafka to ADB Connector implementation that have a significant impact on its usage.
Concurrent SELECT queries and the global deadlock detector
As mentioned above, Kafka to ADB Connector refers to kadb.offsets at each SELECT query against foreign tables. The kadb.offsets table has the DISTRIBUTED REPLICATED type in ADB. Both INSERT and UPDATE queries are possible to that table. Thus, Greenplum limitations on concurrent transactions can impact SELECT queries to foreign tables.
The way concurrent (parallel) transactions are processed in Greenplum is defined by the current state of the global deadlock detector.
When the global deadlock detector is disabled, each UPDATE query requires ExclusiveLock, which locks the entire table being updated. In Kafka to ADB Connector, this means that concurrent SELECT queries to different foreign tables are not possible. Such SELECT queries are executed sequentially, one at a time.
To allow multiple concurrent SELECT queries against different foreign tables, enable the global deadlock detector. With the detector enabled, each UPDATE query requires RowExclusiveLock, thus permitting multiple concurrent UPDATE queries to the kadb.offsets table, and hence multiple concurrent SELECT queries against foreign tables.
To enable the global deadlock detector, set the gp_enable_global_deadlock_detector GUC value to on:
$ gpconfig -c gp_enable_global_deadlock_detector -v on
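To check the current value, you can use gpconfig with the -s flag (note that changing this parameter typically requires a restart of the ADB cluster to take effect):
$ gpconfig -s gp_enable_global_deadlock_detector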
Concurrent SELECT queries against a single foreign table are not possible. In some cases, such queries may succeed and even return correct results (the same for all concurrent queries). However, this is not guaranteed.
Distribution of partitions across segments
Each SELECT query against a foreign table considers only partitions that are present in the kadb.offsets table. The kadb.offsets contents can be modified before SELECT execution — automatically if the k_automatic_offsets option is set, or manually by means of the functions described above.
Partitions are distributed among ADB segments according to the following rules:
- Distribution is performed evenly across all segments. The number of partitions assigned to a segment does not differ by more than one with respect to other segments.
- The order of partitions (as returned by Kafka) and the order of ADB segments to which they are assigned match.
For example, in an ADB cluster that consists of 3 segments, the partitions [0, 2, 3, 4, 1] returned from Kafka will be distributed as follows.
Segment | Partitions
---|---
1 | [0, 2]
2 | [3, 4]
3 | [1]
Query termination
Each query that involves librdkafka functions or SELECT calls against a foreign table can be terminated by users before it completes. A query is guaranteed to terminate in less than one second after all ADB segments receive a query termination signal.
However, active connections to Kafka are not closed immediately after query termination. Under normal conditions, connections are closed within the time specified in the k_latency_ms option. But when Kafka does not respond (e.g. if Kerberos authentication fails), the connection may remain active for an indefinite period of time.
To ensure all resources are released, close the ADB session within which the query was sent.
Duration of SELECT queries
If a SELECT query against a foreign table is not interrupted, its maximum duration can be estimated from the values of the k_timeout_ms and k_latency_ms options. These options affect the SELECT duration significantly, so they should be set properly.