Elasticsearch

In this example, we demonstrate how to import data from PolarDB-X into Elasticsearch with the aid of Flink CDC.

PolarDB-X CDC:

PolarDB-X is compatible with the standard MySQL binlog protocol, so downstream systems can consume its change log exactly as if it came from a standalone MySQL instance. Mainstream message queues, stream computing engines, and log service subscriptions, including Kafka and Flink, are already supported.
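Because the binlog protocol is MySQL-compatible, standard binlog inspection statements should work against PolarDB-X just as they do on MySQL. As a quick, optional sanity check (a sketch, assuming the PolarDB-X instance started in the next section is up and its binlog service is enabled by default):

-- Optional: verify that PolarDB-X exposes a MySQL-style binlog
SHOW MASTER STATUS;   -- current binlog file name and position
SHOW BINARY LOGS;     -- list the available binlog files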


Preparing Components Required for the Tutorial

We assume you are running on a macOS or Linux machine and have already installed Docker.

Configure and Start Containers

Create a docker-compose.yml file with the following content.

version: '2.1'
services:
  polardbx:
    image: polardbx/polardb-x:2.0.1
    container_name: polardbx
    ports:
      - "8527:8527"
  elasticsearch:
    image: 'elastic/elasticsearch:7.6.0'
    container_name: elasticsearch
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
      - discovery.type=single-node
    ports:
      - '9200:9200'
      - '9300:9300'
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: 'elastic/kibana:7.6.0'
    container_name: kibana
    ports:
      - '5601:5601'
    volumes:
      - '/var/run/docker.sock:/var/run/docker.sock'

The Docker Compose file includes the following containers:

  • PolarDB-X: The product table products and the order table orders will be stored in this database. These two tables will be joined to produce a more informative order table, enriched_orders.
  • Elasticsearch: The final order table enriched_orders will be written to Elasticsearch.
  • Kibana: Used to visualize data in Elasticsearch.

Execute the command below in the directory where docker-compose.yml is located to start the components needed for this tutorial:

docker-compose up -d

This command automatically starts all containers defined in the Docker Compose configuration in detached mode. You can run docker ps to check whether the containers started normally, or visit http://localhost:5601/ to check whether Kibana is running properly.

Preparing Data:

Log in to PolarDB-X using the username and password below, then create the database and tables and insert some sample data.

mysql -h127.0.0.1 -P8527 -upolardbx_root -p"123456"
CREATE DATABASE mydb;
USE mydb;

-- Create a product table and insert some data
CREATE TABLE products (
                          id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
                          name VARCHAR(255) NOT NULL,
                          description VARCHAR(512)
) AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");


-- Create an order table and insert some data
CREATE TABLE orders (
                        order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
                        order_date DATETIME NOT NULL,
                        customer_name VARCHAR(255) NOT NULL,
                        price DECIMAL(10, 5) NOT NULL,
                        product_id INTEGER NOT NULL,
                        order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
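
Before moving on to Flink, you can optionally preview in PolarDB-X the same join that the Flink job will later run continuously, to confirm the sample data lines up:

-- Optional: preview the enriched result with a one-off join in PolarDB-X
SELECT o.order_id,
       o.order_date,
       o.customer_name,
       o.price,
       o.product_id,
       o.order_status,
       p.name,
       p.description
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id;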
Preparing Flink and Required Dependencies

  1. Download Flink 1.13.2 and extract it to the directory flink-1.13.2.
  2. Download the dependencies listed below and place them in the directory flink-1.13.2/lib/. Note that the download links are only valid for released versions; SNAPSHOT versions need to be compiled locally.
  3. Start the Flink cluster by running ./bin/start-cluster.sh in the flink-1.13.2 directory.

We can then visit http://localhost:8081/ to check that Flink is running normally:

Flink UI

  4. Start the Flink SQL CLI:
    ./bin/sql-client.sh
    
-- Set the checkpoint interval to 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;

-- Create source table 1: orders
Flink SQL> CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '8527',
    'username' = 'polardbx_root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders'
 );

-- Create source table 2: products
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '8527',
    'username' = 'polardbx_root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products'
);

-- Create sink - enriched orders result table
Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'enriched_orders'
 );

-- Execute the read and write operations: join the two sources and write the result to Elasticsearch
Flink SQL> INSERT INTO enriched_orders
  SELECT o.order_id,
    o.order_date,
    o.customer_name,
    o.price,
    o.product_id,
    o.order_status,
    p.name,
    p.description
 FROM orders AS o
 LEFT JOIN products AS p ON o.product_id = p.id;
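
The INSERT submits a continuously running Flink job that keeps enriched_orders in sync with the two source tables. If nothing shows up in Elasticsearch, one optional check (a sketch, not part of the original walkthrough) is to query a CDC source directly in the SQL CLI and confirm that the snapshot and incremental changes stream in:

-- Optional: confirm the orders source is readable before debugging the sink
Flink SQL> SELECT * FROM orders;

Once rows appear, close the result view to return to the prompt.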

View Data in Kibana

Visit http://localhost:5601/app/kibana#/management/kibana/index_pattern to create an index pattern enriched_orders.

Afterward, you can see the data written by the Flink job at http://localhost:5601/app/kibana#/discover.

Modify Data in the Monitored Tables and Observe Incremental Changes

Execute the following modifications in PolarDB-X one by one, refreshing Kibana after each step. You will notice that the order data displayed in Kibana updates in real time.

INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

UPDATE orders SET order_status = true WHERE order_id = 10004;

DELETE FROM orders WHERE order_id = 10004;
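
To cross-check each step on the database side before refreshing Kibana, you can query the affected row directly in PolarDB-X; after the final DELETE the query returns an empty set:

-- Check the current state of the modified row (empty after the DELETE)
SELECT * FROM orders WHERE order_id = 10004;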

Environment Cleanup

Execute the following command in the directory where the docker-compose.yml file is located to stop all containers:

docker-compose down

Enter the deployment directory of Flink and stop the Flink cluster:

./bin/stop-cluster.sh
