Kafka

本篇内容介绍如何通过Canal订阅PolarDB-X增量数据,投递至Kafka进行消费。

PolarDB-X CDC:

PolarDB-X兼容标准Binlog协议,可以把它当做一个单机版的MySQL来使用,现支持Kafka、Flink等主流消息队列、流计算引擎、日志服务订阅。

image.png

环境准备:

建议通过MacOS或者Linux机器来进行操作。

环境说明:

实例 版本说明 更多
PolarDB-X 2.0.1 关于PolarDB-X
Kafka 2.13 关于Kafka
Canal 1.1.5 关于Canal

准备PolarDB-X:

这里假设你已经安装了Docker,执行下列脚本即可完成单机版PolarDB-X的安装,大概需要1-2分钟。

# 获取PolarDB-X 镜像
docker pull polardbx/polardb-x:2.0.1

# 启动PolarDB-X, 并暴露8527端口, 这里可能需要1-2分钟
docker run -d --name polardbx-play -p 8527:8527 polardbx/polardb-x

# 通过MySQL客户端验证启动
mysql -h127.1 -P8527 -upolardbx_root -p"123456"

Notice:可通过PXD、K8S等方式搭建PolarDB-X集群,更多可前往:PolarDB-X安装.

数据准备:

mysql -h127.1 -P8527 -upolardbx_root -p"123456"

-- 创建测试库
create database canal;

use canal;

-- 创建测试表
create table `trades` (
  id integer auto_increment NOT NULL,
  shop_id integer comment '店铺id',
  pay_amount decimal  comment '支付金额', 
  stat_date date comment '统计时间',
  primary key(id)
);

-- 写入一些数据
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');

准备kafka:

下载 最新的Kafka安装包:

# 解压
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0

# 启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动kafka:
bin/kafka-server-start.sh config/server.properties

kafka server 启动完成...

准备Canal:

下载安装:

下载 最新的Canal安装包:

# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

# 解压
tar -xzf canal.deployer-1.1.5.tar.gz

修改配置文件:

Canal的官方配置文件有很多配置项,其中需要改动配置如下:

  • 编辑 vi conf/canal/canal.properties,用于探测Canal实例配置:
    canal.instance.tsdb.enable = false 
    canal.destinations = example
    canal.conf.dir = ../conf
    # 配置kafka连接信息
    kafka.bootstrap.servers = 127.0.0.1:9092
    
  • 编辑 vi conf/example/instance.properties,订阅PolarDB-X增量binlog,并写入Kafka:
    #配置canal源为PolarDB-X的连接信息,用于订阅PolarDB-X的binlog
    canal.instance.tsdb.enable=false
    canal.instance.master.address=127.0.0.1:8527
    canal.instance.dbUsername=polardbx
    canal.instance.dbPassword=123456
    #定义写入kafka的topic
    canal.mq.topic=example
    

启动Canal:

#启动Canal
./bin/startup.sh

#查看Canal日志
tail -f logs/canal/canal.log

#查看Canal instance日志:
tail -f logs/example/example.log

Canal启动成功...

PS: 遇到下面这个警告,可以忽略,不影响正常运行:

image.png

Kafka Topic 消费:

Kafka Consumer消费:

# 订阅topic为"example"的消息
bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server localhost:9092

再次写入一些数据:

-- 登录PolarDB-X
mysql -h127.1 -P8527 -upolardbx_root -p"123456"

insert trades values(default, 1001, 10, '2022-03-15');

Kafka消息数据结构:

{
  "data":[
    {
      "id":"100008",
      "shop_id":"1001",
      "pay_amount":"10",
      "stat_date":"2022-03-15"
    }
  ],
  "database":"canal",
  "es":1647950609000,
  "id":4,
  "isDdl":false,
  "mysqlType":{
    "id":"int(11)",
    "shop_id":"int(11)",
    "pay_amount":"decimal(10,0)",
    "stat_date":"date"
  },
  "old":null, 
  "pkNames":[
    "id"
  ],
  "sql":"",
  "sqlType":{
    "id":4,
    "shop_id":4,
    "pay_amount":3,
    "stat_date":91
  },
  "table":"trades",
  "ts":1647950609988,
  "type":"INSERT"
}

results matching ""

    No results matching ""