• [问题求助] 【DLI】【FLINK】Flink作业反压了怎么办
    【功能模块】DLI Flink【操作步骤&问题现象】使用DLI Flink发现页面上有反压,数据消费变慢了【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] 【DLI】【Flink】如何使用opensource语法
    【功能模块】Flink【操作步骤&问题现象】你好,我看资料里有opensource语法的资料,但直接使用会报语法错误,请问如何才能使用。【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] 使用DLI Flink Jar作业看不到日志输出
    【功能模块】DLI Flink【操作步骤&问题现象】1、使用DLI Flink Jar作业看不到日志输出2、从转储的日志来看,日志全输出到.err日志文件中,没有输出到.log文件中【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [技术讨论] 鲲鹏-flink-1.11.3移植
    编译环境虚拟机服务器 KVM Virtual Machine 虚拟机配置:aarch64架构 、 32G内存 、 16核 、300G磁盘空间 虚拟机操作系统: Linux version 4.19.90-2009.3.0.0045.up1.uel20.aarch64 (abuild@armbuild-02) (gcc version 7.3.0 (GCC)) #1 SMP Sun Oct 11 16:12:59 UTC 2020 软件版本:flink-1.11.3 版本获取方式https://github.com/apache/flink/archive/release-1.11.3.tar.gz 参考编译指南和问题 编译完成Jar包中还有x86 so依赖Flink移植指南: https://support.huaweicloud.com/prtg-apache-kunpengbds/kunpengbds_02_0011.html flink 1.11 后就不再与 hadoop 版本绑定,不需要编译特定 hadoop 版本 的 shaded 包 flink 1.11.0 release notes : https://flink.apache.org/news/2020/07/06/release-1.11.0.html  执行如下编译命令: mvn clean package -DskipTests Flink按照迁移指南编译完还含有x86 so依赖,通过checkSo结果如下:  一些问题可能导致下载的jar包不是从鲲鹏仓库中下载的。这几个包分别对应的jar是:scala-compiler-2.12.7.jar flink-shaded-netty-4.1.39.Final-11.0.jarscala-compiler-2.11.12.jar 问题处理方案: 手动从华为鲲鹏仓库中下载这些jar包,然后替换本地mvn仓库中jar ,下载链接分别如下:https://mirrors.huaweicloud.com/kunpeng/maven/org/apache/flink/flink-shaded-netty/4.1.39.Final-11.0/flink-shaded-netty-4.1.39.Final-11.0.jarhttps://mirrors.huaweicloud.com/kunpeng/maven/com/data-artisans/frocksdbjni/5.17.2-artisans-2.0/frocksdbjni-5.17.2-artisans-2.0.jar https://mirrors.huaweicloud.com/kunpeng/maven/org/scala-lang/scala-compiler/2.11.12/scala-compiler-2.11.12.jar 替换完后从新编译。 编译后通过迁移分析工具分析Flink的jar包已经不包含x86的依赖。 对于link-1.11.3-bin中还有x86的依赖  从华为鲲鹏库中下载如下包替换mvn本地参考中的包:https://mirrors.huaweicloud.com/kunpeng/maven/org/apache/flink/flink-shaded-netty-tcnative-dynamic/2.0.25.Final-11.0/flink-shaded-netty-tcnative-dynamic-2.0.25.Final-11.0.jar 对于flink-python 中的x86依赖需要进行编译: 安装gradle-5.4.1wget https://services.gradle.org/distributions/gradle-5.4.1-bin.zipunzip gradle-5.4.1-bin.zipexport PATH=`pwd`/gradle-5.4.1/bin:$PATH移植beam-vendor-grpcwget https://github.com/apache/beam/archive/v2.18.0.tar.gz -O beam-v2.18.0.tar.gztar -zxf beam8v2.18.0.tar.gzcd beam-2.19.0/vendor/grpc-1_21_0/ vim build.gradle在文件vendorJava所在行之前加入以下内容:repositories {     maven { url "https://mirrors.huaweicloud.com/kunpeng/maven/" }    mavenLocal()    maven { url "https://mirrors.huaweicloud.com/repository/maven/"} } 执行编译gradle build编译好的beam-vendor-grpc-1_21_0-0.1.jar在build/libs目录下安装到本地maven仓库直接替换mvn仓库中的jar包,或者通过mvn install 安装:mvn install:install-file -DgroupId=org.apache.beam -DartifactId=beam-vendor-grpc-1_21_0 -Dversion=0.1 -Dpackaging=jar -DgeneratePom=true -Dfile=build/libs/beam-vendor-grpc-1_21_0-0.1.jar NodeJs问题Runtime web 包是编译时,因为要在线下载一些包,基本都在国外,下载慢或者根本访问不了。1) nodejs 包下载不下来Downloading https://nodejs.org/dist/v10.9.0/node-v10.9.0-linux-x64.tar.gz to /root/.m2/repository/com/github/eirslett/node/10.9.0/node-10.9.0-linux-x64.tar.gz 直接访问 URL 下载,放到 mvn 目录中:   2) npm 执行不动 Running 'npm ci --cache-max=0 --no-save' in /home/venn/git/flink-1.12.0/flink-runtime-web/web-dashboard 直接安装 npm yum  install -y npm  nodejs npm config set registry http://registry.npm.taobao.org
  • [运维管理] HD651的flink如何收集应用运行日志到kafka
    【求助】 1.任务提交到集群是使用的配置是集群上的flink/conf/下的日志配置文件 2.如何在任务提交时指定自定义的配置文件?
  • [二次开发] 【Flink产品】【Flink sink ES】flink 连接器 sink数据到ES
    【功能模块】Flink DataStream 数据sink到带有用户名密码的验证的ES集群中报错。【操作步骤&问题现象】1、使用开源的flink-connector-elasticsearch6_2.11 的jar包2、SunCertPathBuilderException: unable to find valid certification path to requested target3、在es客户端那 提示找不到有效的证书路径  我觉得应该是协议错误 我们集群使用curl 方式操作es 需要指定  --tlsv1.2【求助】1.是不是不能使用开源的flink 连接器 sink 到es2.有没有FI集群sink 到ES的样例demo 可以参考一下3.或者帮忙提醒一下从哪里设置tlsv1.2协议  【日志信息】(可选,上传日志内容或者附件)Caused by: org.elasticsearch.client.ResponseException: method [HEAD], host [https://10.28.132.195:24100], URI [/], status line [HTTP/1.1 403 Forbidden]
  • [指导教程] DLI Flink作业生产环境推荐配置指导
    ### 1. 首先客户需要在消息通知服务(SMN)中提前创建一个【主题】,并将客户指定的邮箱或者手机号添加到主题订阅中。这时候指定的邮箱或者手机会收到请求订阅的通知,点击链接确认订阅即可。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/23/115309bfam8vp3yyv4r5kw.png) ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/23/115321hwisgpf1rr7n760d.png) ### 2. 创建Flink SQL作业,编写作业SQL完成后,配置【运行参数】。 #### 2.1 配置作业的【CU数量】、【管理单元】与【最大并行数】,依据如下公式: ```sql CU数量 = 管理单元 + (算子总并行数 / 单TM Slot数) * 单TM所占CU数 ``` 例如:CU数量为9CU,管理单元为1CU,最大并行数为16,则计算单元为8CU。 如果不手动配置TaskManager资源,则单TM所占CU数默认为1,单TM slot数显示值为0,实际值依据上述公式计算结果为 16÷(9-1)=2。 #### 2.2 勾选【保存作业日志】按钮,选择一个OBS桶。如该桶未授权,需点击【立即授权】。此项配置可以在作业异常失败后将作业日志保存到客户的OBS桶下,方便客户定位故障原因。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/23/115335hityehyflhl0rmng.png) #### 2.3 勾选【作业异常告警】选项,选择前述步骤创建的【SMN主题】。此项配置可以在作业异常情况下,向客户指定邮箱或者手机发送消息通知,方便客户及时感知异常。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/23/115348e3rgqch6kip4hcxu.png) #### 2.4 勾选【开启Checkpoint】选项,依据自身业务情况调整Checkpoint间隔和模式。Flink checkpoint机制可以保证Flink任务突然失败时,能够从最近的Checkpoint进行状态恢复重启。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/23/1153599cjlu8pkb3uprwjp.png) 说明: - 此处的Checkpoint间隔为两次触发Checkpoint的间隔,执行checkpoint会影响实时计算性能,配置间隔时间需权衡对业务的性能影响及恢复时长,最好大于Checkpoint的完成时间,建议设置为5min。 - Exactly Once模式保证每条数据只被消费一次,At Least Once模式每条数据至少被消费一次,请依据业务情况选择。 #### 2.5 勾选【异常自动恢复】与【从Checkpoint恢复】,根据自身业务情况选择重试次数。 #### 2.6 配置【脏数据策略】,依据自身的业务逻辑和数据特征选择忽略、抛出异常或者保存脏数据。 选择【运行队列】,提交并运行作业。 Flink Jar作业可靠性配置与SQL作业相同,不再另行说明。 ### 3. 登录【云监控服务CES】,在【云服务监控】列表中找到【数据湖探索】服务,在Flink作业中找到目标作业,点击【创建告警规则】。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/23/1154119e7ondsbp7ozbs1r.png) ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/23/115418ndcpvnfoukbnvvcd.png) ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/23/115426a7grpccpa7k7cflq.png) DLI 为Flink作业提供了丰富的监控指标,客户可以依据自身需求使用不同的监控指标定义告警规则,实现更细粒度的作业监控。 监控指标说明见:https://support.huaweicloud.com/usermanual-dli/dli_01_0445.html
  • [二次开发] 【Flink产品】【flink功能】样例代码flink生产数据到kafka打jar包执行报错
    【功能模块】版本信息BASE    6.5.1    ( 补丁  6.5.1.7 )Flink    6.5.1    ( 补丁  6.5.1.7 )Porter    6.5.1    ( 补丁  6.5.1.7 )Spark2x    6.5.1    ( 补丁  6.5.1.7 )HD    6.5.1    ( 补丁  6.5.1.7 )Elasticsearch    6.5.1    ( 补丁  6.5.1.7 )在flink客户端下执行  命令如下bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/flinkStreamKafka.jar --topic Auto_Agreement --bootstrap.servers ip:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.combin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/tmpdata/sou1yu/flinkStreamKafka.jar --topic Auto_Agreement --bootstrap.servers ip:21005【操作步骤&问题现象】1、正常导入项目 导入jar包2、source 过执行环境  kinit 过用户 3.在flink客户端下执行  命令如下 都不行(下面命令的ip也正确的 下面只是为了保护一下ip所以处理了一下)bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/flinkStreamKafka.jar --topic Auto_Agreement --bootstrap.servers ip:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.combin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/tmpdata/sou1yu/flinkStreamKafka.jar --topic Auto_Agreement --bootstrap.servers ip:21005【截图信息】【日志信息】(可选,上传日志内容或者附件)SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/opt/client/Flink/flink/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/opt/client/Flink/flink/lib/flink-dist_2.11-1.7.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/opt/client/HDFS/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]Starting execution of programuse command as: ./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21005./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei******************************************************************************************<topic> is the kafka topic name<bootstrap.servers> is the ip:port list of brokers******************************************************************************************------------------------------------------------------------ The program finished with the following exception:org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 1a0dc0071081418158fbb38a8326a920)        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:490)        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)        at com.huawei.bigdata.flink.examples.WriteIntoKafka.main(WriteIntoKafka.java:32)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:498)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:430)        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:814)        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:288)        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1051)        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1127)        at java.security.AccessController.doPrivileged(Native Method)        at javax.security.auth.Subject.doAs(Subject.java:422)        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1127)Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)        at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)        at java.util.concurrent.FutureTask.run(FutureTask.java:266)        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)        at java.lang.Thread.run(Thread.java:748)Caused by: java.util.concurrent.TimeoutException        ... 8 more
  • [二次开发] flink写出数据到hbase中,写不进去数据。
    【功能模块】【操作步骤&问题现象】1、flink消费kafka数据输出到hbase中,写不进hbase中,报错信息如下:com.huawei.bigdata.flink.examples.outputs.HbaseSink2 (HbaseSink2.java:132) java.io.InterruptedIOException: Giving up trying to location region in meta: thread is interrupted.    at org.apache.hadoop.hbase.client.ConnectionImplementation.locateRegionInMeta(ConnectionImplementation.java:973)    at org.apache.hadoop.hbase.client.ConnectionImplementation.locateRegion(ConnectionImplementation.java:790)    at org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:64)    at org.apache.hadoop.hbase.client.RegionLocator.getRegionLocation(RegionLocator.java:58)    at org.apache.hadoop.hbase.client.RegionLocator.getRegionLocation(RegionLocator.java:47)    at org.apache.hadoop.hbase.client.RegionServerCallable.prepare(RegionServerCallable.java:223)    at org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithRetries(RpcRetryingCallerImpl.java:107)    at org.apache.hadoop.hbase.client.HTable.put(HTable.java:540)    at com.huawei.bigdata.flink.examples.outputs.HbaseSink2.invoke(HbaseSink2.java:128)    at com.huawei.bigdata.flink.examples.outputs.HbaseSink2.invoke(HbaseSink2.java:29)    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:734)    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:712)    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [二次开发] flink消费kafka数据输出到hbase中,报.ClassNotFoundException
    【功能模块】【操作步骤&问题现象】1、我在原有flink连接kafka的example中写了一个sinkHbase的代码(说明:运行原来flink连接kafka样例代码没有任何问题),之后进行测试,我将所需要的jar包都上传到flink/lib下面了,然后运行代码报:Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration,具体报错信息如下所示:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] 【DLI】【Flink作业】对车辆进出园场景 Flink作业“dli_vehicle_all”的疑问
    【功能模块】【DLI】【Flink作业】【问题】 问题1:    在LiveData向Topic “T_IO_PARKING_FREE_SPACE” 发送消息,此topic作为“dli_vehicle_all”作业的其中一个数据源,发送消息内容如下:    Flink作业数据源:    Flink作业目标表:        到最后,到DAYU中查询这张目标表,确实新增了一条记录,但是所有字段值都为空。    问题:MQS到Flink作业的数据源 需要配置消息与字段的映射关系吗?否则怎么解释这个现象:发送消息后,确实在目标表实时新增了记录,但值都为空。问题2: DLI中有能看到完整SQL日志的地方吗?(能看到参数值的)【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] DAYU Flink 任务执行时写入GUASS DB 200报错
    运行flink任务时,日志显示 内部连接失败,如下图所示:麻烦帮忙看下是什么原因导致的。
  • [运维宝典] 一个Flink作业反压的问题分析
    问题场景客户作业场景如下图所示,从DMS kafka通过DLI Flink将业务数据实时清洗存储到DWS。其中,DMS Kafka 目标Topic 6个分区,DLI Flink作业配置taskmanager数量为12,并发数为1。问题现象客户在DLI服务共有三个相同规格的队列,该作业在其中003号队列上运行正常,在001和002号队列上都存在严重的反压导致数据处理缓慢。作业列表显示如下图,可以看到Sink反压状态正常,Souce和Map反压状态为HIGH。问题分析根据反压情况分析,该作业的性能瓶颈在Sink,由于Sink处理数据缓慢导致上游反压严重。该作业所定义的Sink类型为DwsCsvSink,该Sink的工作原理如下图所示:Sink将结果数据分片写入到OBS,每一分片写入完成后,调用DWS insert select sql将obs路径下该分片数据load到dws。因此性能瓶颈出现在分片数据写入到OBS这一步。但问题来了,写同一个桶,为什么在不同队列上的表现不一致?为此,我们排查了各个队列的CPU、内存和网络带宽情况,结果显示负载都很低。这种情况下,只能继续分析FlinkUI和TaskManager日志。数据倾斜?然后我们在FlinkUI任务情况页面,看到如下情况:Map阶段的12个TaskManager并不是所有反压都很严重,而是只有一半是HIGH状态,难道有数据倾斜导致分配到不同TaskManager的数据不均匀?然后看Source subTask详情,发现有两个TaskManager读取的数据量是其他几个的几十倍,这说明源端Kafka分区流入的数据量不均匀。难道就是这么简单的问题?很不幸并不是,通过进一步分析源端数据我们发现Kafka 6个分区数据流入记录数相差并不大。这两个Task只是多消费了部分存量数据,接收数据增长的速度各TaskManager保持一致。时钟同步进一步分析TaskManager日志,我们发现单个分片数据写入OBS竟然耗费3min以上。这非常异常,要知道单个分片数据才500000条而已。进一步通过分析代码发现如下问题:在写OBS数据时,其中一个taskmanager写分片目录后获取该目录的最后修改时间,作为处理该分片的开始时间,该时间为OBS服务端的时间。后续其他taskmanager向该分片目录写数据时,会获取本地时间与分片开始时间对比,间隔大于所规定的转储周期才会写分片数据。如果集群节点NTP时间与OBS服务端不同步,本地时间晚于OBS服务端时间,则会造成写入OBS等待。后续排查集群节点,发现6个节点中一半时间同步有问题,这也和只有一半taskmanager反压严重的现象相对应。问题修复在集群节点上执行如下命令,强制时间同步。systemctl stop ntp ntpdate ntp.myhuaweicloud.com systemctl start ntp systemctl status ntpdateNTP同步后,作业反压很快消失,故障恢复。
  • [二次开发] 使用flink连接kafka生产消费数据,运行FlinkKafkaJavaExample.jar代码,消费不了数据,报错
    【功能模块】【操作步骤&问题现象】1、运行样例代码FlinkKafkaJavaExample.jar代码,生产者可以生产数据,但是消费者不能消费数据,运行消费者命令会报错,如附件所示:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] flink实时计算中,如果部分数据格式转换异常,怎么处理
    flink实时计算中,如果部分数据格式转换异常,怎么处理
总条数:123 到第
上滑加载中