最近更新时间:2024-04-07 10:19:40
Yarn Session模式提交作业主要有两种方式:Yarn-session和Yarn-cluster
1. 使用Flink中的 Yarn-session(yarn客户端),会启动两个必要服务JobManager和TaskManagers;
2. 客户端通过 Yarn-session提交作业;
3. Yarn-session会持续运行,不断接收客户端提交的任务;
4. 适用于具有大量小作业的情况。
在Flink目录启动Yarn-session: 运行bin/yarn-session.sh -d
命令启动yarn-session。
可以通过yarn application -list
命令查看已经启动的yarn-session,通过bin/flink run examples/batch/WordCount.jar
命令使用Flink提交任务,通过yarn application -kill application_id
去关闭该yarn session。
特点:
1. 直接提交任务给YARN;
2. 适用于大型作业;
3. 会自动关闭 session
在Flink目录启动,直接提交任务:bin/flink run -m yarn-cluster examples/batch/WordCount.jar
当Flink执行executor时,会自动生成基于程序代码的DAG数据流图;
ActorSystem创建Actor并将数据流图发送给JobManager中的Actor;
JobManager会不断接收TaskManager的心跳消息,从而获取有效的TaskManager;
JobManager通过调度器在TaskManager中调度执行Task(在Flink中,最小的调度单元就是task,对应一个线程);
在程序运行过程中,task与task之间可以进行数据传输。
Job Client
1. 主要职责是提交任务, 提交后可以结束进程, 也可以等待结果返回;
2. Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点;
3. Job Client负责接受用户的程序代码,然后创建数据流,将数据流提交给Job Manager以便进一步执行。执行完成后,Job Client 将结果返回给用户。
JobManager
1. 主要职责是调度工作并协调任务做检查点;
2. 集群中至少要有一个master,master负责调度task,协调checkpoints和容错;
3. 高可用设置的话可以有多个master,但要保证一个是leader, 其他是standby;
4. Job Manager 包含Actor System、Scheduler、CheckPoint三个重要的组件;
5. JobManager从客户端接收到任务以后,首先生成优化过的执行计划,再调度到 TaskManager中执行。
TaskManager
1. 主要职责是从 JobManager 处接收任务, 并部署和启动任务, 接收上游的数据并处理;
2. Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点;
3. TaskManager在创建之初就设置好了Slot, 每个Slot可以执行一个任务。
本文通过以下操作语句,介绍Flink SQL的基础用法:
SELECT
从DataSet/DataStream中选择数据,筛选出某些列。
示例:
取出表中的所有存储信息:SELECT * FROM Table;
取出表中的name列:SELECT name FROM Table;
WHERE
从数据集/流中过滤数据,与SELECT一起使用,选择符合条件的记录。
示例:
取出年龄等于20的人员的姓名:SELECT name FROM Table WHERE age = 20;
DISTINCT
从数据集/流中去重,常配合SELECT的结果使用。
示例:
取出表中的不重复的姓名:SELECT DISTINCT name FROM Table;
GROUP BY
对数据进行分组操作。
示例:
根据年龄进行分组:SELECT * FROM Table GROUP BY age;
JOIN
把来自两个表的数据联合起来形成结果表,Flink支持四种JOIN类型的操作:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
LEFT JOIN 与 JOIN 的区别是当右表没有与左边相 JOIN 的数据时候,右边对应
的字段补 NULL 输出,RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。
FULL JOIN 相当于 RIGHT JOIN 和 LEFT JOIN 之后进行 UNION ALL操作。
示例:
把订单表数据和商品表数据关联:SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product;
UNION和UNION ALL
UNION将两个结果集合并起来,要求两个结果集字段完全一致,其中包括字段类型和字段顺序。
示例:SELECT * FROM T1 UNION (ALL);
KMR5.2.0中适配了flink-1.17.1
,可以通过SQL的方式直接关联Hive分区表的最新分区,并会自动监听最新的Hive分区。当监控到新的分区后,会自动的做维表数据的全量替换。以下是在Flink中,使用HiveCatalog来操作Hive表的一个示例:
--创建一个HiveCatalog
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/opt/kmr/hive/hive-3.1.3/conf/'
);
--切换catalog
USE CATALOG myhive;
--创建数据库
create database myhivebase;
--创建表
CREATE TABLE `myhive`.`myhivebase`.`test_table_flink` (
id BIGINT COMMENT 'unique id',
data STRING
) WITH (
'connector'='hive',
'catalog-type'='hive',
'uri'='thrift://master1IP,thrift://master2IP:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://hdfs-ha/flink/warehouse/tablespace/managed/hive',
'engine.hive.enabled'='true'
);
--插入数据
INSERT INTO `myhive`.`myhivebase`.`test_table_flink` VALUES (111, 'aaaa');
--查询数据
select * from `myhive`.`myhivebase`.`test_table_flink`;
纯净模式