Kafka

This article introduces how to subscribe to PolarDB-X incremental data using Canal and deliver it to Kafka for consumption.

PolarDB-X CDC:

PolarDB-X is compatible with the standard MySQL binlog protocol, so downstream tools can treat it as a standalone MySQL instance. It currently supports subscription by mainstream message queues, stream computing engines, and log services, such as Kafka and Flink.


Environment Preparation:

A macOS or Linux machine is recommended.

Environment Description:

Instance    Version Information        More
PolarDB-X   2.0.1                      About PolarDB-X
Kafka       3.1.0 (Scala 2.13 build)   About Kafka
Canal       1.1.5                      About Canal

Prepare PolarDB-X:

Assuming Docker is already installed, run the following commands to install a standalone PolarDB-X instance. This takes approximately 1-2 minutes.

# Get the PolarDB-X image
docker pull polardbx/polardb-x:2.0.1

# Start PolarDB-X and expose port 8527; startup may take 1-2 minutes
docker run -d --name polardbx-play -p 8527:8527 polardbx/polardb-x:2.0.1

# Verify startup with the MySQL client
mysql -h127.1 -P8527 -upolardbx_root -p"123456"

Note: You can also build a PolarDB-X cluster using PXD, K8S, etc. For more information, see: PolarDB-X Installation.
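
Because the container needs 1-2 minutes to start, the MySQL client check can fail if it runs too early. The following is a minimal sketch of a readiness poll, assuming the instance listens on 127.0.0.1:8527 as in the commands above. The function name wait_for_port and its timeout parameters are hypothetical, and an open port only shows that the listener is up, not that SQL queries will already succeed.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=180.0, interval_s=2.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)
```

For example, wait_for_port("127.0.0.1", 8527) could be called before the mysql command above, proceeding only when it returns True.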

Data Preparation:

mysql -h127.1 -P8527 -upolardbx_root -p"123456"

-- Create test database
create database canal;

use canal;

-- Create test table
create table `trades` (
  id integer auto_increment NOT NULL,
  shop_id integer comment 'shop id',
  pay_amount decimal comment 'payment amount',
  stat_date date comment 'statistics date',
  primary key(id)
);

-- Insert some data
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');

Prepare Kafka:

Download the Kafka package (this article uses kafka_2.13-3.1.0.tgz), then unpack it and start ZooKeeper and Kafka:

# Unzip
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0

# start ZooKeeper (runs in the foreground; keep this terminal open):
bin/zookeeper-server-start.sh config/zookeeper.properties

# start Kafka (in a second terminal):
bin/kafka-server-start.sh config/server.properties

Kafka server started successfully...

Prepare Canal:

Download and Install:

Download the Canal 1.1.5 package:

# download
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

# unzip
tar -xzf canal.deployer-1.1.5.tar.gz

Modify Configuration Files:

Canal's official configuration files contain many options. Only the following changes are required:

  • Edit conf/canal.properties (for example, vi conf/canal.properties) to configure the Canal server:
    # Deliver changes to Kafka (the default server mode is tcp)
    canal.serverMode = kafka
    canal.instance.tsdb.enable = false
    canal.destinations = example
    canal.conf.dir = ../conf
    # Configure Kafka connection information
    kafka.bootstrap.servers = 127.0.0.1:9092
    
  • Edit conf/example/instance.properties (for example, vi conf/example/instance.properties) to subscribe to the PolarDB-X incremental binlog and write to Kafka:
    # Point the Canal source at PolarDB-X to subscribe to its binlog
    canal.instance.tsdb.enable=false
    canal.instance.master.address=127.0.0.1:8527
    canal.instance.dbUsername=polardbx_root
    canal.instance.dbPassword=123456
    # Define Kafka topic for writing
    canal.mq.topic=example
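
A typo in either properties file usually only shows up later in the Canal logs, so it can help to check the edited values before starting. The following is a rough sketch, not part of Canal: parse_properties is a simplified reader that handles only plain key=value lines and # or ! comments (no line continuations or escapes), and EXPECTED_INSTANCE and find_mismatches are hypothetical names that encode the values used in this article.

```python
def parse_properties(text):
    """Parse simple key=value lines; blank lines and # or ! comments are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Values this article sets in conf/example/instance.properties.
EXPECTED_INSTANCE = {
    "canal.instance.tsdb.enable": "false",
    "canal.instance.master.address": "127.0.0.1:8527",
    "canal.mq.topic": "example",
}


def find_mismatches(props, expected):
    """Return {key: (expected, actual)} for every key that is missing or different."""
    return {
        key: (want, props.get(key))
        for key, want in expected.items()
        if props.get(key) != want
    }
```

Reading conf/example/instance.properties into a string and passing the parsed result to find_mismatches should return an empty dict when the file matches the settings above.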
    

Start Canal:

# Start Canal
./bin/startup.sh

# View Canal logs
tail -f logs/canal/canal.log

# View Canal instance logs
tail -f logs/example/example.log

Canal started successfully...

PS: The warning that Canal prints at startup (shown as a screenshot in the original article) can be ignored; it does not affect normal operation.

Kafka Topic Consumption:

Consume with the Kafka Console Consumer:

# Subscribe to the messages of topic "example"
bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server localhost:9092
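
The console consumer is convenient for a quick look, but an application would normally read the topic programmatically. The following is a hedged sketch of the same subscription in Python. It assumes the third-party kafka-python package is installed and that message values are UTF-8 JSON; decode_value and consume are hypothetical helper names, not part of Canal or Kafka.

```python
import json


def decode_value(value):
    """Decode one Kafka message value (UTF-8 JSON bytes) into a dict; None passes through."""
    if value is None:
        return None
    return json.loads(value.decode("utf-8"))


def consume(topic="example", servers="localhost:9092"):
    """Print the offset, change type, and table of every message on the topic."""
    # Assumption: kafka-python is installed (pip install kafka-python).
    # The import is local so the helper above works without the package.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",  # read from the oldest message, like --from-beginning
        value_deserializer=decode_value,
    )
    for record in consumer:
        msg = record.value
        if msg is not None:
            print(record.offset, msg.get("type"), msg.get("table"))
```

Calling consume() blocks and prints one line per change event until it is interrupted.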

Insert some more data:

-- Log in to PolarDB-X
mysql -h127.1 -P8527 -upolardbx_root -p"123456"

insert trades values(default, 1001, 10, '2022-03-15');

Kafka Message Data Structure:

{
  "data":[
    {
      "id":"100008",
      "shop_id":"1001",
      "pay_amount":"10",
      "stat_date":"2022-03-15"
    }
  ],
  "database":"canal",
  "es":1647950609000,
  "id":4,
  "isDdl":false,
  "mysqlType":{
    "id":"int(11)",
    "shop_id":"int(11)",
    "pay_amount":"decimal(10,0)",
    "stat_date":"date"
  },
  "old":null, 
  "pkNames":[
    "id"
  ],
  "sql":"",
  "sqlType":{
    "id":4,
    "shop_id":4,
    "pay_amount":3,
    "stat_date":91
  },
  "table":"trades",
  "ts":1647950609988,
  "type":"INSERT"
}
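
In the sample message every column value in "data" arrives as a string, while "sqlType" carries the java.sql.Types code of each column (4 = INTEGER, 3 = DECIMAL, 91 = DATE). The following is a minimal sketch of turning one such message into typed row changes. The names CONVERTERS, convert_row, and parse_canal_message are hypothetical; only the three type codes that appear in the sample are handled, and "es" is treated as epoch milliseconds, which matches the sample value.

```python
import json
from datetime import date, datetime, timezone
from decimal import Decimal

# java.sql.Types codes seen in the sample: 4 = INTEGER, 3 = DECIMAL, 91 = DATE.
# Any other code leaves the value as the string Canal delivered.
CONVERTERS = {4: int, 3: Decimal, 91: date.fromisoformat}


def convert_row(row, sql_types):
    """Convert string column values to Python types using the sqlType codes."""
    typed = {}
    for column, value in row.items():
        convert = CONVERTERS.get(sql_types.get(column))
        typed[column] = convert(value) if convert and value is not None else value
    return typed


def parse_canal_message(raw):
    """Turn one message JSON string into a list of row-change dicts."""
    msg = json.loads(raw)
    if msg.get("isDdl"):
        return []  # this sketch skips DDL events
    sql_types = msg.get("sqlType") or {}
    rows = msg.get("data") or []
    # "old" is null for inserts; pad it so it pairs up with "data".
    olds = msg.get("old") or [None] * len(rows)
    event_time = datetime.fromtimestamp(msg["es"] / 1000, tz=timezone.utc)
    changes = []
    for row, old in zip(rows, olds):
        after = convert_row(row, sql_types)
        changes.append({
            "table": msg["database"] + "." + msg["table"],
            "op": msg["type"],
            "key": {name: after.get(name) for name in msg.get("pkNames") or []},
            "after": after,
            "before": old,
            "event_time": event_time,
        })
    return changes
```

Applied to the message above, this yields a single INSERT change for canal.trades whose key is the primary key column id.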
