全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

流计算作业开发-自定义开发模式

最近更新时间:2023-06-16 11:46:57

自定义开发相关功能包括ETL、Custom Operator、UDF、维表等,提供JAR包上传和在线代码编写两种实现方式。下面将为每一种功能提供详细说明和并附注案例。

ETL operator

ETL功能以算子形式,将stream流暴露给用户,进行细力度的操作。系统提供JAR/在线开发两种实现方式。

1.JAR开发

(1)代码编写
ETL功能需要继承基类ETLFunction,并重写ETL和tableSchema两个方法。

  • params为页面设置的自定义参数,以map形式输入,此处只简单输出。
  • with Logging,代码中可直接使用log实例,日志会打印在taskmanager的log信息中。
  • ETL函数,重载父类函数。每一条流消息调用一次本方法。本例中将第一列取出,并拼接一个固定列值。实际使用时以具体业务逻辑替换。
  • tableSchema函数,重载父类函数。因处理逻辑不确定,所以返回schema需算子自己定义。以标准json schema返回。
    注意:必须要和ETL函数的返回值对应上,不然后续转换过程和报错。

(2)代码打包
mvn clean package -DskipTests -Dcheckstyle.skip=true
(3)页面配置

  • 在资源管理功能中将打包后的streaming-flink-test-1.0-SANPSHOT.jar上传。
  • 在作业开发界面选择依赖的jar。
  • 拖拽算子生成graph,并设置ETL Operator参数。其中类名称需为全路径,在执行时从依赖jar包中反射注入。

2.在线开发
(1)代码编写
在线代码编写功能在原ETL算子中,增加了在线开发选项,并提供JAVA/SCALA两种语法支持。默认会展示最基本的代码框架,如需业务相关模板可在是否使用模板功能中选择。
双击ETL算子后进入编辑页面。按上图进行操作,其中需注意点如下:

  • 模板需预先在左侧模板管理功能中设置,且公共模板整个租户可见。
  • 模板具体参见Flink自定义开发模板.doc,只需编写模板内容部分,系统会自动填充成补全后内容运行。
  • 添加参数输入的参数于ETL功能中使用,通过params: java.util.Map[String, Any]直接获取。

如需在ETL中调用维表,请参见维表与ETL结合方式。

Custom Operator

Custom Operator功能提供一个自定义的全新算子,可以充当ETL,也可以充当Source或者Sink。需要自己维护TypeInformation并通过flatMap函数实现业务逻辑,Custom Operator只支持JAR开发模式。

1.代码编写
ETL功能需要继承基类TableOperateProcessor,并重写innerbuild函数。举例CustomTestSimple为一个Custom Operator简单实现:
(1)构造器中定义了四个参数:

参数名称 说明
name 本算子的name,通常设置为在jobGraph中显示名称。
childs 子算子集合。
configs Map格式参数集合。
sm StreamingMate可获取环境变量、维表等全局参数。

(2)with Logging,代码中可直接使用log实例,日志会打印在taskmanager的log信息中。
(3)innerBuild函数,重载父类函数。将dataStream直接暴露给用户,本例中不做处理直接转给下一个算子。实际使用时以具体业务逻辑替换。
(4)dataStream.dataType.asInstanceOf[RowTypeInfo]获取输入流的schema。
注意: 必须要隐式声明implicit valtpe:TypeInformation[Row],供下游算子使用

2.代码打包
同ETL

3.页面配置
同ETL

自定义UDF

Udf功能采用Flink自身Udf语法,详细说明参见Flink官网

1.JAR开发
(1)代码编写
Udaf功能需要继承基类AggregateFunction,并重写createAccumulator、getValue、accumulate、getResultType四个方法(有些可选方法见官网)。
(2)代码打包
mvn clean package -DskipTests -Dcheckstyle.skip=true
(3)页面配置
在资源管理功能中将打包后的streaming-flink-test-1.0-SANPSHOT.jar上传。

2.在线代码编写模式
(1)在左侧菜单栏添加了函数管理功能,支持UDF/UDAF的添加和修改。
(2)双击新增按钮后进入编辑页面。按上图进行操作,其中需要注意点如下:

  • 函数名为最终register的函数名称,sql中使用这个名称。
  • 函数属性用以展示,在选择函数时使用。
  • 模板具体参见Flink自定义开发模板.doc,只需编写模板内容部分,系统会自动填充成补全后内容运行。

维表配置

1.功能说明
当前Flink版本只支持流式数据源,当流数据需要关联外部数据库(如mysql、oracle、redis等),需要采用维表的形式支持。
数据库的维表查询请求,有大量相同 key 的重复请求。如何减少重复请求本地缓存是常用的方案。本方案目前提供两种缓存方案:LRU 和 ALL。

  • cache = ALL(默认): 全量内存缓存
  • cacheTTLMs:缓存的过期时间(ms)
  • cache = LRU: LRU内存缓存
  • cacheSize: 缓存的条目数量
  • cacheTTLMs:缓存的过期时间(ms)

2.与ETL结合方式
本版本维表功能只支持在ETL算子中使用。本文提供mysql版的维表实现案例,如需其他版本请用户自定义实现。

  • 代码编写
    dac.getConfigByKey(MySQLUpdater.key)为从运行时环境获取缓存对象,其中MySQLUpdater.key为对应实现的key。
  • 代码打包
    同ETL
  • 页面配置
    其中数据源非必选参数,如使用外部数据源,可完全由自定义参数配置数据库连接。维表可设置多个。
文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈