Elasticsearch
In this example, we demonstrate how to stream data from PolarDB-X into Elasticsearch with the help of Flink CDC.
PolarDB-X CDC:
PolarDB-X is compatible with the standard MySQL binlog protocol, so it can be subscribed to just like a standalone MySQL instance. It already integrates with mainstream message queues, stream computing engines, and log services, including Kafka and Flink.
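Because the binlog interface is MySQL-compatible, standard binlog statements can be issued against PolarDB-X from an ordinary MySQL client. The quick check below is an optional sketch; it assumes the instance from the later steps of this tutorial is already running with the default credentials:
# PolarDB-X answers standard MySQL binlog queries just like a single MySQL instance
mysql -h127.0.0.1 -P8527 -upolardbx_root -p"123456" \
  -e "SHOW MASTER STATUS; SHOW BINARY LOGS;"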
Preparing Components Required for the Tutorial
We assume you are running on a macOS or Linux machine and have already installed Docker.
Configure and Start Containers
Set up docker-compose.yml
version: '2.1'
services:
  polardbx:
    image: polardbx/polardb-x:2.0.1
    container_name: polardbx
    ports:
      - "8527:8527"
  elasticsearch:
    image: 'elastic/elasticsearch:7.6.0'
    container_name: elasticsearch
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
      - discovery.type=single-node
    ports:
      - '9200:9200'
      - '9300:9300'
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: 'elastic/kibana:7.6.0'
    container_name: kibana
    ports:
      - '5601:5601'
    volumes:
      - '/var/run/docker.sock:/var/run/docker.sock'
The Docker Compose file includes the following containers:
- PolarDB-X: the product table products and the order table orders will be stored in this database. These two tables will be joined to produce a more informative order table, enriched_orders.
- Elasticsearch: the final order table enriched_orders will be written to Elasticsearch.
- Kibana: used to visualize the data in Elasticsearch.
Execute the command below in the directory where docker-compose.yml
is located to start the components needed for this tutorial:
docker-compose up -d
This command will automatically start all containers defined in the Docker Compose configuration in detached mode. You can observe whether the above containers have started normally using docker ps, or check if Kibana is running properly by visiting http://localhost:5601/.
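As an optional, more scriptable alternative (not part of the original steps), the container status and the Elasticsearch HTTP endpoint can be probed from the command line:
# List the running containers and their status
docker ps --format '{{.Names}}: {{.Status}}'
# Elasticsearch should respond on port 9200 with its cluster metadata
curl http://localhost:9200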
Preparing Data:
Log in to PolarDB-X with the username and password below.
mysql -h127.0.0.1 -P8527 -upolardbx_root -p"123456"
CREATE DATABASE mydb;
USE mydb;
-- Create a product table and insert some data
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
) AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
-- Create an order table and insert some data
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
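As an optional verification (not part of the original tutorial), a quick count of both tables confirms that the sample data landed; the expected numbers follow directly from the INSERT statements above:
mysql -h127.0.0.1 -P8527 -upolardbx_root -p"123456" -Dmydb \
  -e "SELECT COUNT(*) FROM products; SELECT COUNT(*) FROM orders;"
# Expected: 9 rows in products, 3 rows in orders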
Download Flink and Required Dependencies
- Download Flink 1.13.2 and extract it to the directory flink-1.13.2.
- Download the dependencies listed below and place them in the directory flink-1.13.2/lib/ (see the optional check after this list).
The download links are only valid for released versions; SNAPSHOT versions need to be compiled locally.
- For subscribing to PolarDB-X Binlog: flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar
- For writing to Elasticsearch: flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
- Start the Flink service:
./bin/start-cluster.sh
We can visit http://localhost:8081/ to confirm that Flink is running normally.
- Start the Flink SQL CLI:
./bin/sql-client.sh
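Before opening the SQL CLI (or from a second terminal), two optional command-line checks can confirm that the connector jars were picked up and that the cluster's REST endpoint answers on its default port 8081; this is a sketch, not part of the original steps:
# Both connector jars should appear in the lib directory
ls flink-1.13.2/lib/ | grep -E 'mysql-cdc|elasticsearch'
# The Flink REST API reports the running cluster and its task managers
curl http://localhost:8081/overview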
Use Flink DDL in Flink SQL CLI to Create Tables
-- Set the checkpoint interval to 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;
-- Create source1 - orders table
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '8527',
'username' = 'polardbx_root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
-- Create source2 - products table
Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '8527',
'username' = 'polardbx_root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
-- Create sink - enriched orders result table
Flink SQL> CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);
-- Execute read and write operations
Flink SQL> INSERT INTO enriched_orders
SELECT o.order_id,
o.order_date,
o.customer_name,
o.price,
o.product_id,
o.order_status,
p.name,
p.description
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id;
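Besides Kibana, the sink can also be inspected directly through Elasticsearch's REST API. The query below is an optional sketch; it assumes Elasticsearch is still exposed on localhost:9200 as configured in docker-compose.yml:
# The enriched_orders index should contain one document per order (three at this point)
curl 'http://localhost:9200/enriched_orders/_search?pretty'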
View Data in Kibana
Visit http://localhost:5601/app/kibana#/management/kibana/index_pattern to create an index pattern named enriched_orders.
Afterward, you can view the written data at http://localhost:5601/app/kibana#/discover.
Modify Data in the Monitored Tables and Observe Incremental Changes
Execute the following modifications in PolarDB-X one by one, refreshing Kibana after each step. You will notice that the order data displayed in Kibana updates in real time.
INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
UPDATE orders SET order_status = true WHERE order_id = 10004;
DELETE FROM orders WHERE order_id = 10004;
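The same changes can also be observed without Kibana by polling the index between statements. This is an optional check based on the sample data above: the INSERT raises the document count to 4, the UPDATE keeps it at 4 but flips order_status, and the DELETE brings it back to 3.
# Document count in the enriched_orders index
curl http://localhost:9200/enriched_orders/_count
# Inspect the changed order; assumes the connector uses the primary key (order_id) as the document id
curl 'http://localhost:9200/enriched_orders/_doc/10004?pretty'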
Environment Cleanup
Execute the following command in the directory where the docker-compose.yml file is located to stop all containers:
docker-compose down
Enter the deployment directory of Flink and stop the Flink cluster:
./bin/stop-cluster.sh
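As a final optional check (not part of the original steps), confirm that nothing from the tutorial is still running:
# No polardbx, elasticsearch, or kibana containers should remain
docker ps --filter name=polardbx --filter name=elasticsearch --filter name=kibana
# The Flink web UI should no longer respond
curl http://localhost:8081/ || echo "Flink cluster stopped"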