最近更新时间:2024-06-21 10:46:59
已创建KRDS for MySQL,详情请参见创建KRDS MySQL实例。
已创建Serverless StarRocks实例,创建最新版本3.2.6,详情请参见创建Serverless StarRocks实例。
已创建Serverless Flink集群,详情请参见创建Serverless Flink集群。
进入托管Hadoop的控制台,然后点击创建集群,在基础信息配置的可选组件中选择flink。
您可进入云数据库MySQL控制台创建实例,注意要和Serverless Flink、Serverless StarRocks集群在同一个VPC下。
StarRocks 数据源配置时,StarRocks集群对应的集群信息、数据库用户名密码需填写正确;
填写的数据库用户名信息,必须拥有相应数据库表的读写权限,来保障任务数据能够被正常读取或写入 StarRocks 中。
EMR StarRocks 集群和Serverless Flink,KRDS MySQL的 VPC 必须一致。若 VPC 不一致时,则需要在 StarRocks 集群信息的Manager访问安全配置栏中,选择Manager白名单,并添加入方向规则。
首先连接KRDS MySQL实例,详情请参见登录数据库,然后执行以下命令,创建数据库test_sr,以及数据表test_sr.score_board。
CREATE DATABASE test_sr;
CREATE TABLE test_sr.score_board(
id int(11) NOT NULL COMMENT "",
name varchar(255) NULL DEFAULT "" COMMENT "",
score int(11) NOT NULL DEFAULT "0" COMMENT "",
PRIMARY KEY(id)
);
向score_board表中插入若干条数据。
MySQL [test]> INSERT INTO score_board values
(1, 'Bob', 21),
(2, 'Stan', 21),
(3, 'Sam', 22),
(4, 'Tony', 22),
(5, 'Alice', 22),
(6, 'Lucy', 23),
(7, 'Polly', 23),
(8, 'Tom', 23),
(9, 'Rose', 24),
(10, 'Jerry', 24),
(11, 'Jason', 24),
(12, 'Lily', 25),
(13, 'Stephen', 25),
(14, 'David', 25),
(15, 'Eddie', 26),
(16, 'Kate', 27),
(17, 'Cathy', 27),
(18, 'Judy', 27),
(19, 'Julia', 28),
(20, 'Robert', 28),
(21, 'Jack', 29);
可以通过点击集群信息的Manager UI链接登录StarRocks 集群,创建与上述MySQL集群一样的表结构
CREATE DATABASE test_sr;
USE test_sr;
CREATE TABLE test_sr.score_board(
id int(11) NOT NULL COMMENT "",
name varchar(65533) NULL DEFAULT "" COMMENT "",
score int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id);
使用SSH方式登录KMR集群,下载的Connector用于Flink和StarRocks、KRDS MySQL连接,也可以下载到本地并上传到KMR集群,将Connector放在/opt/kmr/flink/flink-1.17.1/lib目录下。
# 进入lib目录
cd /opt/kmr/flink/flink-1.17.1/lib
# starrocks connector
wget https://repo1.maven.org/maven2/com/starrocks/flink-connector-starrocks/1.2.9_flink-1.17/flink-connector-starrocks-1.2.9_flink-1.17.jar
# mysql connector
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
启动Flink YARN Session并运行SQL客户端。
su hadoop
cd /opt/kmr/flink/flink-1.17.1/lib
yarn-session.sh -d
sql-clint.sh
根据要待导出数据的 MySQL表,在 Flink 中创建一张表,例如 mysql_test
,并配置读取任务属性,包括设置 Flink Connector 和库表的信息:
CREATE TABLE
mysql_test (`id` INT, `name` STRING, `score` INT)
WITH
(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://17x.xx.xxx.xxx:3306/test_sr',
'table-name' = 'score_board',
'username' = 'admin',
'password' = 'xxxxxxxx'
);
根据要待导入数据的 StarRocks 表,在 Flink 中创建一张表,例如 starrocks_test
,并配置读取任务属性,包括设置 Flink Connector 和库表的信息:
CREATE TABLE starrocks_test
(
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://1xx.xx.252.xxx:9030',
'load-url'='1xx.xx.252.xxx:8030',-- 由于starrocks是目标表,所以使用load-url描述
'database-name' = 'test_sr',
'table-name' = 'score_board',
'username' = 'admin',
'password' = 'xxxxxx'
);
编写执行任务语句,执行时记得开启yarn-session。
insert into
starrocks_test
select
*
from
mysql_test;
通过Flink的面板可以看出写入任务执行成功。
验证StarRocks端可以看到MySQL中的数据通过Flink成功写入。
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
connector | 是 | (none) | String | 指定使用的starrocks 连接器或者jdbc连接器。 |
jdbc-url | 是 | (none) | String | 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式: |
database-name | 是 | (none) | String | 数据库名称。 |
table-name | 是 | (none) | String | 表名称。 |
username | 是 | (none) | String | 用户名称。 |
password | 是 | (none) | String | 用户密码。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
scan-url | 是 | (none) | String | FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。格式为fe_ip1:http_port,fe_ip2:http_port..。 |
scan.connect.timeout-ms | 否 | 1000 | String | 连接 StarRocks 数据仓库的超时时长,单位毫秒。 |
scan.params.keep-alive-min | 否 | 10 | String | 读取任务的保活时长,单位分钟。 |
scan.params.query-timeout-s | 否 | 600 | String | 读取任务的最大超时时长,单位秒。 |
scan.params.mem-limit-byte | 否 | 1073741824 | String | BE 节点中单个查询的内存上限,单位为 bytes。默认值 1073741824,相当于 1GB。 |
scan.max-retries | 否 | 1 | String | 读取任务失败后的最大重试次数 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
load-url | 是 | (none) | String | 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式: |
sink.semantic | 否 | at-least-once | String | sink 保证的语义。有效值:at-least-once 和 exactly-once。 |
sink.version | 否 | AUTO | String | 数据加载时使用的接口。V1:使用 Stream Load 接口加载数据。V2:使用 Transaction Stream Load 接口加载数据,要求 StarRocks 至少为 2.4 版本。AUTO:判断 StarRocks 是否支持 Transaction Stream Load 接口,然后选择版本。支持则选择 V2,不支持则选择 V1。 |
sink.buffer-flush.max-bytes | 否 | 94371840 | String | 数据写入 StarRocks 前,Buffer 可容纳的最大数据量,范围为[64MB, 10GB]。 默认值 94371840,相当于 90MB。 |
sink.buffer-flush.max-rows | 否 | 500000 | String | 数据写入 StarRocks 前,Buffer 可容纳的最大数据行数。 |
sink.buffer-flush.interval-ms | 否 | 300000 | String | Buffer 刷新时间间隔,单位为毫秒,取值范围 [64000, 5000000]。 |
sink.max-retries | 否 | 3 | String | 写入任务的最大重试次数,取值范围为 [0, 10]。 |
sink.connect.timeout-ms | 否 | 1000 | String | 连接 StarRocks 数据仓库的超时时长,单位毫秒,取值范围为 100~60000。 |
sink.parallelism | 否 | NULL | String | 指定并行度。如果不指定并行度,则使用全局并行度。 |
否 | (none) | String | Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 sink.properties.format 表示 Stream Load 所导入的数据格式,如 CSV 或者 JSON。全部参数和解释,请参见 STREAM LOAD。 |
纯净模式