• 热门
  • 基础
  • 数据库
  • 安全
  • 大数据
  • 人工智能
  • 混合云
  • 开发与运维
  • 企业应用

应用服务

行业引擎

全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

KMR Flink同步MySQL数据至Starrocks

最近更新时间:2024-06-21 10:46:59

准备工作

创建Serverless Flink集群

进入托管Hadoop的控制台,然后点击创建集群,在基础信息配置的可选组件中选择flink。

准备KRDS MySQL的数据库信息

您可进入云数据库MySQL控制台创建实例,注意要和Serverless Flink、Serverless StarRocks集群在同一个VPC下。

获取StarRocks的数据库信息

  • StarRocks 数据源配置时,StarRocks集群对应的集群信息、数据库用户名密码需填写正确;

  • 填写的数据库用户名信息,必须拥有相应数据库表的读写权限,来保障任务数据能够被正常读取或写入 StarRocks 中。

  • EMR StarRocks 集群和Serverless Flink,KRDS MySQL的 VPC 必须一致。若 VPC 不一致时,则需要在 StarRocks 集群信息的Manager访问安全配置栏中,选择Manager白名单,并添加入方向规则。

创建MySQL源数据表

创建MySQL测试表

首先连接KRDS MySQL实例,详情请参见登录数据库,然后执行以下命令,创建数据库test_sr,以及数据表test_sr.score_board。

CREATE DATABASE test_sr;

CREATE TABLE test_sr.score_board(
    id int(11) NOT NULL COMMENT "",
    name varchar(255) NULL DEFAULT "" COMMENT "",
    score int(11) NOT NULL DEFAULT "0" COMMENT "",
    PRIMARY KEY(id)
);

向MySQL插入测试数据

向score_board表中插入若干条数据。

MySQL [test]> INSERT INTO score_board values
    (1, 'Bob', 21),
    (2, 'Stan', 21),
    (3, 'Sam', 22),
    (4, 'Tony', 22),
    (5, 'Alice', 22),
    (6, 'Lucy', 23),
    (7, 'Polly', 23),
    (8, 'Tom', 23),
    (9, 'Rose', 24),
    (10, 'Jerry', 24),
    (11, 'Jason', 24),
    (12, 'Lily', 25),
    (13, 'Stephen', 25),
    (14, 'David', 25),
    (15, 'Eddie', 26),
    (16, 'Kate', 27),
    (17, 'Cathy', 27),
    (18, 'Judy', 27),
    (19, 'Julia', 28),
    (20, 'Robert', 28),
    (21, 'Jack', 29);

创建StarRocks 表

可以通过点击集群信息的Manager UI链接登录StarRocks 集群,创建与上述MySQL集群一样的表结构

CREATE DATABASE test_sr;
USE test_sr;
CREATE TABLE test_sr.score_board(
    id int(11) NOT NULL COMMENT "",
    name varchar(65533) NULL DEFAULT "" COMMENT "",
    score int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id);

编写 Flink任务运行导入

1.导入connector

使用SSH方式登录KMR集群,下载的Connector用于Flink和StarRocks、KRDS MySQL连接,也可以下载到本地并上传到KMR集群,将Connector放在/opt/kmr/flink/flink-1.17.1/lib目录下。

# 进入lib目录
cd /opt/kmr/flink/flink-1.17.1/lib

# starrocks connector
wget https://repo1.maven.org/maven2/com/starrocks/flink-connector-starrocks/1.2.9_flink-1.17/flink-connector-starrocks-1.2.9_flink-1.17.jar

# mysql connector
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

2. 数据同步

启动Flink YARN Session并运行SQL客户端。

su hadoop
cd /opt/kmr/flink/flink-1.17.1/lib 
yarn-session.sh -d
sql-clint.sh

根据要待导出数据的 MySQL表,在 Flink 中创建一张表,例如 mysql_test,并配置读取任务属性,包括设置 Flink Connector 和库表的信息:

CREATE TABLE
  mysql_test (`id` INT, `name` STRING, `score` INT)
WITH
  (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://17x.xx.xxx.xxx:3306/test_sr',
    'table-name' = 'score_board',
    'username' = 'admin',
    'password' = 'xxxxxxxx'
  );

根据要待导入数据的 StarRocks 表,在 Flink 中创建一张表,例如 starrocks_test,并配置读取任务属性,包括设置 Flink Connector 和库表的信息:

CREATE TABLE starrocks_test
(
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
     'connector' = 'starrocks',
     'jdbc-url'='jdbc:mysql://1xx.xx.252.xxx:9030',
     'load-url'='1xx.xx.252.xxx:8030',-- 由于starrocks是目标表,所以使用load-url描述
     'database-name' = 'test_sr',
     'table-name' = 'score_board',
     'username' = 'admin',
     'password' = 'xxxxxx'
 );

编写执行任务语句,执行时记得开启yarn-session。

insert into
  starrocks_test
select
  *
from
  mysql_test;

验证数据

通过Flink的面板可以看出写入任务执行成功。

验证StarRocks端可以看到MySQL中的数据通过Flink成功写入。

参数说明

通用参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的starrocks 连接器或者jdbc连接器。

jdbc-url

(none)

String

用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>

database-name

(none)

String

数据库名称。

table-name

(none)

String

表名称。

username

(none)

String

用户名称。

password

(none)

String

用户密码。

源表参数

参数

是否必选

默认值

数据类型

描述

scan-url

(none)

String

FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。格式为fe_ip1:http_port,fe_ip2:http_port..。

scan.connect.timeout-ms

1000

String

连接 StarRocks 数据仓库的超时时长,单位毫秒。

scan.params.keep-alive-min

10

String

读取任务的保活时长,单位分钟。

scan.params.query-timeout-s

600

String

读取任务的最大超时时长,单位秒。

scan.params.mem-limit-byte

1073741824

String

BE 节点中单个查询的内存上限,单位为 bytes。默认值 1073741824,相当于 1GB。

scan.max-retries

1

String

读取任务失败后的最大重试次数

目标表参数

参数

是否必选

默认值

数据类型

描述

load-url

(none)

String

用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>

sink.semantic

at-least-once

String

sink 保证的语义。有效值:at-least-onceexactly-once

sink.version

AUTO

String

数据加载时使用的接口。V1:使用 Stream Load 接口加载数据。V2:使用 Transaction Stream Load 接口加载数据,要求 StarRocks 至少为 2.4 版本。AUTO:判断 StarRocks 是否支持 Transaction Stream Load 接口,然后选择版本。支持则选择 V2,不支持则选择 V1。

sink.buffer-flush.max-bytes

94371840

String

数据写入 StarRocks 前,Buffer 可容纳的最大数据量,范围为[64MB, 10GB]。

默认值 94371840,相当于 90MB。

sink.buffer-flush.max-rows

500000

String

数据写入 StarRocks 前,Buffer 可容纳的最大数据行数。

sink.buffer-flush.interval-ms

300000

String

Buffer 刷新时间间隔,单位为毫秒,取值范围 [64000, 5000000]。

sink.max-retries

3

String

写入任务的最大重试次数,取值范围为 [0, 10]。

sink.connect.timeout-ms

1000

String

连接 StarRocks 数据仓库的超时时长,单位毫秒,取值范围为 100~60000。

sink.parallelism

NULL

String

指定并行度。如果不指定并行度,则使用全局并行度。

sink.properties.*

(none)

String

Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 sink.properties.format 表示 Stream Load 所导入的数据格式,如 CSV 或者 JSON。全部参数和解释,请参见 STREAM LOAD

纯净模式常规模式

纯净模式

点击可全屏预览文档内容

鼠标选中内容,快速反馈问题

如果在文档使用中出现问题,可选中有问题的部分进行快速反馈,我们将跟进处理。
不再提示
好的,我知道了

聆听反馈