开发思路

Schema信息hive的catalog保存管理已有schema信息
任务模式yarn的资源管理,并且每个SQL任务独立,任务失败后Application自动结束
优先思考的问题

如何使用 HiveCatalog

catalog配置示例

#/opt/flink/conf/sql-client-defaults.yaml 

execution:
    planner: blink
    current-catalog: hive  # set the HiveCatalog as the current catalog of the session
    current-database: default
catalogs:
   - name: hive
     type: hive
     default-database: default
     hive-conf-dir: /opt/hive/conf # hive.metastore.uris代码中可以直接使用HiveConf创建Catalog注册到TableEnv

配置完成

  • 一旦配置正确,就HiveCatalog应该开箱即用。用户可以使用 DDL 创建 Flink 元对象,应该立即看到它们。
  • is_generic=true 通用表 hive中不可用,is_generic=false 非通用表 hive中可用
  • 重启flink-SQL-Client后,已经创建的表可见
  • 使用hive的metastore读取元数据信息,权限控制 (RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=admin(auth:SIMPLE))
  • 风险点: 集群依赖jar的安装下载,兼容性问题

SQL-Client功能项

  1. source与sink的创建,编辑,管理
  2. 开发功能 示例数据 source/sink
  3. 调试功能 debug运行时的输入输出
  4. 线上运行 提交运行,获取jobId,状态监控,日志获取,启动停止/恢复
  5. catalog的管理

思路

  1. flink-sql-client中SqlClient用于提交SQL任务cli.executeInNonInteractiveMode(readExecutionContent()); // 1.13.x
    Operation operation = executor.parseStatement(sessionId, stmt);
    // 最终调用
    org.apache.flink.table.client.gateway.local.LocalExecutor#executeOperation

    @Override
       public TableResult executeOperation(String sessionId, Operation operation)
               throws SqlExecutionException {
           final ExecutionContext context = getExecutionContext(sessionId);
           final TableEnvironmentInternal tEnv =
                  (TableEnvironmentInternal) context.getTableEnvironment();
           try {
               return context.wrapClassLoader(() -> tEnv.executeInternal(operation));
          } catch (Exception e) {
               throw new SqlExecutionException(MESSAGE_SQL_EXECUTION_ERROR, e);
          }
      }默认情况下,读取FLINK_CONF_DIR下的flink_cluster集群配置(job_manager地址,提交任务)
  2. flink jar 提交有 集群模式 和 独立模式, 对应SQL都是提交到相同集群上运行?
  3. flink-sql的资源队列控制与用户权限
  4. 网络与账户权限问题,(后续如kafka2kafka,kafka2mysql,kafka2es,网络互通与账户权限)
  5. sql-client 多个用户提交同一个集群,无法满足资源隔离需求
    1. 自定义封装SQL-client之后是否能控制实现指定集群/拉起新集群,从而限制资源/完成隔离
  6. 用户自定义函数 udf的需求
flink yarn-per-job

flink yarn-cluster 提交流程

用户请求处理步骤

  1. 配置集群环境, flinkSQL读取hive的Catalog
  2. 用户界面管理Table(创建,修改,删除) DDL
  3. 用户编写SQL语句实现查询与数据转换逻辑 DML
    1. 用户一个Job只包含一条SQL(强制要求?!)
    2. 用户一个Job多条SQL(1个集群多个job/多个集群每个集群一个JOB)
    3. 用户首次提交创建一个集群(维护用户的flink集群,所有用户的任务提交到该集群)
  4. 用户配置其他相关(checkpoint/重启策略)
  5. 参考sql-client代码处理依赖模块加载,创建tableEnv,提交查询
    1. 默认通过读取本地集群配置
    2. 指定路径下预先配置插件包(connector相关/udf相关)
    3. 可指定job_manager,实现多个SQL运行于相同集群
    4. flink run 模式
  6. yarn-per-job启动的集群不包含提交任务相关的jar及接口,即不可重用
    1. 每次执行JOB前先创建一个flink-yarn集群,然后分次提交SQL的任务,(select 限制返回时长和条数, insert 提交成功即返回)
    2. 每次执行SQL启动一个yarn-per-job 集群
    3. 预先启动一个flink-yarn集群,配置指定集群appId
      1. start cluster then submit queries 每次执行前先创建一个flink-yarn集群,然后分次提交SQL的任务
      2. start cluster when submit queries 每次启动一个yarn-per-job集群,再执行
      3. prepare the cluster before submit queries 提前手动创建一个flink-yarn集群,配置集群appId
  7. 任务状态与停止

开发备忘

  1. yarn App(flink 集群) 与 flinkJob
  2. yarn接口flink接口
  3. 停止集群
    1. 当flinkjob处于running状态时,调用cancel接口, Yarn(FINISHED KILLED)
    2. 当flinkjob处于failed状态时,调用cancel接口无效,集群仍存在, 此时调用cluster的DELETE,Yarn(FINISHED SUCCEEDED)
    3. 当上面接口都调用失败时,调用yarn接口kill应用,Yarn(FINISHED KILLED)
  4. 权限相关异常
    • create database 与create table Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=root, access=WRITE, inode=”/user/hive/warehouse/flink.db”:admin:supergroup:drwxr-xr-
  5. detached与attached
    1. detached模式下,appmaster 和jobmanager绑定,jobmanager超过retry次数后,globalFailure 退出,Yarn上的App同时Finished,FinalStatus标记为Failed
    2. attached模式下,appmaster与jobmanager分离,超过失败次数后job标记为failed,但yarn-per-job的flink集群并不会退出,同时Yarn的App为RUNNING,FinalStatus为UNDEFINED
  6. flink的HA模式
    • on-yarn 的HA模式一般采用zookeeper来保存中间状态,当app异常退出,yarn自动拉起对应的app进行appattempts
    • appattempts的最大次数配置 yarn的配置 > flink的配置,两者取最小值
    • 非程序内部异常退出,似乎不计算为attempts次数,手动kill的am可以无限次重启
    • flink本身retry次数用尽,会直接设置app的状态,为正常的app行为,故不会再次拉起
  7. HA配置demo如下
# /opt/flink/conf/flink-conf.yaml 

peline.max-parallelism: 20
restart-strategy: fixed-delay
pipeline.auto-watermark-interval: 5000 ms
restart-strategy.fixed-delay.delay: 10 s
restart-strategy.fixed-delay.attempts: 1
# HighAvailabilityOptions HA-MODE
yarn.application-attempts: 2
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///tmp/flink/recovery
high-availability.zookeeper.path.root: /flink_on_yarn
high-availability.cluster-id: /cluster_yarn