最近更新时间:2024-06-21 10:46:59
准备工作的配置请参考【KMR Flink同步MySQL数据到StarRocks】。
在MySQL中创建flink_cdc数据库,并在该库中创建orders和customers表。
通过以下两种方法连接StarRocks:
使用mysql命令连入StarRocks;
使用金山云 Serverless StarRocks Manager UI。
在StarRocks中执行以下脚本。脚本的执行流程为:首先创建数据库flink_cdc,然后创建数据表orders和customers,最后基于ODS表创建DWD视图,再基于DWD视图创建DWS视图。
-- Create the StarRocks target database (no-op if it already exists) and
-- make it the session's default database for the statements that follow.
CREATE DATABASE IF NOT EXISTS flink_cdc;
USE flink_cdc;
-- ODS table for orders, synced from the MySQL source table of the same name.
-- Primary Key table: one row per order_id.
-- NOTE(review): order_revenue is FLOAT (approximate); DECIMAL is usually preferred
-- for monetary amounts. Confirm against the MySQL source column type before changing.
-- NOTE(review): BUCKETS 1 and replication_num = 1 (no extra replicas) look sized for
-- a small demo; revisit for production data volumes.
CREATE TABLE IF NOT EXISTS flink_cdc.orders (
    order_id      INT    NOT NULL COMMENT '',
    order_revenue FLOAT  NOT NULL COMMENT '',
    order_region  STRING NOT NULL COMMENT '',
    customer_id   INT    NOT NULL COMMENT ''
)
ENGINE = OLAP
PRIMARY KEY (order_id)
COMMENT ''
DISTRIBUTED BY HASH(order_id) BUCKETS 1
PROPERTIES (
    'replication_num' = '1'
);
-- ODS table for customers, synced from the MySQL source table of the same name.
-- Primary Key table: one row per customer_id.
-- NOTE(review): customer_age is FLOAT; an integer type would be more natural for an
-- age. Confirm against the MySQL source column type before changing.
CREATE TABLE IF NOT EXISTS flink_cdc.customers (
    customer_id   INT    NOT NULL COMMENT '',
    customer_age  FLOAT  NOT NULL COMMENT '',
    customer_name STRING NOT NULL COMMENT ''
)
ENGINE = OLAP
PRIMARY KEY (customer_id)
COMMENT ''
DISTRIBUTED BY HASH(customer_id) BUCKETS 1
PROPERTIES (
    'replication_num' = '1'
);
-- DWD view: each order enriched with its customer's attributes.
-- Orders whose customer_id has no row in customers are dropped (inner join).
-- IF NOT EXISTS keeps the script re-runnable; without it a second run fails because
-- the view already exists. (It also means a re-run will NOT update an existing
-- view's definition; drop the view first if the query changes.)
CREATE VIEW IF NOT EXISTS flink_cdc.dwd_order_customer_valid (
    order_id,
    order_revenue,
    order_region,
    customer_id,
    customer_age,
    customer_name
)
AS
SELECT
    o.order_id,
    o.order_revenue,
    o.order_region,
    c.customer_id,
    c.customer_age,
    c.customer_name
FROM flink_cdc.customers AS c
INNER JOIN flink_cdc.orders AS o
    ON c.customer_id = o.customer_id
-- Exclude customer_id = -1; presumably a placeholder id for unknown customers
-- (confirm with the upstream data).
WHERE c.customer_id <> -1;
-- DWS view: number of orders and total order revenue per region, aggregated
-- from the DWD view flink_cdc.dwd_order_customer_valid.
-- IF NOT EXISTS keeps the script re-runnable; without it a second run fails because
-- the view already exists.
CREATE VIEW IF NOT EXISTS flink_cdc.dws_agg_by_region (
    order_region,
    order_cnt,
    order_total_revenue
)
AS
SELECT
    order_region,
    -- order_id is NOT NULL, so this counts every order row in the region.
    COUNT(order_id),
    SUM(order_revenue)
FROM flink_cdc.dwd_order_customer_valid
GROUP BY order_region;
参考【KMR Flink同步MySQL数据到StarRocks】,创建并启动Flink同步任务。
在Flink的任务面板中可以看到,SQL脚本中向StarRocks写入数据的两个任务均已执行成功。
登录MySQL数据库,在KRDS数据库窗口执行以下命令,向表orders和customers中插入数据。
-- Seed rows in the MySQL (KRDS) source database; the Flink job syncs them to StarRocks.
-- String values use single quotes, the standard SQL string literal. Double-quoted
-- values such as "beijing" are parsed as identifiers (and the INSERT fails) when the
-- session's sql_mode includes ANSI_QUOTES.
INSERT INTO flink_cdc.orders (order_id, order_revenue, order_region, customer_id)
VALUES (1, 10, 'beijing', 1);
INSERT INTO flink_cdc.orders (order_id, order_revenue, order_region, customer_id)
VALUES (2, 10, 'beijing', 1);
INSERT INTO flink_cdc.customers (customer_id, customer_age, customer_name)
VALUES (1, 22, 'kmr_test');
连接StarRocks实例,查询表中已同步的数据。
-- Inspect the ODS tables in StarRocks after the sync.
USE flink_cdc;
SELECT * FROM orders;
SELECT * FROM customers;
返回信息如下所示。
MySQL [flink_cdc]> select * from orders;
+----------+---------------+--------------+-------------+
| order_id | order_revenue | order_region | customer_id |
+----------+---------------+--------------+-------------+
| 1 | 10 | beijing | 1 |
| 2 | 10 | beijing | 1 |
+----------+---------------+--------------+-------------+
2 rows in set (0.00 sec)
MySQL [flink_cdc]> select * from customers;
+-------------+--------------+---------------+
| customer_id | customer_age | customer_name |
+-------------+--------------+---------------+
| 1 | 22 | kmr_test |
+-------------+--------------+---------------+
1 row in set (0.00 sec)
执行以下命令,查询DWD层数据。
-- Inspect the DWD view (orders joined with customers).
USE flink_cdc;
SELECT * FROM dwd_order_customer_valid;
返回信息如下所示。
MySQL [flink_cdc]> select * from dwd_order_customer_valid;
+----------+---------------+--------------+-------------+--------------+---------------+
| order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
+----------+---------------+--------------+-------------+--------------+---------------+
| 1 | 10 | beijing | 1 | 22 | kmr_test |
| 2 | 10 | beijing | 1 | 22 | kmr_test |
+----------+---------------+--------------+-------------+--------------+---------------+
2 rows in set (0.00 sec)
执行以下命令,查询DWS层数据。
-- Inspect the DWS view (per-region order count and revenue).
USE flink_cdc;
SELECT * FROM dws_agg_by_region;
返回信息如下所示。
MySQL [flink_cdc]> select * from dws_agg_by_region;
+--------------+-----------+---------------------+
| order_region | order_cnt | order_total_revenue |
+--------------+-----------+---------------------+
| beijing | 2 | 20 |
+--------------+-----------+---------------------+
1 row in set (0.01 sec)
纯净模式