进入流计算组件,点击新建文件夹和新建作业,创建一个新的流计算作业,创建完毕后即进入流计算作业的开发IDE界面。
左侧为本项目流计算任务树,右侧为任务组件和画布区。可以通过拖拽算子和连线轻松编辑流计算作业。
创建文件夹/作业时,支持:在线开发和JAR包上传2种模式,其中在线开发模式支持Flink引擎;JAR包上传模式,依赖线下开发并上传至云平台的JAR包,支持Flink和Spark Streaming两种引擎。
新建在线开发模式的流计算作业,将进入在线开发的IDE界面,中间的源表、数据处理、结果表等operator支持拖拽。拖拽后,将自动弹出算子的参数设置栏,完成参数配置,连接各operator后,即可生成流作业。流计算支持以下方式,灵活支持各种业务场景:
双流joining(支持2个Kafka数据源)
维表管理(支持Kafka数据源关联mysql、oracle等其他数据源 )
jar包依赖(通过上传的jar包进行ETL等操作)
单元测试(支持线下上传数据包,进行乱序、延时测试)
新建作业后,会自动弹出作业参数配置弹窗,可先对流作业的信息进行配置:
参数名称 | 说明 |
---|---|
时间属性 | 支持processing time(系统时间)和event time(时间时间)2种,分别表示流计算应用中,按照什么时间进行数据处理。 |
Checkpoint触发间隔 | 即设置检查点的时间间隔。 |
默认并行数 | 即流作业的并行数。 |
自定义环境参数 | 非必须,可配置Flink引擎的其他环境参数,缓存属性等。 |
数据预览功能可支持提前对Source、Sink的数据结构和数据进行预览,辅助开发。
在IDE界面拖拽数据源插件KafkaSource,双击拖拽的KafkaSource弹出参数设置框(首次拖拽插件时,自动弹出),配置参数后,点击右上角的【X】进行保存。配置参数说明如下:
参数名称 | 说明 |
---|---|
名称 | 即插件名称,可修改 |
数据源 | 可选择有项目下有权限的数据源 |
Topic名称 | 可选择已选数据源下的topic |
中间表名称 | 非必填,为将Kafka中的数据进行结构化后注册表的名称 |
Operator并发度 | 非必填,如不指定将按照作业默认并行数执行 |
Kafka消费位置 | 支持Latest(最近时间)、Earliest(最早时间)、custom(自定义选择开始时间)三种方式,默认Latest time执行。 |
指定事件时间字段 | 当作业配置中,时间属性为event time时出现,指定Topic中的日期时间字段或时间戳(毫秒)字段,作为事件时间。 |
watermark(ms): | 最大乱序/延时数据等待时间,表示会等待并处理多长时间的乱序/延时数据,超出等待时间的数据将被丢弃。 |
自定义环境参数 | 可按需配置心跳时间间隔、相应超时时间等内容。 |
在IDE界面拖拽数据源插件Sql Opertaor,编写具体的流计算业务逻辑,Sql operator不能独立存在,只能在Source和Sink之间。
双击拖拽的Sql Opertaor,即可通过编写Flink SQL进行数据处理,SQL编译框默认提供编写规范,并支持添加预制的模板。
拖拽目标表算子,即可进行添加目标表的操作,流计算服务支持将数据输出至:JDBC(含:Mysql、Oracle、MPP三种数据源)、HBase、Kafka、Redis、ES七种数据源。
JDBC Sink
字段名 | 说明 | 文字描述 |
---|---|---|
名称 | 必填 | 即 Operator 名称,可修改 |
数据源类型 | 必填 | 数据源类型,支持;Oracle、MySQL |
目标数据源 | 必填 | 结果数据写入的数据源 |
目标数据库 | 必填 | 结果数据写入的数据库 |
目标数据表 | 必填 | 结果数据写入的数据表 |
batch size(条) | 选项 | 默认值:1,每批次写入数据的最大条数阈值(数据将分批次写入目标数据表) |
operator并行度 | 选项 | 若不指定 operator 并行度,按照作业默认并行度执行 |
刷新时间(s) | 选项 | 默认 0,数据写入目标表的最大等待时间,0 或者不填表示不启用 |
备注: 数据按照批次写入,有两个条件限制:批次大小 batchsize 和刷新时间flushIntervalms,只要满足其中一个条件,就会触发写数据操作。batchsize 表示这一批次的大小,也就是有多少条数,刷新时间表示这一批次等待多长时间写入。
HBase Sink
字段名 | 说明 | 文字描述 |
---|---|---|
名称 | 必填 | 即 Operator 名称,可修改 |
目标数据源 | 必填 | 结果数据写入的数据源 |
目标数据库 | 必填 | 结果数据写入的数据库 |
目标数据表 | 必填 | 结果数据写入的数据表 |
列族 | 必填 | 结果数据写入的列族 |
rowkey列 | 必填 | 以结果数据的某一列的值作为写入目标数据表的 RowKey值 |
操作方式 | 必填 | put 表示每次新增,incr 表示在原有值的基础上累加 |
sink并行度 | 选填 | 若不指定 operator 并行度,按照作业默认并行度执行 |
刷新时间(ms) | 选填 | 默认 100,数据写入目标表的最大等待时间,0 或者不填表示不启用 |
batch size(条) | 选填 | 默认 1000,每批次写入数据的最大条数阈值(数据将分批次写入目标数据表) |
备注: 数据按照批次写入,有两个条件限制:批次大小 batchsize 和刷新时间flushIntervalms,只要满足其中一个条件,就会触发写数据操作。batchsize 表示这一批次的大小,也就是有多少条数,刷新时间表示这一批次等待多长时间写入。
Kafka sink
字段名 | 说明 | 文字描述 |
---|---|---|
名称 | 必填 | 即 Operator 名称,可修改 |
目标数据源 | 必填 | 结果数据写入的 kafka |
目标Topic | 必填 | 结果数据写入的 Topic |
operator并行度 | 选填 | 若不指定 operator 并行度,按照作业默认并行度执行 |
Redis Sink
字段名 | 说明 | 文字描述 |
---|---|---|
名称 | 必填 | 即 Operator 名称,可修改 |
数据源 | 必填 | 结果数据写入的数据源 |
Sink并行度 | 选填 | 若不指定 operator 并行度,按照作业默认并行度执行 |
redis操作 | 必填 | |
操作类型 | 必填 | 支持:incr、incrByFloat、set、sadd、zadd、rpush、lpush、hset |
Redis Key | 必填 | 组装 RedisKey 所用到的字段(从上游表中取的某个或多个字段对应的值),字段的组合值作为 RedisKey,多个字段时使用逗号分隔 |
key连接符 | 必填 | 存储 RedisKey 时不同字段间的连接符 |
RedisValue | 必填 | 字段的值作为 RedisValue,只允许一个字段,如果多个字段,默认取第一个字段(hset 操作,支持多个字段,如果多个字段逗号分隔) |
过期时间-ms | 选填 | RedisKey 过期时间,不填写时表示数据不过期 |
权重 | 必填 | 当 redisOpTpye 为 zadd 时,有此字段,且必填,RedisScore,操作类型为 zadd 时使用,设置 RedisKey 的权重 |
ES Sink
字段名 | 说明 | 文字描述 |
---|---|---|
名称 | 必填 | 即Operator名称,可修改 |
目标数据源 | 必填 | 结果数据写入的数据源 |
目标索引 | 必填 | 结果数据写入的索引 |
目标Type | 必填 | 结果数据写入的 Type |
Sink 并行度 | 选填 | 若不指定 operator 并行度,按照作业默认并行度执行 |
batch size(条) | 选填 | 默认 1,每批次写入数据的最大条数阈值(数据将分批次数据目标表),最大:10 万 |
刷新时间(ms) | 选填 | 默认 10000,数据写入目标表的最大等待时间,0 或者不填表示不启用,最大:1 亿 ms |
备注:数据按照批次写入,有两个条件限制:批次大小 batchsize 和刷新时间flushIntervalms,只要满足其中一个条件,就会触发写数据操作。batchsize 表示这一批次的大小,也就是有多少条数,刷新时间表示这一批次等待多长时间写入。
将鼠标悬停在算子上,可浮现连线点,可选择算子的连接顺序,生成作业流。
在IDE界面通过拖拽方式将源表Kafka Source ,Sql Operator,Kafka Sink 进行连接,多个作业的时候也可灵活连接。
流计算支持同时对2个Kafka Source进行关联处理(双流joining)。
操作名称 | 说明 |
---|---|
偷锁 | 作业支持多人协同开发,但默认被创建人锁定,当非作业创建人需要编辑作业时,需要点击偷锁权限,临时获取作业编辑权限;保存:保存流计算作业的配置信息 |
重新加载 | 多人协同开发时,点击重新加载,更新同步作业最新编辑结果 |
提交 | 提交时将生成作业版本 |
单元测试 | 通过单元测试,在开发页面便捷式对作业进行测试,具体参数说明详见下文《单元测试》 |
测试运行 | 点击测试运行,将作业提交调度,并在“运维中心-流计算-测试实例”中生成测试实例 |
单元测试用于测试数据处理算子的可用性;
单元测试需选择指定数据包,数据包在 [资源管理] 中上传;
单元测试使用的数据包将复用画布中拖拽的Kafka Source的schema信息;
单元测试Source算子并行度为1;
单元测试结果获取会有一定延迟(每3s获取一次,每次上限100条)。
支持生成多个版本的流作业(点击提交按钮生成新版本),点击版本管理tab,可以查看个版本的信息。
当多版本时,通过点击流程图对比或版本对比,可对任意2个版本的流程图和参数进行对比,内容不一致的部分会被标注出来。
文档内容是否对您有帮助?
评价建议不能为空
非常感谢您的反馈,我们会继续努力做到更好!