OceanBase CDC Connector

The OceanBase CDC connector reads snapshot data and incremental data from OceanBase. This document describes how to set up the OceanBase CDC connector to run SQL queries against OceanBase.

Dependencies

To set up the OceanBase CDC connector, use the dependency information below. It covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-oceanbase-cdc</artifactId>
  <!-- The dependency is available only for stable releases; SNAPSHOT dependencies need to be built from source. -->
  <version>2.3-SNAPSHOT</version>
</dependency>

SQL Client JAR

Download link is available only for stable releases.

Download flink-sql-connector-oceanbase-cdc-2.3-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/.

Note: A flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT version corresponds to the development branch. To use it, download the source code and compile the jar yourself. Prefer a released version, such as flink-sql-connector-oceanbase-cdc-XXX.jar, which is available in the Maven central repository.

Set up OceanBase and LogProxy Server

  1. Set up the OceanBase cluster following the deployment doc.

  2. Create a user with a password in the sys tenant. OceanBase LogProxy uses this user. See the user management doc.

    mysql -h${host} -P${port} -uroot
    
    mysql> SHOW TENANT;
    mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
    mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
  3. Create a user in the tenant you want to monitor. The connector uses this user to read snapshot data and change events.

  4. For OceanBase Community Edition, you need the rootserver-list value. You can get it with the following command:

    mysql> show parameters like 'rootservice_list';

    For OceanBase Enterprise Edition, you need the config-url value. You can get it with the following command:

    mysql> show parameters like 'obconfig_url';
  5. Set up OceanBase LogProxy. For OceanBase Community Edition, you can follow the quick start.

How to create an OceanBase CDC table

The OceanBase CDC table can be defined as follows:

-- checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- register an OceanBase table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
    order_id     INT,
    order_date   TIMESTAMP(0),
    customer_name STRING,
    price        DECIMAL(10, 5),
    product_id   INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = 'test_db',
    'table-name' = 'orders',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983');

-- read the snapshot and commit log changes from the orders table
Flink SQL> SELECT * FROM orders;

Connector Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specifies which connector to use; here it should be 'oceanbase-cdc'. |
| scan.startup.mode | required | (none) | String | Startup mode of the OceanBase CDC consumer. Valid values are 'initial', 'latest-offset' and 'timestamp'. |
| scan.startup.timestamp | optional | (none) | Long | Timestamp of the start point, in seconds. Only used with the 'timestamp' startup mode. |
| username | required | (none) | String | Username to use when connecting to OceanBase. |
| password | required | (none) | String | Password to use when connecting to OceanBase. |
| tenant-name | required | (none) | String | Name of the OceanBase tenant to monitor. |
| database-name | required | (none) | String | Name of the OceanBase database to monitor. |
| table-name | required | (none) | String | Name of the OceanBase table to monitor. |
| hostname | optional | (none) | String | IP address or hostname of the OceanBase database server or OceanBase Proxy server. |
| port | optional | (none) | Integer | Port number used to connect to OceanBase. It can be the SQL port of the OceanBase server, which is 2881 by default, or the port of the OceanBase proxy service, which is 2883 by default. |
| connect.timeout | optional | 30s | Duration | The maximum time the connector waits after trying to connect to the OceanBase database server before timing out. |
| server-time-zone | optional | UTC | String | The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in OceanBase is converted to STRING during snapshot reading. Make sure it matches the time zone of the `oblogproxy` deployment. |
| logproxy.host | required | (none) | String | Hostname or IP address of the OceanBase log proxy service. |
| logproxy.port | required | (none) | Integer | Port number of the OceanBase log proxy service. |
| logproxy.client.id | optional | By rule | String | ID of the log proxy client connection. Defaults to the format {flink_ip}_{process_id}_{timestamp}_{tenant}.{db}.{table}. |
| rootserver-list | optional | (none) | String | Semicolon-separated list of OceanBase root servers in the format `ip:rpc_port:sql_port`. Required for OceanBase CE. |
| config-url | optional | (none) | String | The URL used to get server info from the config server. Required for OceanBase EE. |
| working-mode | optional | storage | String | Working mode of `obcdc` in LogProxy; can be `storage` or `memory`. |
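
The `ip:rpc_port:sql_port` format of rootserver-list is easy to get wrong, so it can help to check a value before putting it into a table definition. The following is a minimal sketch of such a check in plain Java. The class `RootServerListCheck` and its `parse` method are hypothetical helpers written for this illustration; they are not part of the connector, and the sketch assumes IPv4 addresses or hostnames (no colons inside the host part).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of the connector: checks a 'rootserver-list' value. */
final class RootServerListCheck {

    /** One entry in the documented ip:rpc_port:sql_port format. */
    static final class RootServer {
        final String ip;
        final int rpcPort;
        final int sqlPort;

        RootServer(String ip, int rpcPort, int sqlPort) {
            this.ip = ip;
            this.rpcPort = rpcPort;
            this.sqlPort = sqlPort;
        }

        @Override
        public String toString() {
            return ip + " (rpc " + rpcPort + ", sql " + sqlPort + ")";
        }
    }

    /** Splits on semicolons, then expects exactly three colon-separated parts per entry. */
    static List<RootServer> parse(String rootServerList) {
        List<RootServer> servers = new ArrayList<>();
        for (String entry : rootServerList.split(";")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException(
                        "Expected ip:rpc_port:sql_port but got: " + entry);
            }
            servers.add(new RootServer(
                    parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])));
        }
        return servers;
    }

    public static void main(String[] args) {
        // Two root servers, separated by a semicolon (example addresses only).
        for (RootServer server : parse("127.0.0.1:2882:2881;127.0.0.2:2882:2881")) {
            System.out.println(server);
        }
    }
}
```

An entry with a missing port, such as `127.0.0.1:2881`, makes `parse` throw an `IllegalArgumentException` instead of failing later at connection time.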

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

| Key | DataType | Description |
| --- | --- | --- |
| tenant_name | STRING NOT NULL | Name of the tenant that contains the row. |
| database_name | STRING NOT NULL | Name of the database that contains the row. |
| table_name | STRING NOT NULL | Name of the table that contains the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time at which the change was made in the database. If the record is read from the snapshot of the table instead of the change stream, the value is always 0. |

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE products (
    tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
   'connector' = 'oceanbase-cdc',
   'scan.startup.mode' = 'initial',
   'username' = 'user@test_tenant',
   'password' = 'pswd',
   'tenant-name' = 'test_tenant',
   'database-name' = 'test_db',
   'table-name' = 'orders',
   'hostname' = '127.0.0.1',
   'port' = '2881',
   'rootserver-list' = '127.0.0.1:2882:2881',
   'logproxy.host' = '127.0.0.1',
   'logproxy.port' = '2983');
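
Because op_ts is always 0 for rows read from the snapshot, downstream code can use it to tell snapshot rows from change events. The sketch below shows one way to do that in plain Java. It assumes the TIMESTAMP_LTZ(3) value reaches user code as a `java.time.Instant` and that "0" means epoch millisecond 0; the class `SnapshotRowCheck` is a hypothetical helper, not connector API.

```java
import java.time.Instant;

/** Hypothetical helper, not part of the connector. */
final class SnapshotRowCheck {

    /** Returns true when op_ts carries the snapshot marker, assumed here to be the epoch. */
    static boolean isSnapshotRow(Instant opTs) {
        return Instant.EPOCH.equals(opTs);
    }

    public static void main(String[] args) {
        Instant snapshotMarker = Instant.ofEpochMilli(0L);
        Instant changeTime = Instant.parse("2022-11-01T00:00:00Z");

        System.out.println(isSnapshotRow(snapshotMarker));
        System.out.println(isSnapshotRow(changeTime));
    }
}
```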

Features

At-Least-Once Processing

The OceanBase CDC connector is a Flink Source connector that first reads a database snapshot and then continues to read change events, with at-least-once processing.

OceanBase is a distributed database whose log files are spread across different servers. Because there is no position information comparable to the MySQL binlog offset, only a timestamp can be used as the position mark. To ensure that no data is missed, liboblog (a C++ library for reading OceanBase log records) may read some log data from before the given timestamp. As a result, data whose timestamp is close to the start point may be read more than once, so only at-least-once processing can be guaranteed.
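
One common way to tolerate such duplicates is to make the downstream write idempotent, for example by keying it on the primary key so that a redelivered event overwrites the same row. The following is a minimal sketch of that idea in plain Java, not connector code: `IdempotentApply`, `Change` and `Op` are hypothetical names, and the in-memory map stands in for a sink keyed by `order_id`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not connector code: applies changes keyed by primary key. */
final class IdempotentApply {

    enum Op { UPSERT, DELETE }

    /** Minimal stand-in for a change event on the 'orders' table. */
    static final class Change {
        final Op op;
        final int orderId;
        final String customerName;

        Change(Op op, int orderId, String customerName) {
            this.op = op;
            this.orderId = orderId;
            this.customerName = customerName;
        }
    }

    /** Replays the changes in order; the latest change per key wins. */
    static Map<Integer, String> apply(List<Change> changes) {
        Map<Integer, String> state = new HashMap<>();
        for (Change change : changes) {
            if (change.op == Op.DELETE) {
                state.remove(change.orderId);
            } else {
                state.put(change.orderId, change.customerName);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Change a = new Change(Op.UPSERT, 1, "Alice");
        Change b = new Change(Op.UPSERT, 1, "Alicia");
        Change c = new Change(Op.UPSERT, 2, "Bob");
        Change d = new Change(Op.DELETE, 2, null);

        // The second list redelivers a and b, as could happen around the start timestamp.
        Map<Integer, String> once = apply(Arrays.asList(a, b, c, d));
        Map<Integer, String> replayed = apply(Arrays.asList(a, b, a, b, c, d));

        System.out.println(once.equals(replayed));
    }
}
```

Both runs end in the same final state. Intermediate states can briefly move backwards while duplicates are replayed, so this approach suits sinks that only need the final state to be correct.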

Startup Reading Position

The config option scan.startup.mode specifies the startup mode of the OceanBase CDC consumer. The valid values are:

  • initial: Performs an initial snapshot of the monitored table on first startup, then continues to read the latest commit log.
  • latest-offset: Skips the snapshot on first startup and reads only the commit log written after the connector starts.
  • timestamp: Skips the snapshot on first startup and reads the commit log starting from the given scan.startup.timestamp.
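
For the timestamp mode, scan.startup.timestamp is given in seconds, so a wall-clock start time has to be converted to epoch seconds first. The sketch below does this with `java.time`. The class `StartupTimestamp` is a hypothetical helper for illustration, and the chosen date and time zones are example values only.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical helper, not part of the connector. */
final class StartupTimestamp {

    /** Converts a wall-clock time in the given zone to epoch seconds. */
    static long toEpochSeconds(LocalDateTime wallClock, ZoneId zone) {
        return wallClock.atZone(zone).toEpochSecond();
    }

    public static void main(String[] args) {
        long seconds = toEpochSeconds(LocalDateTime.of(2022, 11, 1, 0, 0), ZoneId.of("UTC"));

        // The printed option belongs next to 'scan.startup.mode' = 'timestamp' in the WITH clause.
        System.out.println("'scan.startup.timestamp' = '" + seconds + "'");
    }
}
```

Passing the zone explicitly avoids depending on the default time zone of the machine that computes the value.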

Consume Commit Log

The OceanBase CDC connector uses oblogclient to consume the commit log from OceanBase LogProxy.

DataStream Source

The OceanBase CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class OceanBaseSourceExample {

  public static void main(String[] args) throws Exception {
    SourceFunction<String> oceanBaseSource =
        OceanBaseSource.<String>builder()
            .rsList("127.0.0.1:2882:2881")  // set root server list
            .startupMode(StartupMode.INITIAL) // set startup mode
            .username("user@test_tenant")  // set cluster username
            .password("pswd")  // set cluster password
            .tenantName("test_tenant")  // set captured tenant name, regex is not supported
            .databaseName("test_db")  // set captured database, regex is supported
            .tableName("test_table")  // set captured table, regex is supported
            .hostname("127.0.0.1")  // set hostname of OceanBase server or proxy
            .port(2881)  // set the sql port for OceanBase server or proxy
            .logProxyHost("127.0.0.1")  // set the hostname of log proxy
            .logProxyPort(2983)  // set the port of log proxy
            .deserializer(new JsonDebeziumDeserializationSchema())  // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);
    
    env.addSource(oceanBaseSource).print().setParallelism(1);

    env.execute("Print OceanBase Snapshot + Commit Log");
  }
}

Data Type Mapping

When the startup mode is not INITIAL, the connector cannot obtain the precision and scale of a column. To stay compatible with all startup modes, one OceanBase type is mapped to the same Flink type regardless of its precision.

For example, you can get a boolean from a column of type BOOLEAN, TINYINT(1) or BIT(1). BOOLEAN is equivalent to TINYINT(1) in OceanBase, so columns of BOOLEAN and TINYINT types are mapped to TINYINT in Flink, and BIT(1) is mapped to BINARY(1) in Flink.

| OceanBase type | Flink SQL type | NOTE |
| --- | --- | --- |
| BOOLEAN / TINYINT | TINYINT | |
| SMALLINT / TINYINT UNSIGNED | SMALLINT | |
| INT / MEDIUMINT / SMALLINT UNSIGNED | INT | |
| BIGINT / INT UNSIGNED | BIGINT | |
| BIGINT UNSIGNED | DECIMAL(20, 0) | |
| REAL / FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| NUMERIC(p, s) / DECIMAL(p, s), where p <= 38 | DECIMAL(p, s) | |
| NUMERIC(p, s) / DECIMAL(p, s), where 38 < p <= 65 | STRING | DECIMAL is equivalent to NUMERIC. The precision for DECIMAL data type is up to 65 in OceanBase, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
| DATE | DATE | |
| TIME [(p)] | TIME [(p)] | |
| TIMESTAMP [(p)] / DATETIME [(p)] | TIMESTAMP [(p)] | |
| CHAR(n) | CHAR(n) | |
| VARCHAR(n) | VARCHAR(n) | |
| BIT(n) | BINARY(⌈n/8⌉) | |
| BINARY(n) | BINARY(n) | |
| VARBINARY(N) | VARBINARY(N) | |
| TINYTEXT / TEXT / MEDIUMTEXT / LONGTEXT | STRING | |
| TINYBLOB / BLOB / MEDIUMBLOB / LONGBLOB | BYTES | |
| YEAR | INT | |
| ENUM | STRING | |
| SET | STRING | |
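
Two rules in this mapping depend on a number rather than on the type name: the DECIMAL precision threshold of 38, and the ⌈n/8⌉ byte length for BIT(n). The sketch below restates both rules in plain Java as an aid when writing a Flink table definition by hand. `TypeMappingSketch` and its methods are hypothetical helpers for illustration and do not reflect how the connector implements the mapping internally.

```java
/** Hypothetical helper, not part of the connector: restates two rules from the mapping table. */
final class TypeMappingSketch {

    /** DECIMAL(p, s) stays DECIMAL up to precision 38; larger precisions map to STRING. */
    static String flinkTypeForDecimal(int precision, int scale) {
        if (precision < 1 || precision > 65) {
            throw new IllegalArgumentException("OceanBase DECIMAL precision must be in 1..65");
        }
        return precision <= 38
                ? "DECIMAL(" + precision + ", " + scale + ")"
                : "STRING";
    }

    /** BIT(n) maps to BINARY(ceil(n / 8)); integer arithmetic avoids floating point. */
    static int binaryLengthForBit(int bits) {
        if (bits < 1) {
            throw new IllegalArgumentException("BIT length must be positive");
        }
        return (bits + 7) / 8;
    }

    public static void main(String[] args) {
        System.out.println(flinkTypeForDecimal(10, 5));
        System.out.println(flinkTypeForDecimal(65, 30));
        System.out.println("BINARY(" + binaryLengthForBit(9) + ")");
    }
}
```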