• [行业动态] Flink Hudi 0.10.0 发布,多项重要更新,稳定性大幅提升
    前言随着云数仓技术的不断成熟,数据湖俨然已成为当下最热门的技术之一,而 Apache Hudi 是当下最具竞争力的数据湖格式之一:拥有最活跃的开源社区之一,周活跃 PR 一直维持在 50+ 水平;拥有最活跃的国内用户群之一,目前的 Apache Hudi 钉钉群用户已超过 2200+,国内各大厂商都已经布局 Apache Hudi 生态。Apache Hudi 的活跃度得益于其出色的 file format 设计和丰富的事物语义支持:类 LSM 的 file format 布局很好的适配了近实时更新场景,解决了超大数据集更新的痛点;Hudi 的事物层语义在目前的湖存储中是极其成熟和丰富的,基本所有的数据治理都可以自动化完成:compaction、rollback、cleaning、clustering。Flink On HudiApache Hudi 的 table format 对流计算友好的特性使得 Flink On Hudi 成为 Apache Hudi 项目最值得探索和挖掘的方向之一,Flink 不仅为 Hudi 解锁了超大数据流的实时更新能力、更添加了流式消费和计算的能力,让端到端近实时 ETL 得以在低成本的文件存储上轻松实现。Flink On Hudi 项目在 2020 年 11 月立项,至今已迭代了三个版本,从第一个版本开始人气和活跃度就一直高涨。5 月份组建的 Apache Hudi 钉钉群截止目前半年的时间,已经有超过 2200+ 用户,并且活跃度一直排在 Flink 用户群的前列。Flink On Hudi 已成为部署 Apache Hudi 项目的首选方案,国内主要云厂商:阿里云、华为云、腾讯云,国外的 AWS 都已集成 Flink On Hudi;国内的大型互联网公司:头条、快手、B站 以及传统企业:顺丰、海康等均有 Flink On Hudi 的生产实践,具钉钉群的跟踪回访等不完全统计,至少超过 50+ 国内公司在生产上使用 Flink On Hudi,Uber 公司更将 Flink On Hudi 作为 2022 年的重点方向在推进 !Flink On Hudi 的开发者生态也非常活跃,目前国内有阿里云、华为云、头条、B站的同学持续贡献,Uber 公司和 AWS 更专门投入人力来对接 Flink On Hudi。版本 Highlights0.10.0 版本经过社区用户的千锤百炼,贡献了多项重要的 fix,更有核心读写能力的大幅增强,解锁了多个新场景,Flink On Hudi 侧的更新重点梳理如下:Bug 修复修复对象存储上极端 case 流读数据丢失的问题 [HUDI-2548];修复全量+增量同步偶发的数据重复 [HUDI-2686];修复 changelog 模式下无法正确处理 DELETE 消息 [HUDI-2798];修复在线压缩的内存泄漏问题 [HUDI-2715]。新特性支持增量读取;支持 batch 更新;新增 Append 模式写入,同时支持小文件合并;支持 metadata table。功能增强写入性能大幅提升:优化写入内存、优化了小文件策略(更加均衡,无碎片文件)、优化了 write task 和 coordinator 的交互;流读语义增强:新增参数 earliest,提升从最早消费性能、支持参数跳过压缩读取,解决读取重复问题;在线压缩策略增强:新增 eager failover + rollback,压缩顺序改为从最早开始;优化事件顺序语义:支持处理序,支持事件序自动推导。下面挑一些重点内容为大家详细介绍:小文件优化Flink On Hudi 写入流程大致分为以下几个组件:row data to hoodie:负责将 table 的数据结构转成 HoodieRecord;bucket assigner:负责新的文件 bucket (file group) 分配;write task:负责将数据写入文件存储;coordinator:负责写 trasaction 的发起和 commit;cleaner:负责数据清理。其中的 bucket assigner 负责了文件 file group 的分配,也是小文件分配策略的核心组件。0.10.0 版本的每个 bucket assign task 持有一个 bucket assigner,每个 bucket assigner 独立管理自己的一组 file group 分组:在写入 INSERT 数据的时候,bucket assigner 会扫描文件视图,查看当前管理的 file group 中哪些属于小文件范畴,如果 file group 被判定为小文件,则会继续追加写入。比如上图中 task-1 会继续往 FG-1、FG-2 中追加 80MB 和 60MB 的数据。为了避免过度的写放大,当可写入的 buffer 过小时会忽略,比如上图中 FG-3、FG-4、FG-5 虽然是小文件,但是不会往文件中追加写。task-2 会新开一个 file group 写入。全局文件视图0.10.0 版本将原本 write task 端的文件视图统一挪到 JobManager,JobManager 启动之后会使用 Javaline 本地启动一个 web server,提供全局文件视图的访问代理。Write task 通过发送 http 请求和 web server 交互,拿到当前写入的 file group 视图。Web server 避免了重复的文件系统视图加载,极大的节省了内存开销。流读能力增强0.10.0 版本新增了从最早消费数据的参数,通过指定 read.start-commit 为 earliest 即可流读全量 + 增量数据,值得一提的是,当从 earliest 开始消费时,第一次的 file split 抓取会走直接扫描文件视图的方式,在开启 metadata table 功能后,文件的扫描效率会大幅度提升;之后的增量读取部分会扫描增量的 metadata,以便快速轻量地获取增量的文件讯息。新增处理顺序Apache Hudi 的消息合并大体分为两块:增量数据内部合并、历史数据和增量数据合并。消息之间合并通过 write.precombine.field 字段来判断版本新旧,如下图中标注蓝色方块的消息为合并后被选中的消息。0.10.0 版本可以不指定 write.precombine.field 字段,此时使用处理顺序:即后来的消息比较新,对应上图紫色部分被选中的消息。Metadata TableMetadata table 是 0.7.0 Hudi 引入的功能,目的是在查询端减少 DFS 的访问,类似于文件 listings 和 partitions 信息直接通过 metadata table 查询获取。Metadata 在 0.10.0 版本得到大幅加强,Flink 端也支持了 该功能。新版的 metadata table 为同步更新模型,当完成一次成功的数据写入之后,coordinator 会先同步抽取文件列表、partiiton 列表等信息写入 metadata table 然后再写 event log 到 timeline (即 metadata 文件)。Metadata table 的基本文件格式为 avro log,avro log 中的文件编码区别于正常的 MOR data log 文件,是由高效的 HFile data block 构成,这样做的好处是自持更高效率的 kv 查找。同时 metadata table 的 avro log 支持直接压缩成 HFile 文件,进一步优化查询效率。总结和展望在短短的半年时间,Flink On Hudi 至今已积攒了数量庞大的用户群体。积极的用户反馈和丰富的用户场景不断打磨 Flink On Hudi 的易用性和成熟度,使得 Flink On Hudi 项目以非常高效的形式快速迭代。通过和头部公司如头条、B 站等共建的形式,Flink On Hudi 形成了非常良性的开发者用户群。Flink On Hudi 是 Apache Hudi 社区接下来两个大版本主要的发力方向,在未来规划中,主要有三点:完善端到端 streaming ETL 场景 支持原生的 change log、支持维表查询、支持更轻量的去重场景;Streaming 查询优化 record-level 索引,二级索引,独立的文件索引;Batch 查询优化 z-ordering、data skipping。文章转载自Apache Flink公众号  https://mp.weixin.qq.com/s/q_yqa4jTmEHHoItp1V5Vgg免责声明:转载文章版权归原作者所有。如涉及作品内容、版权等问题,请及时联系文章编辑!
  • [基础组件] 【MRS】【Flink功能】Flink任务运行失败
    【功能模块】MRS 8.0.2混合云版本   Flink组件【操作步骤&问题现象】1、登录MRS客户端,kinit登录2、执行命令yarn-session.sh -t conf/ -d提示flink任务运行失败,怀疑是MRS环境问题。【截图信息】
  • [基础组件] flink yarnsession起不来,报错:Couldn't deploy Yarn session cluster
    【功能模块】flink执行yarn-session.sh -t conf/ -d报错【操作步骤&问题现象】1、mrs 3.0.2 线下环境【问题现象】执行 yarn-session.sh -t conf/ -d报 Error while running the Flink session. | org.apache.flink.yarn.cli.FlinkYarnSessionCli (AbstractCustomCommandLine.java:118) org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster【操作步骤】source bigdata_env kinit 用户 密码 在flink目录下执行:yarn-session.sh -t conf/ -d【预期结果】能够成功提交任务,执行通过【实际结果】客户端报: Error while running the Flink session. | org.apache.flink.yarn.cli.FlinkYarnSessionCli (AbstractCustomCommandLine.java:118) org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster 前台页面查看logs日志显示:Exception in thread "main" java.lang.IllegalArgumentException: Can't get Kerberos realm at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)【原因定位】提交任务阻塞,最后导致超时,flink on yarn启动时检查虚拟内存,虚拟内存不足导致的 尝试解决的办法:1.关掉ssl 2、注释掉dns、3.将flink访问Zookeeper目录【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] 【MRS】【flink组件】集成flink组件时代码编译报错
    【功能模块】MRS版本:MRS3.1.0【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [行业动态] 从 Spark 做批处理到 Flink 做流批一体
    摘要:本⽂主要内容为:为什么要做流批一体?当前行业已有的解决方案和现状,优势和劣势探索生产实践场景的经验Shuflle Service 在 Spark 和 Flink 上的对比,以及 Flink 社区后面可以考虑做的工作总结一、为什么要做流批一体做流批一体到底有哪些益处,尤其是在 BI/AI/ETL 的场景下。整体来看,如果能帮助用户做到流批一体,会有以上 4 个比较明显的益处:可以避免代码重复,复用代码核心处理逻辑代码逻辑能完全一致是最好的,但这会有一定的难度。但整体来讲,现在的商业逻辑越来越长,越来越复杂,要求也很多,如果我们使用不同的框架,不同的引擎,用户每次都要重新写一遍逻辑,压力很大并且难以维护。所以整体来讲,尽量避免代码重复,帮助用户复用代码逻辑,就显得尤为重要。流批一体有两个方向这两个方向要考虑的问题很不一样,目前 Flink 做 Streaming、Spark 做 Batch 等等一些框架在批处理或流处理上都比较成熟,都已经产生了很多的单方面用户。当我们想帮助用户移到另外一个方向上时,比如一些商业需求,通常会分成两类,是先从流处理开始到批处理,还是从批处理开始到流处理。之后介绍的两个生产实践场景案例,正好对应这两个方向。减少维护工作量避免维护多套系统,系统之间的差异可能非常大,框架和引擎都不一样,会带来比较多的问题。如果公司内部有多条 pipeline ,一个实时一个离线,会造成数据不一致性,因此会在数据验证、数据准确性查询、数据存储等方面做很多工作,尽量去维护数据的一致性。学习更多框架和引擎很多,商业逻辑既要跑实时,也要跑离线,所以,支持用户时需要学习很多东西。二、当前行业现状Flink 和 Spark 都是同时支持流处理和批处理的引擎。我们一致认为 Flink 的流处理做的比较好,那么它的批处理能做到多好?同时,Spark 的批处理做的比较好,那么它的流处理能不能足够帮助用户解决现有的需求?现在有各种各样的引擎框架,能不能在它们之上有一个统一的框架,类似于联邦处理或者是一些简单的 physical API,比如 Beam API 或者是自定义接口。Beam 方面需要考虑的问题,是它在批处理和流处理上的优化能做到多好?Beam 目前还是偏物理执行,之后的计划是我们需要考究的。LinkedIn,包括其他公司,会考虑做一些自定义接口的解决方案,考虑有一个共通的 SQL 层,通用的 SQL 或 API 层,底下跑不同的框架引擎。这里需要考虑的问题是,像 Spark 、Flink 都是比较成熟的框架了,已经拥有大量的用户群体。当我们提出一个新的 API ,一个新的解决方案,用户的接受度如何? 在公司内部应该如何维护一套新的解决方案?三、生产案例场景后面内容主要聚焦在 Flink 做 batch 的效果,Flink 和 Spark 的简单对比,以及 LinkedIn 内部的一些解决方案。分享两个生产上的实例场景,一个是在机器学习特征工程生成时如何做流批一体,另一个是复杂的 ETL 数据流中如何做流批一体。3.1 案例 A - 机器学习特征工程第一类方向,流处理 -> 批处理,归类为流批一体。 案例 A 的主体逻辑是在机器学习中做特征生成时,如何从流处理到批处理的流批一体。核心的业务逻辑就是特征转换,转化的过程和逻辑比较复杂,用它做一些标准化。比如在 LinkedIn 的页面上输入的一些会员信息背景等,需要将这些信息提取出来标准化掉,才能进行一些推荐,帮你找一些工作等等。当会员的身份信息有更新时,会有过滤、预处理的逻辑、包括读取 Kafka 的过程,做特征转换的过程中,可能会有一些小表查询。这个逻辑是非常直接的,没有复杂的 join 操作及其他的数据处理过程。以前它的 pipeline 是实时的,需要定期从离线 pipeline 中读取补充信息来更新流。这种 backfill 对实时集群的压力是很大的,在 backfill 时,需要等待 backfill 工作起来,需要监控工作流不让实时集群宕掉。所以,用户提出能不能做离线的 backfill,不想通过实时流处理做 backfill。当前我们的用户是使用 Beam on Samza 做流处理,他们非常熟悉 Beam API 和 Spark Dataset API,也会用 Dataset API 去做除了 backfill 之外的一些其他业务处理。需要特别强调的是, Dataset API 很多都是直接对 Object 操作,对 type 安全性要求很高,如果建议这些用户直接改成 SQL 或者 DataFrame 等 workflow 是不切实际的,因为他们已有的业务逻辑都是对 Object 进行直接操作和转化等。在这个案例下,我们能提供给用户一些方案选择,Imperative API 。看下业界提供的方案:第一个选择是即将要统一化的 Flink DataStream API,此前我们在做方案评估时也有调研 Flink DataSet API(deprecated),DataStream API 可以做到统一,并且在流处理和批处理方面的支持都是比较完善的。但缺点是,毕竟是 Imperative API ,可能没有较多的优化,后续应该会持续优化。可以看下 FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API) [1]  和 FLIP-134: Batch execution for the DataStream API [2]。第二个选择是 Spark Dataset ,也是用户一个比较自然的选择。可以用 Dataset API 做 Streaming,区别于  Flink 的 Dataset 、DataStream API 等物理 API,它是基于 Spark Dataframe SQL engine 做一些 type safety,优化程度相对好一些。可以看下文章 Databricks: Introducing Apache Spark Datasets [3] 和 Spark Structured Streaming Programming Guide: Unsupported-operations [4]。第三个选择是 Beam On Spark,它目前主要还是用 RDD runner,目前支持带 optimization 的 runner 还是有一定难度的。之后会详细说下 Beam 在案例 B 中的一些 ongoing 的工作。可以看下 Beam Documentation - Using the Apache Spark Runner [5] 和 BEAM-8470 Create a new Spark runner based on Spark Structured streaming framework [6]。从用户的反馈上来说,Flink 的 DataStream (DataSet) API 和 Spark 的 Dataset API 在用户 interface 上是非常接近的。作为 Infra 工程师来说,想要帮用户解决问题,对 API 的熟悉程度就比较重要了。但是 Beam 和 Flink 、Spark 的 API 是非常不一样的,它是 Google 的一条生态系统,我们之前也帮助用户解决了一些问题,他们的 workflow 是在 Beam on Samza 上,他们用 p collections 或者 p transformation 写了一些业务逻辑,output、input 方法的 signature 都很不一样,我们开发了一些轻量级 converter 帮助用户复用已有的业务逻辑,能够更好的用在重新写的 Flink 或 Spark 作业里。从 DAG 上来看,案例 A 是一个非常简单的业务流程,就是简单直接的对 Object 进行转换。Flink 和 Spark 在这个案例下,性能表现上是非常接近的。通常,我们会用 Flink Dashboard UI 看一些异常、业务流程等,相比 Spark 来说是一个比较明显的优势。Spark 去查询 Driver log,查询异常是比较麻烦的。但是 Flink 依旧有几个需要提升的地方:History Server - 支持更丰富的 Metrics 等Spark History Server UI 呈现的 metrics 比较丰富的,对用户做性能分析的帮助是比较大的。Flink 做批处理的地方是否也能让 Spark 用户能看到同等的 metrics 信息量,来降低用户的开发难度,提高用户的开发效率。更好的批处理运维工具分享一个 LinkedIn 从两三年前就在做的事情。LinkedIn 每天有 200000 的作业跑在集群上,需要更好的工具支持批处理用户运维自己的作业,我们提供了 Dr. Elephant 和 GridBench 来帮助用户调试和运维自己的作业。Dr. Elephant 已开源,能帮助用户更好的调试作业,发现问题并提供建议。另外,从测试集群到生产集群之前,会根据 Dr. Elephant 生成的报告里评估结果的分数来决定是否允许投产。GridBench 主要是做一些数据统计分析,包括 CPU 的方法热点分析等,帮助用户优化提升自己的作业。GridBench 后续也有计划开源,可以支持各种引擎框架,包括可以把 Flink 加进来,Flink job 可以用 GridBench 更好的做评估。GridBench Talk: Project Optimum: Spark Performance at LinkedIn Scale [7]。用户不仅可以看到 GridBench 生成的报告,Dr. Elephant 生成的报告,也可以通过命令行看到 job 的一些最基本信息,应用 CPU 时间、资源消耗等,还可以对不同 Spark job 和 Flink job 之间进行对比分析。以上就是 Flink 批处理需要提升的两块地方。3.2 案例 B - 复杂的 ETL 数据流第二类方向,批处理 -> 流处理,归类为流批一体。ETL 数据流的核心逻辑相对复杂一些,比如包括 session window 聚合窗口,每个小时计算一次页面的用户浏览量,分不同的作业,中间共享 metadata table 中的 page key,第一个作业处理 00 时间点,第二个作业处理 01 时间点,做一些 sessionize 的操作,最后输出结果,分 open session、close session ,以此来做增量处理每个小时的数据。这个 workflow 原先是通过 Spark SQL 做的离线增量处理,是纯离线的增量处理。当用户想把作业移到线上做一些实时处理,需要重新搭建一个比如 Beam On Samza 的实时的 workflow,在搭建过程中我们和用户有非常紧密的联系和沟通,用户是遇到非常多的问题的,包括整个开发逻辑的复用,确保两条业务逻辑产生相同的结果,以及数据最终存储的地方等等,花了很长时间迁移,最终效果是不太好的。另外,用户的作业逻辑里同时用 Hive 和 Spark 写了非常多很大很复杂的 UDF ,这块迁移也是非常大的工作量。用户对 Spark SQL 和 Spark DataFrame API 是比较熟悉的。上图中的黑色实线是实时处理的过程,灰色箭头主要是批处理的过程,相当于是一个Lambda结构。针对案例 B,作业中包括很多 join 和 session window,他们之前也是用 Spark SQL 开发作业的。很明显我 们要从 Declartive API 入手,当前提供了 3 种方案:第一个选择是 Flink Table API/SQL ,流处理批处理都可以做,同样的SQL,功能支持很全面,流处理和批处理也都有优化。可以看下文章 Alibaba Cloud Blog: What's All Involved with Blink Merging with Apache Flink? [8] 和 FLINK-11439 INSERT INTO flink_sql SELECT * FROM blink_sql [9]。第二个选择是  Spark DataFrame API/SQL ,也是可以用相同的 interface 做批处理和流处理,但是 Spark 的流处理支持力度还是不够的。可以看下文章 Databricks Blog: Deep Dive into Spark SQL’s Catalyst Optimizer [10] 和 Databricks Blog: Project Tungsten: Bringing Apache Spark Closer to Bare Metal [11]。第三个选择是 Beam Schema Aware API/SQL ,Beam 更多的是物理的 API ,在 Schema Aware API/SQL 上目前都在开展比较早期的工作,暂不考虑。所以,之后的主要分析结果和经验都是从 Flink Table API/SQL 和 Spark DataFrame API/SQL 的之间的对比得出来的。可以看下文章 Beam Design Document - Schema-Aware PCollections [12] 和 Beam User Guide - Beam SQL overview [13]。从用户的角度来说,Flink Table API/SQL 和 Spark DataFrame API/SQL 是非常接近的,有一些比较小的差别,比如 keywords、rules、 join 具体怎么写等等,也会给用户带来一定的困扰,会怀疑自己是不是用错了。Flink 和 Spark 都很好的集成了 Hive ,比如 HIve UDF 复用等,对案例B中的 UDF 迁移,减轻了一半的迁移压力。Flink 在 pipeline 模式下的性能是明显优于 Spark 的,可想而知,要不要落盘对性能影响肯定是比较大的,如果需要大量落盘,每个 stage 都要把数据落到磁盘上,再重新读出来,肯定是要比不落盘的 pipeline 模式的处理性能要差的。pipeline 比较适合短小的处理,在 20 分钟 40 分钟还是有比较大的优势的,如果再长的 pipeline 的容错性肯定不能和 batch 模式相比。Spark 的 batch 性能还是要比 Flink 好一些的。这一块需要根据自己公司内部的案例进行评估。Flink 对 window 的支持明显比其他引擎要丰富的多,比如 session window,用户用起来非常方便。我们用户为了实现 session window ,特意写了非常多的 UDF ,包括做增量处理,把 session 全部 build 起来,把 record 拿出来做处理等等。现在直接用 session window operator ,省了大量的开发消耗。同时 group 聚合等 window 操作也都是流批同时支持的。Session Window:// Session Event-time Window .window(Session withGap 10.minutes on $"rowtime" as $"w") // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session withGap 10.minutes on $"proctime" as $"w")Slide Window:// Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w") // Sliding Processing-time Window (assuming a processing-time attribute "proctime") .window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w") // Sliding Row-count Window (assuming a processing-time attribute "proctime") .window(Slide over 10.rows every 5.rows on $"proctime" as $"w")UDF 是在引擎框架之间迁移时最大的障碍。如果 UDF 是用 Hive 写的,那是方便迁移的,因为不管是 Flink 还是 Spark 对 Hive UDF 的支持都是很好的,但如果 UDF 是用 Flink 或者 Spark 写的,迁移到任何一个引擎框架,都会遇到非常大的问题,比如迁移到 Presto 做 OLAP 近实时查询。为了实现 UDF 的复用,我们 LinkedIn 在内部开发了一个 transport 项目,已经开源至 github [14] 上, 可以看下 LinkedIn 发表的博客:Transport: Towards Logical Independence Using Translatable Portable UDFs [15]。transport 给所有引擎框架提供一个面向用户的 User API ,提供通用的函数开发接口,底下自动生成基于不同引擎框架的 UDF ,比如 Presto、Hive、Spark、Flink 等。用一个共通的 UDF API 打通所有的引擎框架,能让用户复用自己的业务逻辑。用户可以很容易的上手使用,比如如下用户开发一个 MapFromTwoArraysFunction:public class MapFromTwoArraysFunction extends StdUDF2<StdArray,StdArray,StdMap>{ private StdType _mapType; @Override public List<String> getInputParameterSignatures(){ return ImmutableList.of( "array[K]", "array[V]" ); } @Override public String getOutputParameterSignature(){ return "map(K,V)"; } } @Override public void init(StdFactory stdFactory){ super.init(stdFactory); } @Override public StdMap eval(StdArray a1, StdArray a2){ if(a1.size() != a2.size()) { return null; } StdMap map = getStdFactory().createMap(_mapType); for(int i = 0; i < a1.size; i++) { map.put(a1.get(i), a2.get(i)); } return map; }处理用户的 SQL 迁移问题 ,用户之前是用 Spark SQL 开发的作业,之后想使用流批一体,改成 Flink SQL 。目前的引擎框架还是比较多的,LinkedIn 开发出一个 coral 的解决方案,已在 github [16] 上开源,在 facebook 上也做了一些 talk ,包括和 transport UDF 一起给用户提供一个隔离层使用户可以更好的做到跨引擎的迁移,复用自己的业务逻辑。看下 coral 的执行流程,首先作业脚本中定义 熟悉的 ASCII SQL 和 table 的属性等,之后会生成一个 Coral IR 树状结构,最后翻译成各个引擎的 physical plan。在案例 B 分析中,流批统一,在集群业务量特别大的情况下,用户对批处理的性能、稳定性、成功率等是非常重视的。其中 Shuffle Service ,对批处理性能影响比较大。四、Shuffle Service 在 Spark 和 Flink 上的对比In-memory Shuffle,Spark 和 Flink 都支持,比较快,但不支持可扩展。Hash-based Shuffle ,Spark 和 Flink 都支持 , 相比 In-memory Shuffle ,容错性支持的更好一些,但同样不支持可扩展。Sort-based Shuffle,对大的 Shuffle 支持可扩展,从磁盘读上来一点一点 Sort match 好再读回去,在 FLIP-148: Introduce Sort-Based Blocking Shuffle to Flink  [17] 中也已经支持。External Shuffle Service, 在集群非常繁忙,比如在做动态资源调度时,外挂服务就会非常重要,对 Shuffle 的性能和资源依赖有更好的隔离,隔离之后就可以更好的去调度资源。FLINK-11805 A Common External Shuffle Service Framework [18] 目前处于 reopen 状态。Disaggregate Shuffle,大数据领域都倡导 Cloud Native 云原生,计算存储分离在 Shuffle Service 的设计上也是要考虑的。FLINK-10653 Introduce Pluggable Shuffle Service Architecture [19] 引入了可插拔的 Shuffle Service 架构。Spark 对 Shuffle Service 做了一个比较大的提升,这个工作也是由 LinkedIn 主导的 magnet 项目,形成了一篇名称为 introducing-magnet 的论文 (Magnet: A scalable and performant shuffle architecture for Apache Spark) [20],收录到了 LinkedIn blog 2020 里。magnet 很明显的提升了磁盘读写的效率,从比较小的 random range ,到比较大的顺序读,也会做一些 merging ,而不是随意的随机读取 shuffle data ,避免 random IO 的一些问题。通过 Magent Shuffle Service 缓解了 Shuffle 稳定性和可扩展性方面的问题。在此之前,我们发现了很多 Shuffle 方面的问题,比如 Job failure 等等非常高。如果想用 Flink 做批处理,帮助到以前用 Spark 做批处理的用户,在 Shuffle 上确实要花更大功夫。在 Shuffle 可用性上,会采用 best-effort 方式去推 shuffle blocks,忽略一些大的 block ,保证最终的一致性和准确性; 为 shuffle 临时数据生成一个副本,确保准确性。如果 push 过程特别慢,会有提前终止技术。Magent Shuffle 相比 Vanilla Shuffle ,读取 Shuffle data 的等待时间缩较少了几乎 100%,task 执行时间缩短了几乎 50%,端到端的任务时长也缩短了几乎 30%。五、总结LinkedIn 非常认可和开心看到 Flink 在流处理和批处理上的明显优势,做的更加统一,也在持续优化中。Flink 批处理能力有待提升,如 history server,metrics,调试。用户在开发的时候,需要从用户社区看一些解决方案,整个生态要搭建起来,用户才能方便的用起来。Flink 需要对 shuffle service 和大集群离线工作流投入更多的精力,确保 workflow 的成功率,如果规模大起来之后,如何提供更好的用户支持和对集群进行健康监控等。随着越来越多的框架引擎出现,最好能给到用户一个更加统一的 interface,这一块的挑战是比较大的,包括开发和运维方面,根据 LinkedIn 的经验,还是看到了很多问题的,并不是通过一个单一的解决方案,就能囊括所有的用户使用场景,哪怕是一些 function 或者 expression,也很难完全覆盖到。像 coral、transport UDF。原视频: https://www.bilibili.com/video/BV13a4y1H7XY?p=12参考链接[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API[3] https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html[4] https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations[5] https://beam.apache.org/documentation/runners/spark/[6] https://issues.apache.org/jira/browse/BEAM-8470[7] https://www.youtube.com/watch?v=D47CSeGpBd0[8] https://www.alibabacloud.com/blog/whats-all-involved-with-blink-merging-with-apache-flink_595401[9] https://issues.apache.org/jira/browse/FLINK-11439[10] https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html[11] https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html[12] https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit#heading=h.puuotbien1gf[13] https://beam.apache.org/documentation/dsls/sql/overview/[14] https://github.com/linkedin/transport[15] https://engineering.linkedin.com/blog/2018/11/using-translatable-portable-UDFs[16] https://github.com/linkedin/coral[17] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink[18] https://issues.apache.org/jira/browse/FLINK-11805[19] https://issues.apache.org/jira/browse/FLINK-10653[20] https://engineering.linkedin.com/blog/2020/introducing-magnet文章转载自过往记忆大数据公众号  https://mp.weixin.qq.com/s/B-xb2rIL07oKuvAKvNa7Fw 免责声明:转载文章版权归原作者所有。如涉及作品内容、版权等问题,请及时联系文章编辑!
  • [行业动态] 顺丰科技 Hudi on Flink 实时数仓实践
    摘要:本文介绍了顺丰科技数仓的架构,趟过的一些问题、使用 Hudi 来优化整个 job 状态的实践细节,以及未来的一些规划。主要内容为:数仓架构Hudi 代码躺过的坑状态优化未来规划顺丰科技早在 2019 年引入 Hudi ,当时是基于 Spark 批处理,2020 年对数据的实时性要求更高公司对架构进行了升级,在社区 Hudi on Flink 的半成品上持续优化实现Binlog 数据 CDC 入湖。在 Hudi 社区飞速发展的同时公司今年对数仓也提出了新的要求,最终采用 Flink + Hudi 的方式来宽表的实时化。过程中遇到了很多问题主要有两点:Hudi Master 代码当时存在一些漏洞;宽表涉及到多个 Join,Top One 等操作使得状态很大。庆幸的是社区的修复速度很给力加上 Hudi 强大 upsert 能力使这两个问题得到以有效的解决。一、数仓架构感兴趣的同学可以参考之前顺丰分享的《Hudi on Flink 在顺丰的实践应用》二、Hudi 代码趟过的坑在去年我们是基于 Hudi 0.6 左右进行的 Hudi on Flink 的实践,代码较老。为了拥抱社区我们使用最新 master 代码进行实践,在大数据量写入场景中,发现了一个比较隐秘的丢数问题,这个问题花了将近两周的时间才定位到。1. Hudi StreamWriteFunction 算子核心流程梳理算子收数据的时候会先把数据按照 fileld 分组缓存好,数据的持续流会使得缓存数据越来越大,当达到一定阈值时便会执行 flush。阈值由 2 个核心参数控制:, 。当单个分组数据达到 64M 或者总缓存数据达到 800M ~ 1G 就会触发 flush 。flush 会调用 client 的 api 去创建一个 WriteHandle,然后把 WriteHandle 放入 Map 进行缓存,一个 handle 可以理解为对应一个文件的 cow。如果一个 fileld 在同一 checkpoint 期间被多次写入,则后一次是基于前一次的 cow, 它的 handle 是一个,判断一个 fileld 是否之前被写入过就是根据上面 Map 缓存得来的。执行 snapshotState 时会把内存的所有分组数据一次进行 flush, 之后对 client 的 handle 进行清空。2. 场景还原Hudi 本身是具备 upsert 能力的,所以我们开始认为 Hudi Sink 在 At Least Once 模式下是没问题的,并且 At Least Once 模式下 Flink 算子不需要等待 Barrier 对齐,能够处理先到的数据使得处理速度更快,于是我们在 Copy On Write 场景中对 Flink CheckpointingMode 设置了 AT_LEAST_ONCE。writeFunction 的上游是文件 fileld 分配算子,假如有一批 insert 数据 A、B、C、D 属于同一个分区并且分配到同一个的 subtask ,但是 A、B 和 C、D 是相邻两个不同的 checkpoint。当 A 进入时如果发现没有新的小文件可以使用,就会创建一个新的 fileld f0,当 B 流入时也会给他分配到 f0 上。同时因为是 AT_LEAST_ONCE 模式,C、D 数据都有可能被处理到也被分配到了 f0 上。也就是说 在 AT_LEAST_ONCE 模式下由于 C、D 数据被提前处理,导致 A、B、C、D 4 条属于两个 checkpoint 的 insert 数据被分配到了同一个 fileld。writeFunction 有可能当接收到 A、B、C 后这个算子的 barrier 就对齐了,会把 A、B、C 进行 flush,而 D 将被遗留到下一个 checkpoint 才处理。A、B、C 是 insert 数据所以就会直接创建一个文件写入,D 属于下一个 checkpoint ,A、B、C 写入时创建的 handle 已被清理了,等到下一个 checkpoint 执行 flush。因为 D 也是 insert 数据所以也会直接创建一个文件写数据,但是 A、B、C、D 的 fileld 是一样的,导致最终 D 创建的文件覆盖了 A、B、C 写入的文件最终导致 A、B、C 数据丢失。3. 问题定位这个问题之所以难定位是因为具有一定随机性,每次丢失的数据都不太一样,而且小数据量不易出现。最终通过开启 Flink 的 Queryable State 进行查询, 查找丢失数据的定位到 fileld, 发现 ABCD state 的 instant 都是 I,然后解析对应 fileld 的所有版本进行跟踪还原。三、状态优化我们对线上最大的离线宽边进行了实时化的,宽表字段较多,涉及到多个表对主表的 left join 还包括一些 Top One 的计算,这些算子都会占用 state. 而我们的数据周期较长需要保存 180 天数据。估算下来状态大小将会达到上百 T,这无疑会对状态的持久化带来很大的压力。但是这些操作放入 Hudi 来做就显得轻而易举。1. Top One 下沉 Hudi在 Hudi 中有一个配置项用来指定使用某个字段对 flush 的数据去重,当出现多条数据需要去重时就会按照整个字段进行比较,保留最大的那条记录,这其实和 Top One 很像。我们在 SQL 上将 Top One 的排序逻辑组合成了一个字段设置为 Hudi 的,同时把这个字段写入 state,同一 key 的数据多次进来时都会和 state 的 进行比较更新。Flink Top One 的 state 默认是保存整记录的所有字段,但是我们只保存了一个字段,大大节省了 state 的大小。2. 多表 Left Join 下沉 Hudi■ 2.1 Flink SQL join我们把这个场景简化成如下一个案例,假如有宽表 t_p 由三张表组成在 Flink SQL join 算子内部会维护一个左表和右表的 state,这都是每个 table 的全字段,且多一次 join 就会多出一个 state. 最终导致 state 大小膨胀,如果 join 算子上游是一个 append 流,state 大小膨胀的效果更明显。■ 2.2 把 Join 改写成 Union All对于上面案例每次 left join 只是补充了几个字段,我们想到用 union all 的方式进行 SQL 改写,union all 需要补齐所有字段,缺的字段用 null 补。我们认为 null 补充的字段不是有效字段。改成从 union all 之后要求 Hudi 具备局部更新的能力才能达到 join 的效果。当收到的数据是来自 t0 的时候就只更新 id 和 name 字段;同理 ,数据是来自 t1 的时候就只更新 age 字段;t2 只更新 sex 字段。不幸的是 Hudi 的默认实现是全字段覆盖,也就是说当收到 t0 的数据时会把 age sex 覆盖成 null, 收到 t1 数据时会把 name sex 覆盖成 null。这显然是不可接受的。这就要求我们对 Hudi sink 进行改造。■ 2.3 Hudi Union All 实现Hudi 在 cow 模式每条记录的更新写入都是对旧数据进行 copy 覆盖写入,似乎只要知道这条记录来自哪个表,哪几个字段是有效的字段就选择性的对 copy 出来的字段进行覆盖即可。但是在分区变更的场景中就不是那么好使了。在分区变更的场景中,数据从一个分区变到另一个分区的逻辑是把旧分区数据删掉,往新分区新增数据。这可能会把一些之前局部更新的字段信息丢失掉。细聊下来 Hudi on Flink 涉及到由几个核心算子组成 pipeline。RowDataToHoodieFunction:这是对收入的数据进行转化成一个 HudiRecord,收到数据是包含全字段的,我们在转化 HudiRecord 的时候只选择了有效字段进行转化。BoostrapFunction:在任务恢复的时候会读取文件加载索引数据,当任务恢复后次算子不做数据转化处理。BucketAssignFunction:这个算子用来对记录分配 location,loaction 包含两部分信息。一是分区目录,另一个是 fileld。fileld 用来标识记录将写入哪个文件,一旦记录被确定写入哪个文件,就会发记录按照 fileld 分组发送到 StreamWriteFunction,StreamWriteFunction 再按文件进行批量写入。原生的 BucketAssignFunction 的算子逻辑如下图,当收到一条记录时会先从 state 里面进行查找是否之前有写过这条记录,如果有就会找对应的 location。如果分区没有发生变更,就把当前这条记录也分配给这个location,如果在 state 中没有找到 location 就会新创建一个 location,把这个新的location 分配给当前记录,并更新到 state。总之这个 state 存储的 location 就是告诉当前记录应该从哪个文件进行更新或者写入。遇到分区变更的场景会复杂一点。假如一条记录从 2020 分区变更成了 2021,就会创建一条删除的记录,它的 loaction 是 state 中的 location。这条记录让下游进行实际的删除操作,然后再创建一个新的 location (分区是 2021) 发送到下游进行 insert。为了在 Hudi 中实现 top one,我们对 state 信息进行了扩展,用来做 Top One 时间字段。对于 StreamWriteFunction 在 Insert 场景中,假如收到了如下 3 条数据 ,,,在执行 flush 时会创建一个全字段的空记录 ,然后依次和 3 条记录进行合并。注意,这个合并过程只会选择有效字段的合并。如下图:在 Update 场景中的更新逻辑类似 insert 场景,假如老数据是 ,新收到了, 这 2 条数据,就会先从文件中把老的数据读出来,然后依次和新收到的数据进行合并,合并步骤同 insert。如下图:这样通过 union all 的方式达到了 left join 的效果,大大节省了 state 的大小。四、未来规划parquet 元数据信息收集,parquet 文件可以从 footer 里面得到每个行列的最大最小等信息,我们计划在写入文件的后把这些信息收集起来,并且基于上一次的 commit 的元数据信息进行合并,生成一个包含所有文件的元数据文件,这样可以在读取数据时进行谓词下推进行文件的过滤。公司致力于打造基于 Hudi 作为底层存储,Flink 作为流批一体化的 SQL 计算引擎,Flink 的批处理 Hudi 这块还涉足不深,未来可能会计划用 Flink 对 Hudi 实现 clustering 等功能,在 Flink 引擎上完善 Hudi 的批处理功能。文章转载至腾讯新闻,作者西北木土 https://xw.qq.com/amphtml/s/20210929A02O4F00 免责声明:转载文章版权归原作者所有。如涉及作品内容、版权等问题,请及时联系文章编辑!
  • [赋能学习] 华为FusionInsight MRS实战 - Flink增强特性之可视化开发平台FlinkSever开发学习
    # 华为FusionInsight MRS实战 - Flink增强特性之可视化开发平台FlinkSever开发学习 ## 背景说明 随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。 SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎自1.7.2版本开始引入Flink SQL的特性,并不断发展。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。 但是真正的要将Flink SQL开发工作投入到实际的生产场景中,如果使用原生的API接口进行作业的开发还是存在门槛较高,易用性低,SQL代码可维护性差的问题。新需求由业务人员提交给IT人员,IT人员排期开发。从需求到上线,周期长,导致错失新业务最佳市场时间窗口。同时,IT人员工作繁重,大量相似Flink作业,成就感低。 ## 华为Flink可视化开发平台FlinkServer优势: - 提供基于Web的可视化开发平台,只需要写SQL即可开发作业,极大降低作业开发门槛。 - 通过作业平台能力开放,支持业务人员自行编写SQL开发作业,快速应对需求,并将IT人员从繁琐的Flink作业开发工作中解放出来; - 同时支持流作业和批作业; - 支持常见的Connector,包括Kafka、Redis、HDFS等 下面将以kafka为例分别使用原生API接口以及FlinkServer进行作业开发,对比突出FlinkServer的优势 ## 场景说明 参考已发论坛帖 [《华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践》](https://bbs.huaweicloud.com/forum/forum.php?mod=viewthread&tid=153494) 需要使用FlinkSQL从一个源kafka topic接收cdl复杂嵌套json数据并进行解析,将解析后的数据发送到另一个kafka topic里 ## 使用原生API接口方案开发flink sql操作步骤 ### 前提条件 - 完成MRS Flink客户端的安装以及配置 - 完成Flink SQL原生接口相关配置 ### 操作步骤 - 使用如下命令首先启动Flink集群 ``` source /opt/hadoopclient/bigdata_env kinit developuser cd /opt/hadoopclient/Flink/flink ./bin/yarn-session.sh -t ssl/ ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163344f5rjefwil6mtbxke.png) - 使用如下命令启动Flink SQL Client ``` cd /opt/hadoopclient/Flink/flink/bin ./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163429t0jgp2og06moblfq.png) - 使用如下flink sql创建源端kafka表,并提取需要的信息: ``` CREATE TABLE huditableout_source( `schema` ROW `fields` ARRAY ROW> >, payload ROW `TIMESTAMP` BIGINT, `data` ROW uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, type1 as `schema`.`fields`[1].type, optional1 as `schema`.`fields`[1].optional, field1 as `schema`.`fields`[1].field, type2 as `schema`.`fields`[2].type, optional2 as `schema`.`fields`[2].optional, field2 as `schema`.`fields`[2].field, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost, localts as LOCALTIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163452thqe99fp4okvx7kb.png) - 使用如下flink sql创建目标端kafka表: ``` CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163515mzxekl117ipqhutq.png) - 使用如下flink sql将源端kafka流表写入到目标端kafka流表中 ``` insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source; ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163535ei3aqydmxfu2ip2g.png) - 检查测试结果 消费生产源kafka topic的数据(由cdl生成) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/1635564rnwwl5ofye3l1zx.png) 消费目标端kafka topic解析后的数据(flink sql任务生成的结果) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163618u2xcaeyihmqe6kem.png) 可以登录flink原生界面查看任务 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/1636409ujub3e7cvrfudvk.png) - 使用flink sql client方式查看结果 首先使用命令`set execution.result-mode=tableau;` 可以让查询结果直接输出到终端 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163702hvwwtzyynjcqq3q1.png) 使用flink sql查询上面已创建好的流表 `select * from huditableout` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163725mzzc9junt8f06bgx.png) 注意:因为是kafka流表,所以查询结果只会显示select任务启动之后写进该topic的数据 ## 使用FlinkServer可视化开发平台方案开发flink sql操作步骤 ### 前提条件 - 参考产品文档 《基于用户和角色的鉴权》章节创建一个具有“FlinkServer管理操作权限”的用户,使用该用户访问Flink Server ### 操作步骤 - 登录FlinkServer选择作业管理 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/1637469uv8hclqovm3oegh.png) - 创建任务cdl_kafka_json_test3并输入flink sql 说明: 可以看到开发flink sql任务时在FlinkServer界面可以自行设置flink集群规模 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/1638053kpbebzpvqluulgk.png) ``` CREATE TABLE huditableout_source( `schema` ROW `fields` ARRAY ROW> >, payload ROW `TIMESTAMP` BIGINT, `data` ROW uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, type1 as `schema`.`fields`[1].type, optional1 as `schema`.`fields`[1].optional, field1 as `schema`.`fields`[1].field, type2 as `schema`.`fields`[2].type, optional2 as `schema`.`fields`[2].optional, field2 as `schema`.`fields`[2].field, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost, localts as LOCALTIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source; ``` - 点击语义校验,确保语义校验通过 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163829nwrkssyzp6omceda.png) - 点击提交并启动任务 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163850agxe6k4wnnaiptgl.png) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163911ybtbw6kv3dtwk68d.png) - 检查测试结果 消费生产源kafka topic的数据(由cdl生成) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163933m2zojndreazwjqsl.png) 消费目标端kafka topic解析后的数据(flink sql任务生成的结果) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163952yxqackdpa68qbale.png)
  • [问题求助] 【GaussDB 200 6.5.1】支持Flink SQL CDC的扩展吗?
  • [问题求助] 【香港启德】【DLI功能】Flink作业无法提交
    【功能模块】DLI 模块【操作步骤&问题现象】1、创建新的flink作业 无法提交运行【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [生态空间] 【GaussDB 200 6.5.1】支持Flink SQL CDC的扩展吗?
    【功能模块】【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [二次开发] 【Flink】flink消费kafka任务提交后报错
    【功能模块】按照官方的代码,提交flink消费kafka任务后,出现错误。在flink客户端使用的命令为bin/flink run -yt conf/ssl/ -ys 2 -m yarn-cluster -yjm 1024 -ytm 1024 -c org.mytest.stream.ReadFromKafka /opt/flink/flink.jar --topic topictest --bootstrap.servers $bs --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka【截图信息】【日志信息】(可选,上传日志内容或者附件)[root@node3 flink]# bin/flink run -yt conf/ssl/ -ys 2 -m yarn-cluster -yjm 1024 -ytm 1024 -c org.mytest.stream.ReadFromKafka /opt/flink/flink.jar --topic topictest --bootstrap.servers $bs --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafkaSLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/opt/hadoopclient/Flink/flink/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/opt/hadoopclient/HDFS/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]Cluster started: Yarn cluster with application id application_1625541283910_0063Job has been submitted with JobID f50af8c85ed9c74e813f52c71231674fjava.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f50af8c85ed9c74e813f52c71231674f)    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1651)    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1633)    at org.mytest.stream.ReadFromKafka.main(ReadFromKafka.java:21)    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:498)    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:700)    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:219)    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:932)    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1005)    at java.security.AccessController.doPrivileged(Native Method)    at javax.security.auth.Subject.doAs(Subject.java:422)    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1737)    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1005)Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f50af8c85ed9c74e813f52c71231674f)    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)    at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)    ... 19 moreCaused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:498)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)    at akka.actor.ActorCell.invoke(ActorCell.scala:561)    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)    at akka.dispatch.Mailbox.run(Mailbox.scala:225)    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.<init>(KafkaFetcher.java:109)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:237)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)[root@node3 flink]# MRS版本:3.0.2flink版本:1.10.0
  • [二次开发] 【Flink】flink提交任务加yn后提交失败
    【功能模块】在将flink任务提交到集群时,如果指定-yn参数,任务提交失败。【操作步骤&问题现象】1、使用Flink客户端提交flink任务,按照产品文档中提供的参数提交,任务失败。2、去掉-yn参数重新提交,提交成功。【截图信息】1,失败截图2,成功截图MRS版本:3.0.1flink版本:1.10.0
  • [二次开发] 【Flink产品】使用flink run提交任务报错
    【功能模块】使用flink run命令提交任务报错【操作步骤&问题现象】1、在idea中编写程序,打包,部署到集群中。2、使用flink run提交任务,语句如下:./bin/flink run -yt conf/ssl/ -ys 4 -m yarn-cluster -c org.mytest.stream.StreamTest /opt/flink/flink.jar --host 10.10.10.10 --port 77773,查看yarn中,任务已经提交成功【截图信息】1,提交后报错截2、测试程序主要功能截图
  • [二次开发] 【mrs-flink产品】【调试flink-kafka程序】NoSuchMethodError
    【功能模块】mrs-flink【操作步骤&问题现象】1、使用per-job模式上传样例代码到集群上运行,执行FemaleInfoCollectionFromKafka这个样例;2、flink任务创建成功,但是任务执行时报错,显示KafkaConsumer的一个方法不存在。【截图信息】报错信息:客户端lib里已补充相关jar包,不知道具体缺少哪个?【日志信息】(可选,上传日志内容或者附件)org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:498)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)    at akka.actor.ActorCell.invoke(ActorCell.scala:561)    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)    at akka.dispatch.Mailbox.run(Mailbox.scala:225)    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.(KafkaFetcher.java:109)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:237)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
  • [Flink] 【mrs产品】【flink功能】flink集群启动时初始化cluster entrypoint失败
    【功能模块】mrs的flink【操作步骤&问题现象】1、执行启动flink集群脚本:./bin/yarn-session.sh -t ssl/ -n 22、显示报错:org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:380)    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:551)    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$7(FlinkYarnSessionCli.java:789)    at java.security.AccessController.doPrivileged(Native Method)    at javax.security.auth.Subject.doAs(Subject.java:422)    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761)    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:789)Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. Diagnostics from YARN: Application application_1624785727989_0098 failed 2 times in previous 10000 milliseconds due to AM Container for appattempt_1624785727989_0098_000002 exited with  exitCode: 1Failing this attempt.Diagnostics: [2021-07-09 15:44:51.047]Exception from container-launch.Container id: container_e02_1624785727989_0098_02_000001Exit code: 1Exception message: Launch container failedShell output: main : command provided 13.后台日志显示初始化cluster entrypoint失败: Could not start cluster entrypoint YarnSessionClusterEntrypoint. | org.apache.flink.runtime.entrypoint.ClusterEntrypoint (ClusterEntrypoint.java:520) org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnSessionClusterEntrypoint.【截图信息】【日志信息】(可选,上传日志内容或者附件)
总条数:123 到第
上滑加载中