自定义开发相关功能包括ETL、Custom Operator、UDF、维表等,提供JAR包上传和在线代码编写两种实现方式。下面将为每一种功能提供详细说明和并附注案例。
ETL功能以算子形式,将stream流暴露给用户,进行细力度的操作。系统提供JAR/在线开发两种实现方式。
1.JAR开发
(1)代码编写
ETL功能需要继承基类ETLFunction,并重写ETL和tableSchema两个方法。
(2)代码打包
mvn clean package -DskipTests -Dcheckstyle.skip=true
(3)页面配置
2.在线开发
(1)代码编写
在线代码编写功能在原ETL算子中,增加了在线开发选项,并提供JAVA/SCALA两种语法支持。默认会展示最基本的代码框架,如需业务相关模板可在是否使用模板功能中选择。
双击ETL算子后进入编辑页面。按上图进行操作,其中需注意点如下:
如需在ETL中调用维表,请参见维表与ETL结合方式。
Custom Operator功能提供一个自定义的全新算子,可以充当ETL,也可以充当Source或者Sink。需要自己维护TypeInformation并通过flatMap函数实现业务逻辑,Custom Operator只支持JAR开发模式。
1.代码编写
ETL功能需要继承基类TableOperateProcessor,并重写innerbuild函数。举例CustomTestSimple为一个Custom Operator简单实现:
(1)构造器中定义了四个参数:
参数名称 | 说明 |
---|---|
name | 本算子的name,通常设置为在jobGraph中显示名称。 |
childs | 子算子集合。 |
configs | Map格式参数集合。 |
sm | StreamingMate可获取环境变量、维表等全局参数。 |
(2)with Logging,代码中可直接使用log实例,日志会打印在taskmanager的log信息中。
(3)innerBuild函数,重载父类函数。将dataStream直接暴露给用户,本例中不做处理直接转给下一个算子。实际使用时以具体业务逻辑替换。
(4)dataStream.dataType.asInstanceOf[RowTypeInfo]获取输入流的schema。
注意: 必须要隐式声明implicit valtpe:TypeInformation[Row],供下游算子使用
2.代码打包
同ETL
3.页面配置
同ETL
Udf功能采用Flink自身Udf语法,详细说明参见Flink官网。
1.JAR开发
(1)代码编写
Udaf功能需要继承基类AggregateFunction,并重写createAccumulator、getValue、accumulate、getResultType四个方法(有些可选方法见官网)。
(2)代码打包
mvn clean package -DskipTests -Dcheckstyle.skip=true
(3)页面配置
在资源管理功能中将打包后的streaming-flink-test-1.0-SANPSHOT.jar上传。
2.在线代码编写模式
(1)在左侧菜单栏添加了函数管理功能,支持UDF/UDAF的添加和修改。
(2)双击新增按钮后进入编辑页面。按上图进行操作,其中需要注意点如下:
1.功能说明
当前Flink版本只支持流式数据源,当流数据需要关联外部数据库(如mysql、oracle、redis等),需要采用维表的形式支持。
数据库的维表查询请求,有大量相同 key 的重复请求。如何减少重复请求本地缓存是常用的方案。本方案目前提供两种缓存方案:LRU 和 ALL。
2.与ETL结合方式
本版本维表功能只支持在ETL算子中使用。本文提供mysql版的维表实现案例,如需其他版本请用户自定义实现。
dac.getConfigByKey(MySQLUpdater.key)
为从运行时环境获取缓存对象,其中MySQLUpdater.key
为对应实现的key。文档内容是否对您有帮助?
评价建议不能为空
非常感谢您的反馈,我们会继续努力做到更好!