开发思路
Schema信息 | hive的catalog保存管理已有schema信息 |
任务模式 | yarn的资源管理,并且每个SQL任务独立,任务失败后Application自动结束 |
Table of Contents
如何使用 HiveCatalog
catalog配置示例
#/opt/flink/conf/sql-client-defaults.yaml
execution:
planner: blink
current-catalog: hive # set the HiveCatalog as the current catalog of the session
current-database: default
catalogs:
- name: hive
type: hive
default-database: default
hive-conf-dir: /opt/hive/conf # hive.metastore.uris代码中可以直接使用HiveConf创建Catalog注册到TableEnv
配置完成
- 一旦配置正确,就HiveCatalog应该开箱即用。用户可以使用 DDL 创建 Flink 元对象,应该立即看到它们。
- is_generic=true 通用表 hive中不可用,is_generic=false 非通用表 hive中可用
- 重启flink-SQL-Client后,已经创建的表可见
- 使用hive的metastore读取元数据信息,权限控制 (RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=admin(auth:SIMPLE))
- 风险点: 集群依赖jar的安装下载,兼容性问题
SQL-Client功能项
- source与sink的创建,编辑,管理
- 开发功能 示例数据 source/sink
- 调试功能 debug运行时的输入输出
- 线上运行 提交运行,获取jobId,状态监控,日志获取,启动停止/恢复
- catalog的管理
思路
- flink-sql-client中SqlClient用于提交SQL任务cli.executeInNonInteractiveMode(readExecutionContent()); // 1.13.x
Operation operation = executor.parseStatement(sessionId, stmt);
// 最终调用
org.apache.flink.table.client.gateway.local.LocalExecutor#executeOperation
@Override
public TableResult executeOperation(String sessionId, Operation operation)
throws SqlExecutionException {
final ExecutionContext context = getExecutionContext(sessionId);
final TableEnvironmentInternal tEnv =
(TableEnvironmentInternal) context.getTableEnvironment();
try {
return context.wrapClassLoader(() -> tEnv.executeInternal(operation));
} catch (Exception e) {
throw new SqlExecutionException(MESSAGE_SQL_EXECUTION_ERROR, e);
}
}默认情况下,读取FLINK_CONF_DIR下的flink_cluster集群配置(job_manager地址,提交任务) - flink jar 提交有 集群模式 和 独立模式, 对应SQL都是提交到相同集群上运行?
- flink-sql的资源队列控制与用户权限
- 网络与账户权限问题,(后续如kafka2kafka,kafka2mysql,kafka2es,网络互通与账户权限)
- sql-client 多个用户提交同一个集群,无法满足资源隔离需求
- 自定义封装SQL-client之后是否能控制实现指定集群/拉起新集群,从而限制资源/完成隔离
- 用户自定义函数 udf的需求
flink yarn-cluster 提交流程
用户请求处理步骤
- 配置集群环境, flinkSQL读取hive的Catalog
- 用户界面管理Table(创建,修改,删除) DDL
- 用户编写SQL语句实现查询与数据转换逻辑 DML
- 用户一个Job只包含一条SQL(强制要求?!)
- 用户一个Job多条SQL(1个集群多个job/多个集群每个集群一个JOB)
- 用户首次提交创建一个集群(维护用户的flink集群,所有用户的任务提交到该集群)
- 用户配置其他相关(checkpoint/重启策略)
- 参考sql-client代码处理依赖模块加载,创建tableEnv,提交查询
- 默认通过读取本地集群配置
- 指定路径下预先配置插件包(connector相关/udf相关)
- 可指定job_manager,实现多个SQL运行于相同集群
- flink run 模式
- yarn-per-job启动的集群不包含提交任务相关的jar及接口,即不可重用
- 每次执行JOB前先创建一个flink-yarn集群,然后分次提交SQL的任务,(select 限制返回时长和条数, insert 提交成功即返回)
- 每次执行SQL启动一个yarn-per-job 集群
- 预先启动一个flink-yarn集群,配置指定集群appId
- start cluster then submit queries 每次执行前先创建一个flink-yarn集群,然后分次提交SQL的任务
- start cluster when submit queries 每次启动一个yarn-per-job集群,再执行
- prepare the cluster before submit queries 提前手动创建一个flink-yarn集群,配置集群appId
- 任务状态与停止
开发备忘
- yarn App(flink 集群) 与 flinkJob
- yarn接口 与 flink接口
- 停止集群
- 当flinkjob处于running状态时,调用cancel接口, Yarn(FINISHED KILLED)
- 当flinkjob处于failed状态时,调用cancel接口无效,集群仍存在, 此时调用cluster的DELETE,Yarn(FINISHED SUCCEEDED)
- 当上面接口都调用失败时,调用yarn接口kill应用,Yarn(FINISHED KILLED)
- 权限相关异常
- create database 与create table Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=root, access=WRITE, inode=”/user/hive/warehouse/flink.db”:admin:supergroup:drwxr-xr-
- detached与attached
- detached模式下,appmaster 和jobmanager绑定,jobmanager超过retry次数后,globalFailure 退出,Yarn上的App同时Finished,FinalStatus标记为Failed
- attached模式下,appmaster与jobmanager分离,超过失败次数后job标记为failed,但yarn-per-job的flink集群并不会退出,同时Yarn的App为RUNNING,FinalStatus为UNDEFINED
- flink的HA模式
- on-yarn 的HA模式一般采用zookeeper来保存中间状态,当app异常退出,yarn自动拉起对应的app进行appattempts
- appattempts的最大次数配置 yarn的配置 > flink的配置,两者取最小值
- 非程序内部异常退出,似乎不计算为attempts次数,手动kill的am可以无限次重启
- flink本身retry次数用尽,会直接设置app的状态,为正常的app行为,故不会再次拉起
- HA配置demo如下
# /opt/flink/conf/flink-conf.yaml
peline.max-parallelism: 20
restart-strategy: fixed-delay
pipeline.auto-watermark-interval: 5000 ms
restart-strategy.fixed-delay.delay: 10 s
restart-strategy.fixed-delay.attempts: 1
# HighAvailabilityOptions HA-MODE
yarn.application-attempts: 2
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///tmp/flink/recovery
high-availability.zookeeper.path.root: /flink_on_yarn
high-availability.cluster-id: /cluster_yarn