全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

Flink使用指南

最近更新时间:2024-04-07 10:19:40

Yarn Session模式提交作业的方式

Yarn Session模式提交作业主要有两种方式:Yarn-sessionYarn-cluster

Yarn-session特性

1. 使用Flink中的 Yarn-session(yarn客户端),会启动两个必要服务JobManager和TaskManagers;

2. 客户端通过 Yarn-session提交作业;

3. Yarn-session会持续运行,不断接收客户端提交的任务;

4. 适用于具有大量小作业的情况。

在Flink目录启动Yarn-session: 运行bin/yarn-session.sh -d命令启动yarn-session。

可以通过yarn application -list 命令查看已经启动的yarn-session,通过bin/flink run examples/batch/WordCount.jar命令使用Flink提交任务,通过yarn application -kill application_id 去关闭该yarn session。

Yarn-cluster

特点:

1. 直接提交任务给YARN;

2. 适用于大型作业;

3. 会自动关闭 session

在Flink目录启动,直接提交任务:bin/flink run -m yarn-cluster examples/batch/WordCount.jar

任务的调度与执行

  1. 当Flink执行executor时,会自动生成基于程序代码的DAG数据流图;

  2. ActorSystem创建Actor并将数据流图发送给JobManager中的Actor;

  3. JobManager会不断接收TaskManager的心跳消息,从而获取有效的TaskManager;

  4. JobManager通过调度器在TaskManager中调度执行Task(在Flink中,最小的调度单元就是task,对应一个线程);

  5. 在程序运行过程中,task与task之间可以进行数据传输。

Job Client

1. 主要职责是提交任务, 提交后可以结束进程, 也可以等待结果返回;

2. Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点;

3. Job Client负责接受用户的程序代码,然后创建数据流,将数据流提交给Job Manager以便进一步执行。执行完成后,Job Client 将结果返回给用户。

JobManager

1. 主要职责是调度工作并协调任务做检查点;

2. 集群中至少要有一个master,master负责调度task,协调checkpoints和容错;

3. 高可用设置的话可以有多个master,但要保证一个是leader, 其他是standby;

4. Job Manager 包含Actor System、Scheduler、CheckPoint三个重要的组件;

5. JobManager从客户端接收到任务以后,首先生成优化过的执行计划,再调度到 TaskManager中执行。

TaskManager

1. 主要职责是从 JobManager 处接收任务, 并部署和启动任务, 接收上游的数据并处理;

2. Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点;

3. TaskManager在创建之初就设置好了Slot, 每个Slot可以执行一个任务。

Flink SQL

本文通过以下操作语句,介绍Flink SQL的基础用法:

SELECT

从DataSet/DataStream中选择数据,筛选出某些列。

示例:

  • 取出表中的所有存储信息:SELECT * FROM Table;

  • 取出表中的name列:SELECT name FROM Table;

WHERE

从数据集/流中过滤数据,与SELECT一起使用,选择符合条件的记录。

示例:

  • 取出年龄等于20的人员的姓名:SELECT name FROM Table WHERE age = 20;

DISTINCT

从数据集/流中去重,常配合SELECT的结果使用。

示例:

  • 取出表中的不重复的姓名:SELECT DISTINCT name FROM Table;

GROUP BY

对数据进行分组操作。

示例:

  • 根据年龄进行分组:SELECT * FROM Table GROUP BY age;

JOIN

把来自两个表的数据联合起来形成结果表,Flink支持四种JOIN类型的操作:

  1. JOIN - INNER JOIN

  2. LEFT JOIN - LEFT OUTER JOIN

  3. RIGHT JOIN - RIGHT OUTER JOIN

  4. FULL JOIN - FULL OUTER JOIN

LEFT JOIN 与 JOIN 的区别是当右表没有与左边相 JOIN 的数据时候,右边对应
的字段补 NULL 输出,RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。
FULL JOIN 相当于 RIGHT JOIN 和 LEFT JOIN 之后进行 UNION ALL操作。

示例:

  • 把订单表数据和商品表数据关联:SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product;

UNION和UNION ALL

UNION将两个结果集合并起来,要求两个结果集字段完全一致,其中包括字段类型和字段顺序。

  • 示例:SELECT * FROM T1 UNION (ALL);

Flink关联Hive分区表

KMR5.2.0中适配了flink-1.17.1,可以通过SQL的方式直接关联Hive分区表的最新分区,并会自动监听最新的Hive分区。当监控到新的分区后,会自动的做维表数据的全量替换。以下是在Flink中,使用HiveCatalog来操作Hive表的一个示例:

--创建一个HiveCatalog
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/opt/kmr/hive/hive-3.1.3/conf/'
);
--切换catalog
USE CATALOG myhive;
--创建数据库
create database myhivebase;
--创建表
 CREATE TABLE `myhive`.`myhivebase`.`test_table_flink` (
      id BIGINT COMMENT 'unique id',
      data STRING
  ) WITH (
   'connector'='hive',
   'catalog-type'='hive',
   'uri'='thrift://master1IP,thrift://master2IP:9083',
   'clients'='5',
   'property-version'='1',
   'warehouse'='hdfs://hdfs-ha/flink/warehouse/tablespace/managed/hive',
   'engine.hive.enabled'='true'
  );
--插入数据
INSERT INTO `myhive`.`myhivebase`.`test_table_flink` VALUES (111, 'aaaa');
--查询数据
select * from `myhive`.`myhivebase`.`test_table_flink`;

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈