• [基础组件] 【MRS】【Flink功能】Flink任务运行失败
    【功能模块】MRS 8.0.2混合云版本   Flink组件【操作步骤&问题现象】1、登录MRS客户端,kinit登录2、执行命令yarn-session.sh -t conf/ -d提示flink任务运行失败,怀疑是MRS环境问题。【截图信息】
  • [基础组件] flink yarnsession起不来,报错:Couldn't deploy Yarn session cluster
    【功能模块】flink执行yarn-session.sh -t conf/ -d报错【操作步骤&问题现象】1、mrs 3.0.2 线下环境【问题现象】执行 yarn-session.sh -t conf/ -d报 Error while running the Flink session. | org.apache.flink.yarn.cli.FlinkYarnSessionCli (AbstractCustomCommandLine.java:118) org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster【操作步骤】source bigdata_env kinit 用户 密码 在flink目录下执行:yarn-session.sh -t conf/ -d【预期结果】能够成功提交任务,执行通过【实际结果】客户端报: Error while running the Flink session. | org.apache.flink.yarn.cli.FlinkYarnSessionCli (AbstractCustomCommandLine.java:118) org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster 前台页面查看logs日志显示:Exception in thread "main" java.lang.IllegalArgumentException: Can't get Kerberos realm at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)【原因定位】提交任务阻塞,最后导致超时,flink on yarn启动时检查虚拟内存,虚拟内存不足导致的 尝试解决的办法:1.关掉ssl 2、注释掉dns、3.将flink访问Zookeeper目录【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] 【MRS】【flink组件】集成flink组件时代码编译报错
    【功能模块】MRS版本:MRS3.1.0【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [行业动态] 从 Spark 做批处理到 Flink 做流批一体
    摘要:本⽂主要内容为:为什么要做流批一体?当前行业已有的解决方案和现状,优势和劣势探索生产实践场景的经验Shuflle Service 在 Spark 和 Flink 上的对比,以及 Flink 社区后面可以考虑做的工作总结一、为什么要做流批一体做流批一体到底有哪些益处,尤其是在 BI/AI/ETL 的场景下。整体来看,如果能帮助用户做到流批一体,会有以上 4 个比较明显的益处:可以避免代码重复,复用代码核心处理逻辑代码逻辑能完全一致是最好的,但这会有一定的难度。但整体来讲,现在的商业逻辑越来越长,越来越复杂,要求也很多,如果我们使用不同的框架,不同的引擎,用户每次都要重新写一遍逻辑,压力很大并且难以维护。所以整体来讲,尽量避免代码重复,帮助用户复用代码逻辑,就显得尤为重要。流批一体有两个方向这两个方向要考虑的问题很不一样,目前 Flink 做 Streaming、Spark 做 Batch 等等一些框架在批处理或流处理上都比较成熟,都已经产生了很多的单方面用户。当我们想帮助用户移到另外一个方向上时,比如一些商业需求,通常会分成两类,是先从流处理开始到批处理,还是从批处理开始到流处理。之后介绍的两个生产实践场景案例,正好对应这两个方向。减少维护工作量避免维护多套系统,系统之间的差异可能非常大,框架和引擎都不一样,会带来比较多的问题。如果公司内部有多条 pipeline ,一个实时一个离线,会造成数据不一致性,因此会在数据验证、数据准确性查询、数据存储等方面做很多工作,尽量去维护数据的一致性。学习更多框架和引擎很多,商业逻辑既要跑实时,也要跑离线,所以,支持用户时需要学习很多东西。二、当前行业现状Flink 和 Spark 都是同时支持流处理和批处理的引擎。我们一致认为 Flink 的流处理做的比较好,那么它的批处理能做到多好?同时,Spark 的批处理做的比较好,那么它的流处理能不能足够帮助用户解决现有的需求?现在有各种各样的引擎框架,能不能在它们之上有一个统一的框架,类似于联邦处理或者是一些简单的 physical API,比如 Beam API 或者是自定义接口。Beam 方面需要考虑的问题,是它在批处理和流处理上的优化能做到多好?Beam 目前还是偏物理执行,之后的计划是我们需要考究的。LinkedIn,包括其他公司,会考虑做一些自定义接口的解决方案,考虑有一个共通的 SQL 层,通用的 SQL 或 API 层,底下跑不同的框架引擎。这里需要考虑的问题是,像 Spark 、Flink 都是比较成熟的框架了,已经拥有大量的用户群体。当我们提出一个新的 API ,一个新的解决方案,用户的接受度如何? 在公司内部应该如何维护一套新的解决方案?三、生产案例场景后面内容主要聚焦在 Flink 做 batch 的效果,Flink 和 Spark 的简单对比,以及 LinkedIn 内部的一些解决方案。分享两个生产上的实例场景,一个是在机器学习特征工程生成时如何做流批一体,另一个是复杂的 ETL 数据流中如何做流批一体。3.1 案例 A - 机器学习特征工程第一类方向,流处理 -> 批处理,归类为流批一体。 案例 A 的主体逻辑是在机器学习中做特征生成时,如何从流处理到批处理的流批一体。核心的业务逻辑就是特征转换,转化的过程和逻辑比较复杂,用它做一些标准化。比如在 LinkedIn 的页面上输入的一些会员信息背景等,需要将这些信息提取出来标准化掉,才能进行一些推荐,帮你找一些工作等等。当会员的身份信息有更新时,会有过滤、预处理的逻辑、包括读取 Kafka 的过程,做特征转换的过程中,可能会有一些小表查询。这个逻辑是非常直接的,没有复杂的 join 操作及其他的数据处理过程。以前它的 pipeline 是实时的,需要定期从离线 pipeline 中读取补充信息来更新流。这种 backfill 对实时集群的压力是很大的,在 backfill 时,需要等待 backfill 工作起来,需要监控工作流不让实时集群宕掉。所以,用户提出能不能做离线的 backfill,不想通过实时流处理做 backfill。当前我们的用户是使用 Beam on Samza 做流处理,他们非常熟悉 Beam API 和 Spark Dataset API,也会用 Dataset API 去做除了 backfill 之外的一些其他业务处理。需要特别强调的是, Dataset API 很多都是直接对 Object 操作,对 type 安全性要求很高,如果建议这些用户直接改成 SQL 或者 DataFrame 等 workflow 是不切实际的,因为他们已有的业务逻辑都是对 Object 进行直接操作和转化等。在这个案例下,我们能提供给用户一些方案选择,Imperative API 。看下业界提供的方案:第一个选择是即将要统一化的 Flink DataStream API,此前我们在做方案评估时也有调研 Flink DataSet API(deprecated),DataStream API 可以做到统一,并且在流处理和批处理方面的支持都是比较完善的。但缺点是,毕竟是 Imperative API ,可能没有较多的优化,后续应该会持续优化。可以看下 FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API) [1]  和 FLIP-134: Batch execution for the DataStream API [2]。第二个选择是 Spark Dataset ,也是用户一个比较自然的选择。可以用 Dataset API 做 Streaming,区别于  Flink 的 Dataset 、DataStream API 等物理 API,它是基于 Spark Dataframe SQL engine 做一些 type safety,优化程度相对好一些。可以看下文章 Databricks: Introducing Apache Spark Datasets [3] 和 Spark Structured Streaming Programming Guide: Unsupported-operations [4]。第三个选择是 Beam On Spark,它目前主要还是用 RDD runner,目前支持带 optimization 的 runner 还是有一定难度的。之后会详细说下 Beam 在案例 B 中的一些 ongoing 的工作。可以看下 Beam Documentation - Using the Apache Spark Runner [5] 和 BEAM-8470 Create a new Spark runner based on Spark Structured streaming framework [6]。从用户的反馈上来说,Flink 的 DataStream (DataSet) API 和 Spark 的 Dataset API 在用户 interface 上是非常接近的。作为 Infra 工程师来说,想要帮用户解决问题,对 API 的熟悉程度就比较重要了。但是 Beam 和 Flink 、Spark 的 API 是非常不一样的,它是 Google 的一条生态系统,我们之前也帮助用户解决了一些问题,他们的 workflow 是在 Beam on Samza 上,他们用 p collections 或者 p transformation 写了一些业务逻辑,output、input 方法的 signature 都很不一样,我们开发了一些轻量级 converter 帮助用户复用已有的业务逻辑,能够更好的用在重新写的 Flink 或 Spark 作业里。从 DAG 上来看,案例 A 是一个非常简单的业务流程,就是简单直接的对 Object 进行转换。Flink 和 Spark 在这个案例下,性能表现上是非常接近的。通常,我们会用 Flink Dashboard UI 看一些异常、业务流程等,相比 Spark 来说是一个比较明显的优势。Spark 去查询 Driver log,查询异常是比较麻烦的。但是 Flink 依旧有几个需要提升的地方:History Server - 支持更丰富的 Metrics 等Spark History Server UI 呈现的 metrics 比较丰富的,对用户做性能分析的帮助是比较大的。Flink 做批处理的地方是否也能让 Spark 用户能看到同等的 metrics 信息量,来降低用户的开发难度,提高用户的开发效率。更好的批处理运维工具分享一个 LinkedIn 从两三年前就在做的事情。LinkedIn 每天有 200000 的作业跑在集群上,需要更好的工具支持批处理用户运维自己的作业,我们提供了 Dr. Elephant 和 GridBench 来帮助用户调试和运维自己的作业。Dr. Elephant 已开源,能帮助用户更好的调试作业,发现问题并提供建议。另外,从测试集群到生产集群之前,会根据 Dr. Elephant 生成的报告里评估结果的分数来决定是否允许投产。GridBench 主要是做一些数据统计分析,包括 CPU 的方法热点分析等,帮助用户优化提升自己的作业。GridBench 后续也有计划开源,可以支持各种引擎框架,包括可以把 Flink 加进来,Flink job 可以用 GridBench 更好的做评估。GridBench Talk: Project Optimum: Spark Performance at LinkedIn Scale [7]。用户不仅可以看到 GridBench 生成的报告,Dr. Elephant 生成的报告,也可以通过命令行看到 job 的一些最基本信息,应用 CPU 时间、资源消耗等,还可以对不同 Spark job 和 Flink job 之间进行对比分析。以上就是 Flink 批处理需要提升的两块地方。3.2 案例 B - 复杂的 ETL 数据流第二类方向,批处理 -> 流处理,归类为流批一体。ETL 数据流的核心逻辑相对复杂一些,比如包括 session window 聚合窗口,每个小时计算一次页面的用户浏览量,分不同的作业,中间共享 metadata table 中的 page key,第一个作业处理 00 时间点,第二个作业处理 01 时间点,做一些 sessionize 的操作,最后输出结果,分 open session、close session ,以此来做增量处理每个小时的数据。这个 workflow 原先是通过 Spark SQL 做的离线增量处理,是纯离线的增量处理。当用户想把作业移到线上做一些实时处理,需要重新搭建一个比如 Beam On Samza 的实时的 workflow,在搭建过程中我们和用户有非常紧密的联系和沟通,用户是遇到非常多的问题的,包括整个开发逻辑的复用,确保两条业务逻辑产生相同的结果,以及数据最终存储的地方等等,花了很长时间迁移,最终效果是不太好的。另外,用户的作业逻辑里同时用 Hive 和 Spark 写了非常多很大很复杂的 UDF ,这块迁移也是非常大的工作量。用户对 Spark SQL 和 Spark DataFrame API 是比较熟悉的。上图中的黑色实线是实时处理的过程,灰色箭头主要是批处理的过程,相当于是一个Lambda结构。针对案例 B,作业中包括很多 join 和 session window,他们之前也是用 Spark SQL 开发作业的。很明显我 们要从 Declartive API 入手,当前提供了 3 种方案:第一个选择是 Flink Table API/SQL ,流处理批处理都可以做,同样的SQL,功能支持很全面,流处理和批处理也都有优化。可以看下文章 Alibaba Cloud Blog: What's All Involved with Blink Merging with Apache Flink? [8] 和 FLINK-11439 INSERT INTO flink_sql SELECT * FROM blink_sql [9]。第二个选择是  Spark DataFrame API/SQL ,也是可以用相同的 interface 做批处理和流处理,但是 Spark 的流处理支持力度还是不够的。可以看下文章 Databricks Blog: Deep Dive into Spark SQL’s Catalyst Optimizer [10] 和 Databricks Blog: Project Tungsten: Bringing Apache Spark Closer to Bare Metal [11]。第三个选择是 Beam Schema Aware API/SQL ,Beam 更多的是物理的 API ,在 Schema Aware API/SQL 上目前都在开展比较早期的工作,暂不考虑。所以,之后的主要分析结果和经验都是从 Flink Table API/SQL 和 Spark DataFrame API/SQL 的之间的对比得出来的。可以看下文章 Beam Design Document - Schema-Aware PCollections [12] 和 Beam User Guide - Beam SQL overview [13]。从用户的角度来说,Flink Table API/SQL 和 Spark DataFrame API/SQL 是非常接近的,有一些比较小的差别,比如 keywords、rules、 join 具体怎么写等等,也会给用户带来一定的困扰,会怀疑自己是不是用错了。Flink 和 Spark 都很好的集成了 Hive ,比如 HIve UDF 复用等,对案例B中的 UDF 迁移,减轻了一半的迁移压力。Flink 在 pipeline 模式下的性能是明显优于 Spark 的,可想而知,要不要落盘对性能影响肯定是比较大的,如果需要大量落盘,每个 stage 都要把数据落到磁盘上,再重新读出来,肯定是要比不落盘的 pipeline 模式的处理性能要差的。pipeline 比较适合短小的处理,在 20 分钟 40 分钟还是有比较大的优势的,如果再长的 pipeline 的容错性肯定不能和 batch 模式相比。Spark 的 batch 性能还是要比 Flink 好一些的。这一块需要根据自己公司内部的案例进行评估。Flink 对 window 的支持明显比其他引擎要丰富的多,比如 session window,用户用起来非常方便。我们用户为了实现 session window ,特意写了非常多的 UDF ,包括做增量处理,把 session 全部 build 起来,把 record 拿出来做处理等等。现在直接用 session window operator ,省了大量的开发消耗。同时 group 聚合等 window 操作也都是流批同时支持的。Session Window:// Session Event-time Window .window(Session withGap 10.minutes on $"rowtime" as $"w") // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session withGap 10.minutes on $"proctime" as $"w")Slide Window:// Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w") // Sliding Processing-time Window (assuming a processing-time attribute "proctime") .window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w") // Sliding Row-count Window (assuming a processing-time attribute "proctime") .window(Slide over 10.rows every 5.rows on $"proctime" as $"w")UDF 是在引擎框架之间迁移时最大的障碍。如果 UDF 是用 Hive 写的,那是方便迁移的,因为不管是 Flink 还是 Spark 对 Hive UDF 的支持都是很好的,但如果 UDF 是用 Flink 或者 Spark 写的,迁移到任何一个引擎框架,都会遇到非常大的问题,比如迁移到 Presto 做 OLAP 近实时查询。为了实现 UDF 的复用,我们 LinkedIn 在内部开发了一个 transport 项目,已经开源至 github [14] 上, 可以看下 LinkedIn 发表的博客:Transport: Towards Logical Independence Using Translatable Portable UDFs [15]。transport 给所有引擎框架提供一个面向用户的 User API ,提供通用的函数开发接口,底下自动生成基于不同引擎框架的 UDF ,比如 Presto、Hive、Spark、Flink 等。用一个共通的 UDF API 打通所有的引擎框架,能让用户复用自己的业务逻辑。用户可以很容易的上手使用,比如如下用户开发一个 MapFromTwoArraysFunction:public class MapFromTwoArraysFunction extends StdUDF2<StdArray,StdArray,StdMap>{ private StdType _mapType; @Override public List<String> getInputParameterSignatures(){ return ImmutableList.of( "array[K]", "array[V]" ); } @Override public String getOutputParameterSignature(){ return "map(K,V)"; } } @Override public void init(StdFactory stdFactory){ super.init(stdFactory); } @Override public StdMap eval(StdArray a1, StdArray a2){ if(a1.size() != a2.size()) { return null; } StdMap map = getStdFactory().createMap(_mapType); for(int i = 0; i < a1.size; i++) { map.put(a1.get(i), a2.get(i)); } return map; }处理用户的 SQL 迁移问题 ,用户之前是用 Spark SQL 开发的作业,之后想使用流批一体,改成 Flink SQL 。目前的引擎框架还是比较多的,LinkedIn 开发出一个 coral 的解决方案,已在 github [16] 上开源,在 facebook 上也做了一些 talk ,包括和 transport UDF 一起给用户提供一个隔离层使用户可以更好的做到跨引擎的迁移,复用自己的业务逻辑。看下 coral 的执行流程,首先作业脚本中定义 熟悉的 ASCII SQL 和 table 的属性等,之后会生成一个 Coral IR 树状结构,最后翻译成各个引擎的 physical plan。在案例 B 分析中,流批统一,在集群业务量特别大的情况下,用户对批处理的性能、稳定性、成功率等是非常重视的。其中 Shuffle Service ,对批处理性能影响比较大。四、Shuffle Service 在 Spark 和 Flink 上的对比In-memory Shuffle,Spark 和 Flink 都支持,比较快,但不支持可扩展。Hash-based Shuffle ,Spark 和 Flink 都支持 , 相比 In-memory Shuffle ,容错性支持的更好一些,但同样不支持可扩展。Sort-based Shuffle,对大的 Shuffle 支持可扩展,从磁盘读上来一点一点 Sort match 好再读回去,在 FLIP-148: Introduce Sort-Based Blocking Shuffle to Flink  [17] 中也已经支持。External Shuffle Service, 在集群非常繁忙,比如在做动态资源调度时,外挂服务就会非常重要,对 Shuffle 的性能和资源依赖有更好的隔离,隔离之后就可以更好的去调度资源。FLINK-11805 A Common External Shuffle Service Framework [18] 目前处于 reopen 状态。Disaggregate Shuffle,大数据领域都倡导 Cloud Native 云原生,计算存储分离在 Shuffle Service 的设计上也是要考虑的。FLINK-10653 Introduce Pluggable Shuffle Service Architecture [19] 引入了可插拔的 Shuffle Service 架构。Spark 对 Shuffle Service 做了一个比较大的提升,这个工作也是由 LinkedIn 主导的 magnet 项目,形成了一篇名称为 introducing-magnet 的论文 (Magnet: A scalable and performant shuffle architecture for Apache Spark) [20],收录到了 LinkedIn blog 2020 里。magnet 很明显的提升了磁盘读写的效率,从比较小的 random range ,到比较大的顺序读,也会做一些 merging ,而不是随意的随机读取 shuffle data ,避免 random IO 的一些问题。通过 Magent Shuffle Service 缓解了 Shuffle 稳定性和可扩展性方面的问题。在此之前,我们发现了很多 Shuffle 方面的问题,比如 Job failure 等等非常高。如果想用 Flink 做批处理,帮助到以前用 Spark 做批处理的用户,在 Shuffle 上确实要花更大功夫。在 Shuffle 可用性上,会采用 best-effort 方式去推 shuffle blocks,忽略一些大的 block ,保证最终的一致性和准确性; 为 shuffle 临时数据生成一个副本,确保准确性。如果 push 过程特别慢,会有提前终止技术。Magent Shuffle 相比 Vanilla Shuffle ,读取 Shuffle data 的等待时间缩较少了几乎 100%,task 执行时间缩短了几乎 50%,端到端的任务时长也缩短了几乎 30%。五、总结LinkedIn 非常认可和开心看到 Flink 在流处理和批处理上的明显优势,做的更加统一,也在持续优化中。Flink 批处理能力有待提升,如 history server,metrics,调试。用户在开发的时候,需要从用户社区看一些解决方案,整个生态要搭建起来,用户才能方便的用起来。Flink 需要对 shuffle service 和大集群离线工作流投入更多的精力,确保 workflow 的成功率,如果规模大起来之后,如何提供更好的用户支持和对集群进行健康监控等。随着越来越多的框架引擎出现,最好能给到用户一个更加统一的 interface,这一块的挑战是比较大的,包括开发和运维方面,根据 LinkedIn 的经验,还是看到了很多问题的,并不是通过一个单一的解决方案,就能囊括所有的用户使用场景,哪怕是一些 function 或者 expression,也很难完全覆盖到。像 coral、transport UDF。原视频: https://www.bilibili.com/video/BV13a4y1H7XY?p=12参考链接[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API[3] https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html[4] https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations[5] https://beam.apache.org/documentation/runners/spark/[6] https://issues.apache.org/jira/browse/BEAM-8470[7] https://www.youtube.com/watch?v=D47CSeGpBd0[8] https://www.alibabacloud.com/blog/whats-all-involved-with-blink-merging-with-apache-flink_595401[9] https://issues.apache.org/jira/browse/FLINK-11439[10] https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html[11] https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html[12] https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit#heading=h.puuotbien1gf[13] https://beam.apache.org/documentation/dsls/sql/overview/[14] https://github.com/linkedin/transport[15] https://engineering.linkedin.com/blog/2018/11/using-translatable-portable-UDFs[16] https://github.com/linkedin/coral[17] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink[18] https://issues.apache.org/jira/browse/FLINK-11805[19] https://issues.apache.org/jira/browse/FLINK-10653[20] https://engineering.linkedin.com/blog/2020/introducing-magnet文章转载自过往记忆大数据公众号  https://mp.weixin.qq.com/s/B-xb2rIL07oKuvAKvNa7Fw 免责声明:转载文章版权归原作者所有。如涉及作品内容、版权等问题,请及时联系文章编辑!
  • [行业动态] 顺丰科技 Hudi on Flink 实时数仓实践
    摘要:本文介绍了顺丰科技数仓的架构,趟过的一些问题、使用 Hudi 来优化整个 job 状态的实践细节,以及未来的一些规划。主要内容为:数仓架构Hudi 代码躺过的坑状态优化未来规划顺丰科技早在 2019 年引入 Hudi ,当时是基于 Spark 批处理,2020 年对数据的实时性要求更高公司对架构进行了升级,在社区 Hudi on Flink 的半成品上持续优化实现Binlog 数据 CDC 入湖。在 Hudi 社区飞速发展的同时公司今年对数仓也提出了新的要求,最终采用 Flink + Hudi 的方式来宽表的实时化。过程中遇到了很多问题主要有两点:Hudi Master 代码当时存在一些漏洞;宽表涉及到多个 Join,Top One 等操作使得状态很大。庆幸的是社区的修复速度很给力加上 Hudi 强大 upsert 能力使这两个问题得到以有效的解决。一、数仓架构感兴趣的同学可以参考之前顺丰分享的《Hudi on Flink 在顺丰的实践应用》二、Hudi 代码趟过的坑在去年我们是基于 Hudi 0.6 左右进行的 Hudi on Flink 的实践,代码较老。为了拥抱社区我们使用最新 master 代码进行实践,在大数据量写入场景中,发现了一个比较隐秘的丢数问题,这个问题花了将近两周的时间才定位到。1. Hudi StreamWriteFunction 算子核心流程梳理算子收数据的时候会先把数据按照 fileld 分组缓存好,数据的持续流会使得缓存数据越来越大,当达到一定阈值时便会执行 flush。阈值由 2 个核心参数控制:, 。当单个分组数据达到 64M 或者总缓存数据达到 800M ~ 1G 就会触发 flush 。flush 会调用 client 的 api 去创建一个 WriteHandle,然后把 WriteHandle 放入 Map 进行缓存,一个 handle 可以理解为对应一个文件的 cow。如果一个 fileld 在同一 checkpoint 期间被多次写入,则后一次是基于前一次的 cow, 它的 handle 是一个,判断一个 fileld 是否之前被写入过就是根据上面 Map 缓存得来的。执行 snapshotState 时会把内存的所有分组数据一次进行 flush, 之后对 client 的 handle 进行清空。2. 场景还原Hudi 本身是具备 upsert 能力的,所以我们开始认为 Hudi Sink 在 At Least Once 模式下是没问题的,并且 At Least Once 模式下 Flink 算子不需要等待 Barrier 对齐,能够处理先到的数据使得处理速度更快,于是我们在 Copy On Write 场景中对 Flink CheckpointingMode 设置了 AT_LEAST_ONCE。writeFunction 的上游是文件 fileld 分配算子,假如有一批 insert 数据 A、B、C、D 属于同一个分区并且分配到同一个的 subtask ,但是 A、B 和 C、D 是相邻两个不同的 checkpoint。当 A 进入时如果发现没有新的小文件可以使用,就会创建一个新的 fileld f0,当 B 流入时也会给他分配到 f0 上。同时因为是 AT_LEAST_ONCE 模式,C、D 数据都有可能被处理到也被分配到了 f0 上。也就是说 在 AT_LEAST_ONCE 模式下由于 C、D 数据被提前处理,导致 A、B、C、D 4 条属于两个 checkpoint 的 insert 数据被分配到了同一个 fileld。writeFunction 有可能当接收到 A、B、C 后这个算子的 barrier 就对齐了,会把 A、B、C 进行 flush,而 D 将被遗留到下一个 checkpoint 才处理。A、B、C 是 insert 数据所以就会直接创建一个文件写入,D 属于下一个 checkpoint ,A、B、C 写入时创建的 handle 已被清理了,等到下一个 checkpoint 执行 flush。因为 D 也是 insert 数据所以也会直接创建一个文件写数据,但是 A、B、C、D 的 fileld 是一样的,导致最终 D 创建的文件覆盖了 A、B、C 写入的文件最终导致 A、B、C 数据丢失。3. 问题定位这个问题之所以难定位是因为具有一定随机性,每次丢失的数据都不太一样,而且小数据量不易出现。最终通过开启 Flink 的 Queryable State 进行查询, 查找丢失数据的定位到 fileld, 发现 ABCD state 的 instant 都是 I,然后解析对应 fileld 的所有版本进行跟踪还原。三、状态优化我们对线上最大的离线宽边进行了实时化的,宽表字段较多,涉及到多个表对主表的 left join 还包括一些 Top One 的计算,这些算子都会占用 state. 而我们的数据周期较长需要保存 180 天数据。估算下来状态大小将会达到上百 T,这无疑会对状态的持久化带来很大的压力。但是这些操作放入 Hudi 来做就显得轻而易举。1. Top One 下沉 Hudi在 Hudi 中有一个配置项用来指定使用某个字段对 flush 的数据去重,当出现多条数据需要去重时就会按照整个字段进行比较,保留最大的那条记录,这其实和 Top One 很像。我们在 SQL 上将 Top One 的排序逻辑组合成了一个字段设置为 Hudi 的,同时把这个字段写入 state,同一 key 的数据多次进来时都会和 state 的 进行比较更新。Flink Top One 的 state 默认是保存整记录的所有字段,但是我们只保存了一个字段,大大节省了 state 的大小。2. 多表 Left Join 下沉 Hudi■ 2.1 Flink SQL join我们把这个场景简化成如下一个案例,假如有宽表 t_p 由三张表组成在 Flink SQL join 算子内部会维护一个左表和右表的 state,这都是每个 table 的全字段,且多一次 join 就会多出一个 state. 最终导致 state 大小膨胀,如果 join 算子上游是一个 append 流,state 大小膨胀的效果更明显。■ 2.2 把 Join 改写成 Union All对于上面案例每次 left join 只是补充了几个字段,我们想到用 union all 的方式进行 SQL 改写,union all 需要补齐所有字段,缺的字段用 null 补。我们认为 null 补充的字段不是有效字段。改成从 union all 之后要求 Hudi 具备局部更新的能力才能达到 join 的效果。当收到的数据是来自 t0 的时候就只更新 id 和 name 字段;同理 ,数据是来自 t1 的时候就只更新 age 字段;t2 只更新 sex 字段。不幸的是 Hudi 的默认实现是全字段覆盖,也就是说当收到 t0 的数据时会把 age sex 覆盖成 null, 收到 t1 数据时会把 name sex 覆盖成 null。这显然是不可接受的。这就要求我们对 Hudi sink 进行改造。■ 2.3 Hudi Union All 实现Hudi 在 cow 模式每条记录的更新写入都是对旧数据进行 copy 覆盖写入,似乎只要知道这条记录来自哪个表,哪几个字段是有效的字段就选择性的对 copy 出来的字段进行覆盖即可。但是在分区变更的场景中就不是那么好使了。在分区变更的场景中,数据从一个分区变到另一个分区的逻辑是把旧分区数据删掉,往新分区新增数据。这可能会把一些之前局部更新的字段信息丢失掉。细聊下来 Hudi on Flink 涉及到由几个核心算子组成 pipeline。RowDataToHoodieFunction:这是对收入的数据进行转化成一个 HudiRecord,收到数据是包含全字段的,我们在转化 HudiRecord 的时候只选择了有效字段进行转化。BoostrapFunction:在任务恢复的时候会读取文件加载索引数据,当任务恢复后次算子不做数据转化处理。BucketAssignFunction:这个算子用来对记录分配 location,loaction 包含两部分信息。一是分区目录,另一个是 fileld。fileld 用来标识记录将写入哪个文件,一旦记录被确定写入哪个文件,就会发记录按照 fileld 分组发送到 StreamWriteFunction,StreamWriteFunction 再按文件进行批量写入。原生的 BucketAssignFunction 的算子逻辑如下图,当收到一条记录时会先从 state 里面进行查找是否之前有写过这条记录,如果有就会找对应的 location。如果分区没有发生变更,就把当前这条记录也分配给这个location,如果在 state 中没有找到 location 就会新创建一个 location,把这个新的location 分配给当前记录,并更新到 state。总之这个 state 存储的 location 就是告诉当前记录应该从哪个文件进行更新或者写入。遇到分区变更的场景会复杂一点。假如一条记录从 2020 分区变更成了 2021,就会创建一条删除的记录,它的 loaction 是 state 中的 location。这条记录让下游进行实际的删除操作,然后再创建一个新的 location (分区是 2021) 发送到下游进行 insert。为了在 Hudi 中实现 top one,我们对 state 信息进行了扩展,用来做 Top One 时间字段。对于 StreamWriteFunction 在 Insert 场景中,假如收到了如下 3 条数据 ,,,在执行 flush 时会创建一个全字段的空记录 ,然后依次和 3 条记录进行合并。注意,这个合并过程只会选择有效字段的合并。如下图:在 Update 场景中的更新逻辑类似 insert 场景,假如老数据是 ,新收到了, 这 2 条数据,就会先从文件中把老的数据读出来,然后依次和新收到的数据进行合并,合并步骤同 insert。如下图:这样通过 union all 的方式达到了 left join 的效果,大大节省了 state 的大小。四、未来规划parquet 元数据信息收集,parquet 文件可以从 footer 里面得到每个行列的最大最小等信息,我们计划在写入文件的后把这些信息收集起来,并且基于上一次的 commit 的元数据信息进行合并,生成一个包含所有文件的元数据文件,这样可以在读取数据时进行谓词下推进行文件的过滤。公司致力于打造基于 Hudi 作为底层存储,Flink 作为流批一体化的 SQL 计算引擎,Flink 的批处理 Hudi 这块还涉足不深,未来可能会计划用 Flink 对 Hudi 实现 clustering 等功能,在 Flink 引擎上完善 Hudi 的批处理功能。文章转载至腾讯新闻,作者西北木土 https://xw.qq.com/amphtml/s/20210929A02O4F00 免责声明:转载文章版权归原作者所有。如涉及作品内容、版权等问题,请及时联系文章编辑!
  • [赋能学习] 华为FusionInsight MRS实战 - Flink增强特性之可视化开发平台FlinkSever开发学习
    # 华为FusionInsight MRS实战 - Flink增强特性之可视化开发平台FlinkSever开发学习 ## 背景说明 随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。 SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎自1.7.2版本开始引入Flink SQL的特性,并不断发展。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。 但是真正的要将Flink SQL开发工作投入到实际的生产场景中,如果使用原生的API接口进行作业的开发还是存在门槛较高,易用性低,SQL代码可维护性差的问题。新需求由业务人员提交给IT人员,IT人员排期开发。从需求到上线,周期长,导致错失新业务最佳市场时间窗口。同时,IT人员工作繁重,大量相似Flink作业,成就感低。 ## 华为Flink可视化开发平台FlinkServer优势: - 提供基于Web的可视化开发平台,只需要写SQL即可开发作业,极大降低作业开发门槛。 - 通过作业平台能力开放,支持业务人员自行编写SQL开发作业,快速应对需求,并将IT人员从繁琐的Flink作业开发工作中解放出来; - 同时支持流作业和批作业; - 支持常见的Connector,包括Kafka、Redis、HDFS等 下面将以kafka为例分别使用原生API接口以及FlinkServer进行作业开发,对比突出FlinkServer的优势 ## 场景说明 参考已发论坛帖 [《华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践》](https://bbs.huaweicloud.com/forum/forum.php?mod=viewthread&tid=153494) 需要使用FlinkSQL从一个源kafka topic接收cdl复杂嵌套json数据并进行解析,将解析后的数据发送到另一个kafka topic里 ## 使用原生API接口方案开发flink sql操作步骤 ### 前提条件 - 完成MRS Flink客户端的安装以及配置 - 完成Flink SQL原生接口相关配置 ### 操作步骤 - 使用如下命令首先启动Flink集群 ``` source /opt/hadoopclient/bigdata_env kinit developuser cd /opt/hadoopclient/Flink/flink ./bin/yarn-session.sh -t ssl/ ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163344f5rjefwil6mtbxke.png) - 使用如下命令启动Flink SQL Client ``` cd /opt/hadoopclient/Flink/flink/bin ./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163429t0jgp2og06moblfq.png) - 使用如下flink sql创建源端kafka表,并提取需要的信息: ``` CREATE TABLE huditableout_source( `schema` ROW `fields` ARRAY ROW> >, payload ROW `TIMESTAMP` BIGINT, `data` ROW uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, type1 as `schema`.`fields`[1].type, optional1 as `schema`.`fields`[1].optional, field1 as `schema`.`fields`[1].field, type2 as `schema`.`fields`[2].type, optional2 as `schema`.`fields`[2].optional, field2 as `schema`.`fields`[2].field, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost, localts as LOCALTIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163452thqe99fp4okvx7kb.png) - 使用如下flink sql创建目标端kafka表: ``` CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163515mzxekl117ipqhutq.png) - 使用如下flink sql将源端kafka流表写入到目标端kafka流表中 ``` insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source; ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163535ei3aqydmxfu2ip2g.png) - 检查测试结果 消费生产源kafka topic的数据(由cdl生成) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/1635564rnwwl5ofye3l1zx.png) 消费目标端kafka topic解析后的数据(flink sql任务生成的结果) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163618u2xcaeyihmqe6kem.png) 可以登录flink原生界面查看任务 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/1636409ujub3e7cvrfudvk.png) - 使用flink sql client方式查看结果 首先使用命令`set execution.result-mode=tableau;` 可以让查询结果直接输出到终端 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163702hvwwtzyynjcqq3q1.png) 使用flink sql查询上面已创建好的流表 `select * from huditableout` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163725mzzc9junt8f06bgx.png) 注意:因为是kafka流表,所以查询结果只会显示select任务启动之后写进该topic的数据 ## 使用FlinkServer可视化开发平台方案开发flink sql操作步骤 ### 前提条件 - 参考产品文档 《基于用户和角色的鉴权》章节创建一个具有“FlinkServer管理操作权限”的用户,使用该用户访问Flink Server ### 操作步骤 - 登录FlinkServer选择作业管理 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/1637469uv8hclqovm3oegh.png) - 创建任务cdl_kafka_json_test3并输入flink sql 说明: 可以看到开发flink sql任务时在FlinkServer界面可以自行设置flink集群规模 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/1638053kpbebzpvqluulgk.png) ``` CREATE TABLE huditableout_source( `schema` ROW `fields` ARRAY ROW> >, payload ROW `TIMESTAMP` BIGINT, `data` ROW uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, type1 as `schema`.`fields`[1].type, optional1 as `schema`.`fields`[1].optional, field1 as `schema`.`fields`[1].field, type2 as `schema`.`fields`[2].type, optional2 as `schema`.`fields`[2].optional, field2 as `schema`.`fields`[2].field, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost, localts as LOCALTIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source; ``` - 点击语义校验,确保语义校验通过 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163829nwrkssyzp6omceda.png) - 点击提交并启动任务 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163850agxe6k4wnnaiptgl.png) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163911ybtbw6kv3dtwk68d.png) - 检查测试结果 消费生产源kafka topic的数据(由cdl生成) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163933m2zojndreazwjqsl.png) 消费目标端kafka topic解析后的数据(flink sql任务生成的结果) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202110/14/163952yxqackdpa68qbale.png)
  • [问题求助] 【GaussDB 200 6.5.1】支持Flink SQL CDC的扩展吗?
  • [问题求助] 【香港启德】【DLI功能】Flink作业无法提交
    【功能模块】DLI 模块【操作步骤&问题现象】1、创建新的flink作业 无法提交运行【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [生态空间] 【GaussDB 200 6.5.1】支持Flink SQL CDC的扩展吗?
    【功能模块】【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [二次开发] 【Flink】flink消费kafka任务提交后报错
    【功能模块】按照官方的代码,提交flink消费kafka任务后,出现错误。在flink客户端使用的命令为bin/flink run -yt conf/ssl/ -ys 2 -m yarn-cluster -yjm 1024 -ytm 1024 -c org.mytest.stream.ReadFromKafka /opt/flink/flink.jar --topic topictest --bootstrap.servers $bs --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka【截图信息】【日志信息】(可选,上传日志内容或者附件)[root@node3 flink]# bin/flink run -yt conf/ssl/ -ys 2 -m yarn-cluster -yjm 1024 -ytm 1024 -c org.mytest.stream.ReadFromKafka /opt/flink/flink.jar --topic topictest --bootstrap.servers $bs --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafkaSLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/opt/hadoopclient/Flink/flink/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/opt/hadoopclient/HDFS/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]Cluster started: Yarn cluster with application id application_1625541283910_0063Job has been submitted with JobID f50af8c85ed9c74e813f52c71231674fjava.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f50af8c85ed9c74e813f52c71231674f)    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1651)    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1633)    at org.mytest.stream.ReadFromKafka.main(ReadFromKafka.java:21)    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:498)    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:700)    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:219)    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:932)    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1005)    at java.security.AccessController.doPrivileged(Native Method)    at javax.security.auth.Subject.doAs(Subject.java:422)    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1737)    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1005)Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f50af8c85ed9c74e813f52c71231674f)    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)    at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)    ... 19 moreCaused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:498)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)    at akka.actor.ActorCell.invoke(ActorCell.scala:561)    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)    at akka.dispatch.Mailbox.run(Mailbox.scala:225)    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.<init>(KafkaFetcher.java:109)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:237)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)[root@node3 flink]# MRS版本:3.0.2flink版本:1.10.0
  • [二次开发] 【Flink】flink提交任务加yn后提交失败
    【功能模块】在将flink任务提交到集群时,如果指定-yn参数,任务提交失败。【操作步骤&问题现象】1、使用Flink客户端提交flink任务,按照产品文档中提供的参数提交,任务失败。2、去掉-yn参数重新提交,提交成功。【截图信息】1,失败截图2,成功截图MRS版本:3.0.1flink版本:1.10.0
  • [二次开发] 【Flink产品】使用flink run提交任务报错
    【功能模块】使用flink run命令提交任务报错【操作步骤&问题现象】1、在idea中编写程序,打包,部署到集群中。2、使用flink run提交任务,语句如下:./bin/flink run -yt conf/ssl/ -ys 4 -m yarn-cluster -c org.mytest.stream.StreamTest /opt/flink/flink.jar --host 10.10.10.10 --port 77773,查看yarn中,任务已经提交成功【截图信息】1,提交后报错截2、测试程序主要功能截图
  • [二次开发] 【mrs-flink产品】【调试flink-kafka程序】NoSuchMethodError
    【功能模块】mrs-flink【操作步骤&问题现象】1、使用per-job模式上传样例代码到集群上运行,执行FemaleInfoCollectionFromKafka这个样例;2、flink任务创建成功,但是任务执行时报错,显示KafkaConsumer的一个方法不存在。【截图信息】报错信息:客户端lib里已补充相关jar包,不知道具体缺少哪个?【日志信息】(可选,上传日志内容或者附件)org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:498)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)    at akka.actor.ActorCell.invoke(ActorCell.scala:561)    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)    at akka.dispatch.Mailbox.run(Mailbox.scala:225)    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.(KafkaFetcher.java:109)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:237)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
  • [Flink] 【mrs产品】【flink功能】flink集群启动时初始化cluster entrypoint失败
    【功能模块】mrs的flink【操作步骤&问题现象】1、执行启动flink集群脚本:./bin/yarn-session.sh -t ssl/ -n 22、显示报错:org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:380)    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:551)    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$7(FlinkYarnSessionCli.java:789)    at java.security.AccessController.doPrivileged(Native Method)    at javax.security.auth.Subject.doAs(Subject.java:422)    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761)    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:789)Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. Diagnostics from YARN: Application application_1624785727989_0098 failed 2 times in previous 10000 milliseconds due to AM Container for appattempt_1624785727989_0098_000002 exited with  exitCode: 1Failing this attempt.Diagnostics: [2021-07-09 15:44:51.047]Exception from container-launch.Container id: container_e02_1624785727989_0098_02_000001Exit code: 1Exception message: Launch container failedShell output: main : command provided 13.后台日志显示初始化cluster entrypoint失败: Could not start cluster entrypoint YarnSessionClusterEntrypoint. | org.apache.flink.runtime.entrypoint.ClusterEntrypoint (ClusterEntrypoint.java:520) org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnSessionClusterEntrypoint.【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [二次开发] 【MRS产品】【flink功能】运行样例demo时报配置错误
    pom文件导入的华为依赖包,运行flink java的样例demo或者自己写的wordcount使用本地环境的demo时均报如下错误:Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:/E:/tools/.m2/repository/org/apache/flink/flink-runtime_2.11/1.10.0-hw-ei-302002/flink-runtime_2.11-1.10.0-hw-ei-302002.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}自己单独新建工程使用官方flink依赖跑wordcount样例程序正常。
总条数:107 到第
上滑加载中