Kafka
This article describes how to subscribe to PolarDB-X incremental data with Canal and deliver it to Kafka for consumption.
PolarDB-X CDC:
PolarDB-X is compatible with the standard MySQL binlog protocol, so downstream systems can treat it as a single standalone MySQL instance. It now supports subscription by mainstream message queues, stream computing engines, and log services, such as Kafka and Flink.
Environment Preparation:
We recommend working on a macOS or Linux machine.
Environment Description:
Instance | Version Information | More |
---|---|---|
PolarDB-X | 2.0.1 | About PolarDB-X |
Kafka | 3.1.0 (Scala 2.13) | About Kafka |
Canal | 1.1.5 | About Canal |
Prepare PolarDB-X:
Assuming Docker is already installed, run the following commands to install a standalone PolarDB-X instance. This takes about 1-2 minutes.
# Get the PolarDB-X image
docker pull polardbx/polardb-x:2.0.1
# Start PolarDB-X and expose port 8527, this may take 1-2 minutes
docker run -d --name polardbx-play -p 8527:8527 polardbx/polardb-x:2.0.1
# Verify the start with MySQL client
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
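Because startup can take a minute or two, a script that moves straight on to the next step may connect too early. The following Python sketch polls until the server sends its first handshake bytes. It assumes that PolarDB-X, like other MySQL-protocol servers, speaks first once a connection is accepted; the function name `wait_for_greeting` and its default timeouts are illustrative and not part of PolarDB-X.

```python
import socket
import time


def wait_for_greeting(host, port, timeout=180.0, interval=2.0):
    """Poll host:port until the server sends some bytes, or until timeout.

    Returns True as soon as a connection yields data (for a MySQL-protocol
    server this is the initial handshake packet), False if the deadline
    passes first. A bare "port is open" check is not used because a proxy
    in front of the container may accept connections before the database
    itself is ready.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval) as conn:
                if conn.recv(4):  # non-empty read: the server has spoken
                    return True
        except OSError:
            pass  # refused, reset, or timed out: retry below
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, `wait_for_greeting("127.0.0.1", 8527)` returns `True` once the instance started above answers on port 8527, and `False` if nothing answers within three minutes.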
Note: You can also build a PolarDB-X cluster with PXD or Kubernetes. For more information, see PolarDB-X Installation.
Data Preparation:
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
-- Create test database
create database canal;
use canal;
-- Create test table
create table `trades` (
  id integer auto_increment NOT NULL,
  shop_id integer comment 'shop id',
  pay_amount decimal comment 'payment amount',
  stat_date date comment 'statistics date',
  primary key(id)
);
-- Insert some data
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
Prepare Kafka:
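Download the Kafka package (this article uses kafka_2.13-3.1.0.tgz), then unpack it and start the services:
# Unpack
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0
# Start ZooKeeper (runs in the foreground; keep this terminal open)
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka in a second terminal, from the same directory
bin/kafka-server-start.sh config/server.properties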
Kafka server started successfully...
Prepare Canal:
Download and Install:
Download the Canal 1.1.5 package:
# download
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
# unzip
tar -xzf canal.deployer-1.1.5.tar.gz
Modify Configuration Files:
There are many configuration items in the official configuration file of Canal. The following changes are required:
- Edit conf/canal.properties (for example with vi) to configure the Canal server:
canal.instance.tsdb.enable = false
canal.destinations = example
canal.conf.dir = ../conf
# Deliver to Kafka instead of the default tcp mode
canal.serverMode = kafka
# Configure Kafka connection information
kafka.bootstrap.servers = 127.0.0.1:9092
- Edit conf/example/instance.properties (for example with vi) to subscribe to the PolarDB-X incremental binlog and write it to Kafka:
# Point the Canal source at PolarDB-X to subscribe to its binlog
canal.instance.tsdb.enable=false
canal.instance.master.address=127.0.0.1:8527
canal.instance.dbUsername=polardbx_root
canal.instance.dbPassword=123456
# Kafka topic to write to
canal.mq.topic=example
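If you would rather script these edits than make them by hand, something like the following Python sketch could apply them. It is a minimal helper under simple assumptions (one `key = value` pair per line, `#` comments, no line continuations); `apply_overrides` and `patch_file` are hypothetical names and not part of Canal.

```python
from pathlib import Path


def apply_overrides(text, overrides):
    """Return properties text with the given keys set to new values.

    Every uncommented line whose key is in `overrides` is rewritten.
    Keys that never appear are appended at the end. Comments, blank
    lines, and unrelated settings are kept unchanged.
    """
    seen = set()
    out = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#") and "=" in stripped:
            key = stripped.split("=", 1)[0].strip()
            if key in overrides:
                line = f"{key} = {overrides[key]}"
                seen.add(key)
        out.append(line)
    # Append settings that were missing (or only present as comments).
    out.extend(f"{k} = {v}" for k, v in overrides.items() if k not in seen)
    return "\n".join(out) + "\n"


def patch_file(path, overrides):
    """Apply `overrides` to the properties file at `path`, in place."""
    p = Path(path)
    p.write_text(apply_overrides(p.read_text(encoding="utf-8"), overrides),
                 encoding="utf-8")
```

For instance, `patch_file("conf/example/instance.properties", {"canal.instance.master.address": "127.0.0.1:8527", "canal.mq.topic": "example"})` would set the source address and topic used in this article.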
Start Canal:
# Start Canal
./bin/startup.sh
# View Canal logs
tail -f logs/canal/canal.log
# View Canal instance logs
tail -f logs/example/example.log
Canal started successfully...
PS: You can ignore the following warning, it does not affect normal operation:
Kafka Topic Consumption:
Consume with the Kafka console consumer:
# Subscribe to the messages of topic "example"
bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server localhost:9092
Insert some more data:
# Log in to PolarDB-X
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
-- Switch to the test database, then insert a row
use canal;
insert trades values(default, 1001, 10, '2022-03-15');
Kafka Message Data Structure:
{
  "data":[
    {
      "id":"100008",
      "shop_id":"1001",
      "pay_amount":"10",
      "stat_date":"2022-03-15"
    }
  ],
  "database":"canal",
  "es":1647950609000,
  "id":4,
  "isDdl":false,
  "mysqlType":{
    "id":"int(11)",
    "shop_id":"int(11)",
    "pay_amount":"decimal(10,0)",
    "stat_date":"date"
  },
  "old":null,
  "pkNames":[
    "id"
  ],
  "sql":"",
  "sqlType":{
    "id":4,
    "shop_id":4,
    "pay_amount":3,
    "stat_date":91
  },
  "table":"trades",
  "ts":1647950609988,
  "type":"INSERT"
}
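All column values in `data` arrive as strings, with the type information carried separately in `mysqlType` and `sqlType`. The Python sketch below shows one way a consumer might turn a message like the one above into typed values. It assumes that the `sqlType` numbers are `java.sql.Types` codes (4 = INTEGER, 3 = DECIMAL, 91 = DATE), and that `es` is the time of the binlog event and `ts` the time Canal built the message, both in milliseconds. `decode_rows` and `lag_ms` are illustrative names, and only the three codes in the sample are handled.

```python
import json
from datetime import date
from decimal import Decimal

# Converters for the java.sql.Types codes seen in the sample message.
# Columns with any other code are left as strings.
CONVERTERS = {
    4: int,                  # INTEGER
    3: Decimal,              # DECIMAL
    91: date.fromisoformat,  # DATE
}


def decode_rows(message):
    """Return the rows of message["data"] with values converted per sqlType."""
    sql_types = message.get("sqlType") or {}
    rows = []
    for row in message.get("data") or []:
        typed = {}
        for column, value in row.items():
            convert = CONVERTERS.get(sql_types.get(column), str)
            # SQL NULL arrives as JSON null and is passed through as None.
            typed[column] = None if value is None else convert(value)
        rows.append(typed)
    return rows


def lag_ms(message):
    """Milliseconds between the binlog event (es) and message creation (ts)."""
    return message["ts"] - message["es"]


# Trimmed copy of the message shown above (mysqlType and a few fields omitted).
SAMPLE = (
    '{"data":[{"id":"100008","shop_id":"1001","pay_amount":"10",'
    '"stat_date":"2022-03-15"}],"database":"canal","es":1647950609000,'
    '"isDdl":false,"sqlType":{"id":4,"shop_id":4,"pay_amount":3,'
    '"stat_date":91},"table":"trades","ts":1647950609988,"type":"INSERT"}'
)

message = json.loads(SAMPLE)
rows = decode_rows(message)
```

With the sample message, `rows` holds a single row whose `id` is the integer 100008 and whose `stat_date` is `date(2022, 3, 15)`, and `lag_ms(message)` is 988.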