流计算作业开发-自定义开发模式

最近更新时间:2019-12-20 14:15:48

自定义开发相关功能包括ETL、Custom Operator、UDF、维表等,提供JAR包上传和在线代码编写两种实现方式。下面将为每一种功能提供详细说明和并附注案例。

ETL operator

ETL功能以算子形式,将stream流暴露给用户,进行细力度的操作。系统提供JAR/在线开发两种实现方式。

(1) JAR开发

1) 代码编写

ETL功能需要继承基类ETLFunction,并重写ETL和tableSchema两个方法。

  • params为页面设置的自定义参数,以map形式输入,此处只简单输出。

  • with Logging,代码中可直接使用log实例,日志会打印在taskmanager的log信息中。

  • ETL函数,重载父类函数。每一条流消息调用一次本方法。本例中将第一列取出,并拼接一个固定列值。实际使用时以具体业务逻辑替换。

  • tableSchema函数,重载父类函数。因处理逻辑不确定,所以返回schema需算子自己定义。以标准json schema返回。 注意:必须要和ETL函数的返回值对应上,不然后续转换过程和报错。

2) 代码打包

mvn clean package -DskipTests -Dcheckstyle.skip=true

3) 页面配置

  • 在资源管理功能中将打包后的【streaming-flink-test-1.0-SANPSHOT.jar】上传。

  • 在作业开发界面选择依赖的jar。

  • 拖拽算子生成graph,并设置ETL Operator参数。其中类名称需为全路径,在执行时从依赖jar包中反射注入。

(2)在线开发

1) 代码编写

在线代码编写功能在原ETL算子中,增加了【在线开发】选项,并提供JAVA/SCALA两种语法支持。默认会展示最基本的代码框架,如需业务相关模板可在【是否使用模板】功能中选择。

双击ETL算子后进入编辑页面。按上图进行操作,其中需注意点如下:

  • 模板需预先在左侧【模板管理】功能中设置,且公共模板整个租户可见。

  • 模板具体参见【Flink自定义开发模板.doc】,只需编写【模板内容】部分,系统会自动填充成【补全后内容】运行。

  • 添加参数】输入的参数于ETL功能中使用,通过params: java.util.Map[String, Any]直接获取。

如需在ETL中调用维表,请参见【维表与ETL结合方式】。

Custom Operator

Custom Operator功能提供一个自定义的全新算子,可以充当ETL,也可以充当Source或者Sink。需要自己维护TypeInformation并通过flatMap函数实现业务逻辑,Custom Operator只支持JAR开发模式。

1) 代码编写

ETL功能需要继承基类TableOperateProcessor,并重写innerbuild函数。举例CustomTestSimple为一个Custom Operator简单实现:

a) 构造器中定义了四个参数:

  • name:本算子的name,通常设置为在jobGraph中显示名称。

  • childs:子算子集合

  • configs: Map格式参数集合

  • sm: StreamingMate可获取环境变量、维表等全局参数

b) with Logging,代码中可直接使用log实例,日志会打印在taskmanager的log信息中。

c) innerBuild函数,重载父类函数。将dataStream直接暴露给用户,本例中不做处理直接转给下一个算子。实际使用时以具体业务逻辑替换。

d) dataStream.dataType.asInstanceOf[RowTypeInfo]获取输入流的schema

e) 注意:必须要隐式声明implicit val tpe: TypeInformation[Row],供下游算子使用

2) 代码打包 同ETL

3) 页面配置 同ETL

自定义UDF

Udf功能采用Flink自身Udf语法,详细说明参见https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html

(1)JAR开发

1)代码编写

Udaf功能需要继承基类AggregateFunction,并重写createAccumulator、getValue、accumulate、getResultType四个方法(有些可选方法见官网)。

2)代码打包

mvn clean package -DskipTests -Dcheckstyle.skip=true

3)页面配置

在资源管理功能中将打包后的【streaming-flink-test-1.0-SANPSHOT.jar】上传。

(2)在线代码编写模式

1)在左侧菜单栏添加了【函数管理】功能,支持UDF/UDAF的添加和修改。

2)双击新增按钮后进入编辑页面。按上图进行操作,其中需要注意点如下:

  • 函数名为最终register的函数名称,sql中使用这个名称。

  • 函数属性用以展示,在选择函数时使用。

  • 模板具体参见【Flink自定义开发模板.doc】,只需编写【模板内容】部分,系统会自动填充成【补全后内容】运行。

维表配置

(1)功能说明

当前Flink版本只支持流式数据源,当流数据需要关联外部数据库(如mysql、oracle、redis等),需要采用维表的形式支持。 数据库的维表查询请求,有大量相同 key 的重复请求。如何减少重复请求?本地缓存是常用的方案。本方案目前提供两种缓存方案:LRU 和 ALL。

  • cache = ALL(默认): 全量内存缓存

  • cacheTTLMs:缓存的过期时间(ms)

  • cache = LRU: LRU内存缓存

  • cacheSize: 缓存的条目数量

  • cacheTTLMs:缓存的过期时间(ms)

(2)与ETL结合方式

本版本维表功能只支持在ETL算子中使用。本文提供mysql版的维表实现案例,如需其他版本请用户自定义实现。

  • 代码编写

dac.getConfigByKey(MySQLUpdater.key)为从运行时环境获取缓存对象,其中MySQLUpdater.key为对应实现的key。

  • 代码打包

同ETL

  • 页面配置

其中数据源非必选参数,如使用外部数据源,可完全由自定义参数配置数据库连接。维表可设置多个。

金山云,开启您的云计算之旅

免费注册