# Flink
This article explains how to use Flink to subscribe to real-time incremental data from PolarDB-X and build a real-time data dashboard, using a simple, configuration-only example (zero code).
## PolarDB-X CDC:
PolarDB-X is compatible with the standard MySQL Binlog protocol, so downstream tools can subscribe to its incremental data as if it were a standalone MySQL instance. Mainstream message queues, stream processing engines, and log services, such as Kafka and Flink, can subscribe to it.
## Demonstration Environment:
We recommend running this demo on macOS or Linux.
### Environment Versions:
| Component | Version |
|---|---|
| PolarDB-X | >= 2.0.1 |
| Flink | >= 1.13.6 |
| Flink-CDC | >= 2.2 |
### Prepare the PolarDB-X Environment:
#### PolarDB-X Installation:
Assuming Docker is already installed, run the following commands to install PolarDB-X. The whole process takes about 1-2 minutes.
```shell
# Get the PolarDB-X image
docker pull polardbx/polardb-x:2.0.1
# Start PolarDB-X and expose port 8527 (startup may take 1-2 minutes)
docker run -d --name polardbx-play -p 8527:8527 polardbx/polardb-x:2.0.1
# Verify the startup with the MySQL client
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
```
Note: PolarDB-X can also be deployed as a cluster with PXD or on Kubernetes (K8S). For details, see PolarDB-X Installation.
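Because PolarDB-X can take a minute or two to start, a small polling helper saves retrying the login by hand. This is a minimal sketch, assuming the `mysql` client is on your PATH and that the host, port, and credentials match the `docker run` command above; the function name `wait_for_polardbx` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll PolarDB-X until it accepts a MySQL-protocol login.
# Assumes the mysql client is installed; defaults match the docker run above.
wait_for_polardbx() {
  local host="${1:-127.1}" port="${2:-8527}" attempts="${3:-60}" i
  for ((i = 1; i <= attempts; i++)); do
    # "select 1" only succeeds once the server accepts connections.
    if mysql -h"$host" -P"$port" -upolardbx_root -p"123456" \
         -e 'select 1' >/dev/null 2>&1; then
      echo "PolarDB-X is ready (attempt $i)"
      return 0
    fi
    # Wait between attempts, but not after the final one.
    if ((i < attempts)); then
      sleep 2
    fi
  done
  echo "PolarDB-X not ready after $attempts attempt(s)" >&2
  return 1
}
```

With the defaults, `wait_for_polardbx` tries 60 times, 2 seconds apart, which roughly covers the expected startup time.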
### Prepare the Flink Environment:
- Install Flink
- Download the Flink connectors (jar packages):
  - MySQL-CDC connector: subscribes to the PolarDB-X Binlog
  - JDBC connector: writes results through JDBC
  - MySQL driver: the JDBC driver used to write to PolarDB-X
```shell
# Download the Flink package and unzip it
# (dlcdn.apache.org only hosts current releases; older ones are under https://archive.apache.org/dist/flink/)
wget https://dlcdn.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
tar -xzvf flink-1.13.6-bin-scala_2.11.tgz
# Flink plugins are stored as jar files in the ${FLINK_HOME}/lib directory
cd flink-1.13.6/lib
# Download the Flink-CDC plugin, used to subscribe to incremental logs
# Download the Flink JDBC connector, used to write to PolarDB-X
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.6/flink-connector-jdbc_2.11-1.13.6.jar
# Download the MySQL JDBC driver, used by the JDBC connector
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Go back to ${FLINK_HOME} and start the Flink cluster
cd ..
./bin/start-cluster.sh
```
For more about Flink, visit the [Flink official website](https://flink.apache.org/) and [Flink-cdc official website](https://ververica.github.io/flink-cdc-connectors).
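The step that downloads the Flink-CDC plugin has no command. A minimal sketch that fills it in, assuming the connector is the `flink-sql-connector-mysql-cdc` artifact published under the `com.ververica` group on Maven Central, at version 2.2.0 to match the version table; check the Flink-CDC website for the artifact that fits your Flink version. The helper names `maven_url` and `download_cdc_connector` are hypothetical.

```shell
#!/usr/bin/env bash
# Build a Maven Central download URL from group, artifact, and version,
# following the standard repository layout:
#   <repo>/<group with dots as slashes>/<artifact>/<version>/<artifact>-<version>.jar
maven_url() {
  local group="$1" artifact="$2" version="$3"
  local repo="https://repo.maven.apache.org/maven2"
  echo "${repo}/$(echo "$group" | tr '.' '/')/${artifact}/${version}/${artifact}-${version}.jar"
}

# Assumption: Flink-CDC 2.2.0 is published as com.ververica:flink-sql-connector-mysql-cdc.
# Run this from ${FLINK_HOME}/lib so the jar lands next to the other connectors.
download_cdc_connector() {
  wget "$(maven_url com.ververica flink-sql-connector-mysql-cdc 2.2.0)"
}
```

The same helper reproduces the URLs already used above; for example, `maven_url mysql mysql-connector-java 8.0.28` yields the MySQL driver URL. Jars added after the cluster is running are only picked up after a restart (`./bin/stop-cluster.sh && ./bin/start-cluster.sh`).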
### Prepare Data:
#### PolarDB-X Data:
- `trades` table: simulates a trade table recording user payments
- `shop_gmv_d` table: simulates a real-time statistics table of each shop's daily transaction amount (GMV)
```shell
# Log in to PolarDB-X
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
```
```sql
# Create a database
create database flink_cdc_demo;
use flink_cdc_demo;
# Trades (orders) table
create table `trades` (
  id integer auto_increment NOT NULL,
  shop_id integer comment 'shop id',
  pay_amount decimal comment 'payment amount',
  stat_date date comment 'statistics date',
  primary key(id)
) comment ='Trade Table' dbpartition by hash(id);
# Simulate user payment behavior; each order amount is 10
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
# Daily GMV table that Flink writes into
create table `shop_gmv_d` (
  stat_date date comment 'statistics date',
  shop_id integer comment 'shop id',
  gmv decimal comment 'total turnover',
  primary key(stat_date, shop_id)
) comment = 'Daily Real-time Transaction Amount';
```
#### Flink Test Tables:
```shell
# Log in to the Flink SQL client (from ${FLINK_HOME})
./bin/sql-client.sh
```
```sql
-- Set the checkpoint interval to 3 seconds
SET 'execution.checkpointing.interval' = '3s';
-- Create a Flink source table to subscribe to PolarDB-X incremental data
CREATE TABLE trades (
  id integer,
  shop_id integer,
  pay_amount decimal,
  stat_date date,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '8527',
  'username' = 'polardbx_root',
  'password' = '123456',
  'database-name' = 'flink_cdc_demo',
  'table-name' = 'trades'
);
-- Observe changes in PolarDB-X data (press Q to leave the result view)
select * from trades;
-- Create a Flink sink table to write the statistical data back to PolarDB-X
CREATE TABLE shop_gmv_d (
  stat_date date,
  shop_id integer,
  gmv decimal,
  primary key(stat_date, shop_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:8527/flink_cdc_demo',
  'username' = 'polardbx_root',
  'password' = '123456',
  'table-name' = 'shop_gmv_d'
);
-- Calculate the total turnover for the day in real time and write it back to PolarDB-X
insert into shop_gmv_d
select stat_date, shop_id, sum(pay_amount) as gmv
from trades group by stat_date, shop_id;
```
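Once the `insert into` statement is submitted, the job keeps running in the Flink cluster. One way to confirm this is the Flink REST API endpoint `/jobs/overview`, served on the web UI port (8081 by default). A minimal sketch, assuming the default port and no `jq`; the function names are hypothetical, and the `grep` only counts `state` fields rather than fully parsing the JSON.

```shell
#!/usr/bin/env bash
# Count jobs reported as RUNNING in a /jobs/overview JSON document read from stdin.
# A crude text match, not a JSON parser: it relies on the "state":"RUNNING" field.
count_running_jobs() {
  grep -Eo '"state" *: *"RUNNING"' | wc -l | tr -d ' '
}

# Query the local Flink cluster (assumes the default REST port 8081).
flink_running_jobs() {
  curl -s "http://localhost:8081/jobs/overview" | count_running_jobs
}
```

After submitting the GMV job, `flink_running_jobs` should print at least 1.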
Log in to PolarDB-X to verify the Flink write results:
```shell
# Log in to PolarDB-X
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
```
```sql
# Change the database
use flink_cdc_demo;
# Observe the changes in transaction amount
select * from shop_gmv_d;
# Continue to simulate user order placement behavior
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
# Observe the changes in transaction amount
select * from shop_gmv_d;
```
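Rather than comparing numbers by eye, you can check the sink against the source: `shop_gmv_d.gmv` should equal `sum(pay_amount)` over `trades` for the same shop and date once Flink has flushed its latest writes (allow a few seconds). A minimal sketch, assuming the `mysql` client and the demo credentials; `px_query` and `check_gmv` are hypothetical names, and `px_query` is kept separate so it can be replaced, for example in tests.

```shell
#!/usr/bin/env bash
# Run one query against the demo database and print the bare result
# (-N drops column headers, -s gives silent, tab-separated output).
px_query() {
  mysql -h127.1 -P8527 -upolardbx_root -p"123456" -N -s \
    -D flink_cdc_demo -e "$1" 2>/dev/null
}

# Compare the Flink-maintained GMV with the value recomputed from trades.
# Arguments: shop id and statistics date (defaults match the demo data).
check_gmv() {
  local shop="${1:-1001}" day="${2:-2022-03-15}" expected actual
  expected=$(px_query "select sum(pay_amount) from trades where shop_id = ${shop} and stat_date = '${day}'")
  actual=$(px_query "select gmv from shop_gmv_d where shop_id = ${shop} and stat_date = '${day}'")
  # An empty source result means nothing to compare, so treat it as a failure.
  if [ -n "$expected" ] && [ "$expected" = "$actual" ]; then
    echo "OK: gmv = ${actual}"
    return 0
  fi
  echo "Mismatch: trades sum = '${expected}', shop_gmv_d gmv = '${actual}'" >&2
  return 1
}
```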
As you continue to simulate payments in `trades`, the value of `gmv` keeps increasing: since every order is worth 10, it reads 50 after the first five orders and 100 after all ten.
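To keep the numbers moving without typing inserts by hand, a loop can generate the same `insert` statement used above and send it to PolarDB-X at a fixed interval. A minimal sketch, assuming the `mysql` client and the demo credentials; the function names and the one-order-per-second default are hypothetical choices.

```shell
#!/usr/bin/env bash
# Print N insert statements identical to the demo orders (shop 1001, amount 10).
gen_trade_inserts() {
  local n="$1" i
  for ((i = 0; i < n; i++)); do
    echo "insert trades values(default, 1001, 10, '2022-03-15');"
  done
}

# Insert one order every INTERVAL seconds, COUNT times in total.
simulate_orders() {
  local count="${1:-10}" interval="${2:-1}" i
  for ((i = 0; i < count; i++)); do
    gen_trade_inserts 1 | mysql -h127.1 -P8527 -upolardbx_root -p"123456" -D flink_cdc_demo
    sleep "$interval"
  done
}
```

For example, `simulate_orders 30 1` adds 30 orders over about 30 seconds, which should raise `gmv` by 300.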
### Grafana Configuration:
A visualization tool such as Grafana makes the data changes easier to observe.
#### Download Grafana:
```shell
# Download Grafana (this is the macOS build; on Linux, download the package for your platform)
wget https://mirrors.huaweicloud.com/grafana/7.1.5/grafana-7.1.5.darwin-amd64.tar.gz
# Unzip
tar -xzvf grafana-7.1.5.darwin-amd64.tar.gz
cd grafana-7.1.5
```
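#### Modify Parameters:
Lower Grafana's minimum dashboard refresh interval (`min_refresh_interval`, 5s by default) to 60ms so the chart can refresh in near real time:
```shell
vi conf/defaults.ini
```
```ini
# Modify the parameter in the [dashboards] section
min_refresh_interval = 60ms
```
```shell
# Start Grafana
./bin/grafana-server web
```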
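As an alternative to editing `conf/defaults.ini` in `vi`, the parameter can be rewritten with `sed`. A minimal sketch, assuming the file contains a single uncommented `min_refresh_interval` line and that the new value contains no `/`; it writes through a temporary file instead of `sed -i`, whose syntax differs between macOS and Linux. Run it before starting Grafana, or restart Grafana afterwards. The function name is hypothetical.

```shell
#!/usr/bin/env bash
# Set min_refresh_interval in a Grafana ini file (portable: no sed -i).
# Arguments: path to the ini file, new value (e.g. 60ms).
set_min_refresh_interval() {
  local ini="$1" value="$2" tmp status=0
  tmp="$(mktemp)" || return 1
  # Replace the whole assignment line; copy back with cat to keep file permissions.
  sed "s/^min_refresh_interval[[:space:]]*=.*/min_refresh_interval = ${value}/" "$ini" > "$tmp" \
    && cat "$tmp" > "$ini" || status=1
  rm -f "$tmp"
  return "$status"
}
```

From the `grafana-7.1.5` directory: `set_min_refresh_interval conf/defaults.ini 60ms`.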
Open Grafana at http://localhost:3000 and log in with the default username and password, both `admin`.
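#### Configure the Dashboard:
Add a new data source via Settings -> Data Source -> Add Data Source. Choose the MySQL type and point it at PolarDB-X: host `127.0.0.1:8527`, database `flink_cdc_demo`, user `polardbx_root`, password `123456`.
Add a new dashboard and configure its panel as follows: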
- Visualization: Choose "Stat"
- SQL: "select gmv from shop_gmv_d"
- Format: "Table"
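The data source can also be created from a file instead of the UI, through Grafana's provisioning mechanism, which loads YAML files from `conf/provisioning/datasources/` at startup. A minimal sketch, assuming the demo connection settings and the Grafana 7.x provisioning fields for a MySQL data source; the data source name `PolarDB-X`, the file name `polardbx.yaml`, and the function name are hypothetical, and Grafana must be restarted to load the file.

```shell
#!/usr/bin/env bash
# Write a Grafana data source provisioning file for the demo PolarDB-X instance.
# Argument: Grafana home directory (the unpacked grafana-7.1.5 folder).
write_polardbx_datasource() {
  local grafana_home="$1"
  local dir="${grafana_home}/conf/provisioning/datasources"
  mkdir -p "$dir" || return 1
  # Quoted heredoc: the YAML is written literally, with no variable expansion.
  cat > "${dir}/polardbx.yaml" <<'EOF'
apiVersion: 1
datasources:
  # PolarDB-X speaks the MySQL protocol, so Grafana's MySQL plugin is used.
  - name: PolarDB-X
    type: mysql
    url: 127.0.0.1:8527
    database: flink_cdc_demo
    user: polardbx_root
    secureJsonData:
      password: "123456"
EOF
}
```

For example, run `write_polardbx_datasource ./grafana-7.1.5`, restart Grafana, and select the `PolarDB-X` data source in the panel query.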
Finally, enable auto-refresh on the dashboard to watch the data update in real time.