This article describes how to subscribe to PolarDB-X incremental data with Canal and deliver it to Kafka for consumption.

PolarDB-X CDC:

PolarDB-X is compatible with the standard MySQL binlog protocol, so downstream tools can treat it as a single standalone MySQL instance. Its incremental data can therefore be subscribed to by mainstream message queues, stream computing engines, and log services, such as Kafka and Flink.


Environment Preparation:

A macOS or Linux machine is recommended.

Environment Description:

Instance     Version                    Information
PolarDB-X    2.0.1                      About PolarDB-X
Kafka        3.1.0 (Scala 2.13 build)   About Kafka
Canal        1.1.5                      About Canal

Prepare PolarDB-X:

With Docker already installed, run the following commands to install a standalone PolarDB-X instance. This takes approximately 1-2 minutes.

# Get the PolarDB-X image
docker pull polardbx/polardb-x:2.0.1

# Start PolarDB-X (same tag as pulled above) and expose port 8527; this may take 1-2 minutes
docker run -d --name polardbx-play -p 8527:8527 polardbx/polardb-x:2.0.1

# Verify startup with the MySQL client
mysql -h127.1 -P8527 -upolardbx_root -p"123456"

Note: You can also build a PolarDB-X cluster using PXD, Kubernetes, and so on. For more information, see PolarDB-X Installation.
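
Because the container needs a minute or two before it accepts connections, the verification step may fail at first. The following is a minimal sketch of a retry helper in POSIX shell. The function name retry and the attempt and delay values are illustrative assumptions, not part of PolarDB-X or Docker.

```shell
#!/bin/sh
# retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: runs COMMAND until it succeeds or ATTEMPTS is reached.
# Returns 0 on the first success, 1 if every attempt failed.
retry() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    # Discard the command's output; only its exit status matters here.
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Sleep only when another attempt will follow.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  return 1
}

# Example (assumes the port and credentials used in this article):
#   retry 30 5 mysql -h127.1 -P8527 -upolardbx_root -p123456 -e 'select 1'
```

With the values in the example comment, the helper would poll for up to roughly two and a half minutes before giving up.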

Data Preparation:

mysql -h127.1 -P8527 -upolardbx_root -p"123456"

-- Create test database
create database canal;

use canal;

-- Create test table
create table `trades` (
  id integer auto_increment NOT NULL,
  shop_id integer comment 'shop id',
  pay_amount decimal comment 'payment amount',
  stat_date date comment 'statistics date',
  primary key(id)
);

-- Insert some data
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');

Prepare Kafka:

Download the Kafka package (this article uses kafka_2.13-3.1.0), then unpack it and start the services:

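# Unpack
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0

# Start ZooKeeper (runs in the foreground, so use a separate terminal)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka (runs in the foreground, so use another terminal)
bin/kafka-server-start.sh config/server.properties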

Kafka server started successfully...

Prepare Canal:

Download and Install:

Download the Canal 1.1.5 package:

# Download
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

# Unpack
tar -xzf canal.deployer-1.1.5.tar.gz

Modify Configuration Files:

Canal's official configuration files contain many settings. Only the following changes are required:

  • Edit conf/canal.properties (for example, with vi) to configure the Canal server:
    canal.instance.tsdb.enable = false
    canal.destinations = example
    canal.conf.dir = ../conf
    # Configure Kafka connection information
    kafka.bootstrap.servers =
  • Edit conf/example/instance.properties (for example, with vi) so that Canal subscribes to the PolarDB-X incremental binlog and writes it to Kafka:
    # Set the Canal source to the PolarDB-X connection information to subscribe to the PolarDB-X binlog
    # Define the Kafka topic to write to
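
The concrete values for these settings are not shown above. As a hedged sketch, the script below applies one plausible set with a small helper. The helper set_prop and the variables CANAL_PROPS and INSTANCE_PROPS are hypothetical names. The values (127.0.0.1:8527, polardbx_root, 123456, localhost:9092, topic example) are assumptions inferred from the other steps in this article. The property keys are the ones Canal 1.1.5 ships in its default files; check them against your copy before relying on this.

```shell
#!/bin/sh
# set_prop FILE KEY VALUE
# Hypothetical helper: rewrites the uncommented "KEY = ..." line in FILE,
# or appends "KEY = VALUE" if the key is absent. Commented lines are kept.
# Note: awk -v interprets backslash escapes in VALUE.
set_prop() {
  file=$1
  key=$2
  value=$3
  tmp=$(mktemp) || return 1
  awk -v k="$key" -v v="$value" '
    {
      idx = index($0, "=")
      if (idx > 0) {
        name = substr($0, 1, idx - 1)
        gsub(/^[ \t]+|[ \t]+$/, "", name)   # trim spaces around the key
        if (name == k) {
          if (!seen) { print k " = " v; seen = 1 }
          next                               # drop duplicate definitions
        }
      }
      print
    }
    END { if (!seen) print k " = " v }
  ' "$file" > "$tmp" && cat "$tmp" > "$file"
  status=$?
  rm -f "$tmp"
  return "$status"
}

# Assumed file locations; override them if your layout differs.
CANAL_PROPS=${CANAL_PROPS:-conf/canal.properties}
INSTANCE_PROPS=${INSTANCE_PROPS:-conf/example/instance.properties}

# Apply the settings only when both files exist (run from the Canal directory).
if [ -f "$CANAL_PROPS" ] && [ -f "$INSTANCE_PROPS" ]; then
  set_prop "$CANAL_PROPS" canal.serverMode kafka
  set_prop "$CANAL_PROPS" kafka.bootstrap.servers localhost:9092
  set_prop "$INSTANCE_PROPS" canal.instance.master.address 127.0.0.1:8527
  set_prop "$INSTANCE_PROPS" canal.instance.dbUsername polardbx_root
  set_prop "$INSTANCE_PROPS" canal.instance.dbPassword 123456
  set_prop "$INSTANCE_PROPS" canal.mq.topic example
fi
```

Editing the two files by hand has the same effect. The helper only makes the change repeatable.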

Start Canal:

# Start Canal (run from the directory where the deployer package was unpacked)
sh bin/startup.sh

# View Canal logs
tail -f logs/canal/canal.log

# View Canal instance logs
tail -f logs/example/example.log

Canal started successfully...

PS: You can ignore the following warning, it does not affect normal operation:


Kafka Topic Consumption:

Consuming with the Kafka Console Consumer:

# Subscribe to the messages of topic "example"
bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server localhost:9092

Insert some more data:

-- Log in to PolarDB-X
mysql -h127.1 -P8527 -upolardbx_root -p"123456"

use canal;

insert trades values(default, 1001, 10, '2022-03-15');

Kafka Message Data Structure:
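
When Canal writes flat messages (canal.mq.flatMessage = true), each Kafka record is a single JSON object with fields such as database, table, type, data, old, and pkNames. The sketch below pulls a one-line summary out of such a record. The sample is hand-written for illustration, not captured output, and the helper names json_string_field and summarize are hypothetical. The helper assumes compact JSON with no spaces after the colons.

```shell
#!/bin/sh
# json_string_field JSON KEY
# Prints the string value of the last "KEY":"value" pair found in JSON.
# Sketch only: a sed pattern is not a JSON parser, and it assumes the value
# contains no escaped double quotes.
json_string_field() {
  printf '%s\n' "$1" | sed -n 's/.*"'"$2"'":"\([^"]*\)".*/\1/p'
}

# summarize JSON: prints "<type> <database>.<table>" for one message.
summarize() {
  printf '%s %s.%s\n' \
    "$(json_string_field "$1" type)" \
    "$(json_string_field "$1" database)" \
    "$(json_string_field "$1" table)"
}

# Hand-written sample in the flat message layout (illustrative values).
sample='{"data":[{"id":"6","shop_id":"1001","pay_amount":"10","stat_date":"2022-03-15"}],"database":"canal","isDdl":false,"old":null,"pkNames":["id"],"table":"trades","type":"INSERT"}'

summarize "$sample"   # prints: INSERT canal.trades

# To summarize live records, pipe the console consumer into the helper:
#   bin/kafka-console-consumer.sh --topic example --bootstrap-server localhost:9092 |
#     while IFS= read -r line; do summarize "$line"; done
```

For anything beyond a quick check, a real JSON parser is the safer choice.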


