Kafka
本篇内容介绍如何通过Canal订阅PolarDB-X增量数据,投递至Kafka进行消费。
PolarDB-X CDC:
PolarDB-X兼容标准Binlog协议,可以把它当做一个单机版的MySQL来使用,现支持Kafka、Flink等主流消息队列、流计算引擎、日志服务订阅。
环境准备:
建议通过MacOS或者Linux机器来进行操作。
环境说明:
实例 | 版本说明 | 更多 |
---|---|---|
PolarDB-X | 2.0.1 | 关于PolarDB-X |
Kafka | 2.13 | 关于Kafka |
Canal | 1.1.5 | 关于Canal |
准备PolarDB-X:
这里假设你已经安装了Docker,执行下列脚本即可完成单机版PolarDB-X的安装,大概需要1-2分钟。
# 获取PolarDB-X 镜像
docker pull polardbx/polardb-x:2.0.1
# 启动PolarDB-X, 并暴露8527端口, 这里可能需要1-2分钟
docker run -d --name polardbx-play -p 8527:8527 polardbx/polardb-x
# 通过MySQL客户端验证启动
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
Notice:可通过PXD、K8S等方式搭建PolarDB-X集群,更多可前往:PolarDB-X安装.
数据准备:
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
-- 创建测试库
create database canal;
use canal;
-- 创建测试表
create table `trades` (
id integer auto_increment NOT NULL,
shop_id integer comment '店铺id',
pay_amount decimal comment '支付金额',
stat_date date comment '统计时间',
primary key(id)
);
-- 写入一些数据
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
准备kafka:
下载 最新的Kafka安装包:
# 解压
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0
# 启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka:
bin/kafka-server-start.sh config/server.properties
kafka server 启动完成...
准备Canal:
下载安装:
下载 最新的Canal安装包:
# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
# 解压
tar -xzf canal.deployer-1.1.5.tar.gz
修改配置文件:
Canal的官方配置文件有很多配置项,其中需要改动配置如下:
- 编辑 vi conf/canal/canal.properties,用于探测Canal实例配置:
canal.instance.tsdb.enable = false canal.destinations = example canal.conf.dir = ../conf # 配置kafka连接信息 kafka.bootstrap.servers = 127.0.0.1:9092
- 编辑 vi conf/example/instance.properties,订阅PolarDB-X增量binlog,并写入Kafka:
#配置canal源为PolarDB-X的连接信息,用于订阅PolarDB-X的binlog canal.instance.tsdb.enable=false canal.instance.master.address=127.0.0.1:8527 canal.instance.dbUsername=polardbx canal.instance.dbPassword=123456 #定义写入kafka的topic canal.mq.topic=example
启动Canal:
#启动Canal
./bin/startup.sh
#查看Canal日志
tail -f logs/canal/canal.log
#查看Canal instance日志:
tail -f logs/example/example.log
Canal启动成功...
PS: 遇到下面这个警告,可以忽略,不影响正常运行:
Kafka Topic 消费:
Kafka Consumer消费:
# 订阅topic为"example"的消息
bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server localhost:9092
再次写入一些数据:
-- 登录PolarDB-X
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
insert trades values(default, 1001, 10, '2022-03-15');
Kafka消息数据结构:
{
"data":[
{
"id":"100008",
"shop_id":"1001",
"pay_amount":"10",
"stat_date":"2022-03-15"
}
],
"database":"canal",
"es":1647950609000,
"id":4,
"isDdl":false,
"mysqlType":{
"id":"int(11)",
"shop_id":"int(11)",
"pay_amount":"decimal(10,0)",
"stat_date":"date"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"shop_id":4,
"pay_amount":3,
"stat_date":91
},
"table":"trades",
"ts":1647950609988,
"type":"INSERT"
}