• [赋能学习] Hudi通过DeltaStreamer消费Kafka写入Hive样例
    场景:通过Hudi消费kafka数据写入Hive DeltaStreamer工具使用参考 https://hudi.apache.org/cn/docs/writing_data.html 验证环境: FusionInsight HD 8.1.0 安全集群,安装有 Spark2X、Hive、Kafka服务 说明:kafka服务端配置 allow.everyone.if.no.acl.found 为true 步骤: 1、准备集群(略) 2、安装客户端和准备测试用户 参考产品文档,默认安装客户端路径 /opt/client 测试用例通过manager页面创建,例如用户名developuser密码Haosuwei123 3、将附件中的 delta-stream-example-0.7.0.jar 上传到客户端节点/opt/haosuwei/hudi-demo/目录下,如果修改目录,注意修改下面提交命令中的路径信息 4、ssh登录客户端节点,执行安全认证 ``` source /opt/client/bigdata_env echo Haosuwei123 | kinit developuser ``` 5、确认HDFS上没有路径/tmp/huditest,有则删除 ``` hdfs dfs -rm -r /tmp/huditest ``` 6、创建kafka的topic: ``` kafka-topics.sh --create --topic delta_demo --bootstrap-server 8.5.136.1:21005 ``` 7、准备DeltaStreamer使用的配置文件kafka-source.properties,其内容参考如下,注意修改kafka的broker地址,保存到与第3步相同的路径中 ``` // hudi配置 hoodie.datasource.write.recordkey.field=id hoodie.datasource.write.partitionpath.field=y,ym,ymd hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator hoodie.datasource.write.hive_style_partitioning=true hoodie.delete.shuffle.parallelism=10 hoodie.upsert.shuffle.parallelism=10 hoodie.bulkinsert.shuffle.parallelism=10 hoodie.insert.shuffle.parallelism=10 hoodie.finalize.write.parallelism=10 hoodie.cleaner.parallelism=10 hoodie.datasource.write.precombine.field=id hoodie.base.path = /tmp/huditest/delta_demo hoodie.timeline.layout.version = 1 hoodie.datasource.write.payload.class=com.huawei.hudi.testcz.MyOverwriteNonDefaultsWithLatestAvroPayload hoodie.compaction.payload.class=com.huawei.hudi.testcz.MyOverwriteNonDefaultsWithLatestAvroPayload // hive config hoodie.datasource.hive_sync.table=delta_demo hoodie.datasource.hive_sync.partition_fields=y,ym,ymd hoodie.datasource.hive_sync.assume_date_partitioning=false hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.use_jdbc=false // Kafka Source topic hoodie.deltastreamer.source.kafka.topic=delta_demo // checkpoint hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/checkpoint/ // Kafka props bootstrap.servers=8.5.136.1:21005 auto.offset.reset=latest group.id=group-1 offset.rang.limit=10000 ``` 8、提交spark任务 开发时以yarn-client模式提交,提交命令参考如下 ``` source /opt/client/bigdata_env kinit -k -t user.keytab developuser spark-submit --master yarn-client \ --jars /opt/client/Hudi/hudi/conf,/opt/client/Hudi/hudi/lib/*,/opt/haosuwei/hudi-demo/delta-stream-example-0.7.0.jar \ --driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/haosuwei/hudi-demo/delta-stream-example-0.7.0.jar \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ spark-internal --props file:///opt/haosuwei/hudi-demo/kafka-source.properties \ --target-base-path /tmp/huditest/delta_demo \ --table-type COPY_ON_WRITE \ --target-table delta_demo \ --source-ordering-field id \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --schemaprovider-class com.huawei.hudi.deltastream.DataSchemaProviderExample \ --transformer-class com.huawei.hudi.deltastream.TransformerExample \ --enable-hive-sync --continuous ``` 9、插入数据验证: 通过kafka命令行,往topic中写入一条数据,其中id为2时其他字段为2222 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/21/184535ytpfixzyuycuxwcd.png) 查看hive中自动创建的表 delta_demo ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/21/184559terj6gfjw9gh3ocs.png) 10、更新验证 通过kafka命令行,往topic中写入一条数据,其中id为2时其他字段为xxxx ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/21/18470182n1hygk6wabtwlm.png) 再次查看hive中表delta_demo中数据已被更新 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202104/21/184730wh9adnm7bapzmom5.png) 【其他】 生产环境建议以 yarn cluster模式提交spark任务,相关的提交方式修改如下: 1. 将 kafka-source.properties 上传到 HDFS,例如放到 /tmp目录 ``` hdfs dfs -put kafka-source.properties /tmp ``` 2. 修改提交spark任务的命令 ``` source /opt/client/bigdata_env source /opt/client/Hudi/component_env kinit -k -t user.keytab developuser spark-submit --master yarn-client \ --jars /opt/client/Hudi/hudi/conf,/opt/client/Hudi/hudi/lib/*,/opt/haosuwei/hudi-demo/delta-stream-example-0.7.0.jar \ --driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/haosuwei/hudi-demo/delta-stream-example-0.7.0.jar \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ spark-internal --props hdfs://hacluster/tmp/kafka-source.properties \ --target-base-path /tmp/huditest/delta_demo \ --table-type COPY_ON_WRITE \ --target-table delta_demo \ --source-ordering-field id \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --schemaprovider-class com.huawei.hudi.deltastream.DataSchemaProviderExample \ --transformer-class com.huawei.hudi.deltastream.TransformerExample \ --enable-hive-sync --continuous ```
  • [二次开发] 【Flink产品】【flink功能】样例代码flink生产数据到kafka打jar包执行报错
    【功能模块】版本信息BASE    6.5.1    ( 补丁  6.5.1.7 )Flink    6.5.1    ( 补丁  6.5.1.7 )Porter    6.5.1    ( 补丁  6.5.1.7 )Spark2x    6.5.1    ( 补丁  6.5.1.7 )HD    6.5.1    ( 补丁  6.5.1.7 )Elasticsearch    6.5.1    ( 补丁  6.5.1.7 )在flink客户端下执行  命令如下bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/flinkStreamKafka.jar --topic Auto_Agreement --bootstrap.servers ip:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.combin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/tmpdata/sou1yu/flinkStreamKafka.jar --topic Auto_Agreement --bootstrap.servers ip:21005【操作步骤&问题现象】1、正常导入项目 导入jar包2、source 过执行环境  kinit 过用户 3.在flink客户端下执行  命令如下 都不行(下面命令的ip也正确的 下面只是为了保护一下ip所以处理了一下)bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/flinkStreamKafka.jar --topic Auto_Agreement --bootstrap.servers ip:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.combin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/tmpdata/sou1yu/flinkStreamKafka.jar --topic Auto_Agreement --bootstrap.servers ip:21005【截图信息】【日志信息】(可选,上传日志内容或者附件)SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/opt/client/Flink/flink/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/opt/client/Flink/flink/lib/flink-dist_2.11-1.7.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/opt/client/HDFS/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]Starting execution of programuse command as: ./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21005./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei******************************************************************************************<topic> is the kafka topic name<bootstrap.servers> is the ip:port list of brokers******************************************************************************************------------------------------------------------------------ The program finished with the following exception:org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 1a0dc0071081418158fbb38a8326a920)        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:490)        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)        at com.huawei.bigdata.flink.examples.WriteIntoKafka.main(WriteIntoKafka.java:32)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:498)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:430)        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:814)        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:288)        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1051)        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1127)        at java.security.AccessController.doPrivileged(Native Method)        at javax.security.auth.Subject.doAs(Subject.java:422)        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1127)Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)        at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)        at java.util.concurrent.FutureTask.run(FutureTask.java:266)        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)        at java.lang.Thread.run(Thread.java:748)Caused by: java.util.concurrent.TimeoutException        ... 8 more
  • [基础组件] 业务账号无法生产和消费kafka消息
    【功能模块】注册的kafka组业务账号,授权后仍然无法生产和消费kafka中topic消息【操作步骤&问题现象】1、在集群Manager中注册kafka组用户kafka1。2、创建topic1       bin/kafka-topics.sh --zookeeper $zk--create --topic topic1 --partitions 2 --replication-factor 23,授权      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$zk--add --allow-principal User:kafka1 --producer --topic topic14,以kafka1的身份向Topic topic1生产数据      cat NOTICE | bin/kafka-console-producer.sh --broker-list $bs--topic topic1 --producer.config config/producer.properties --request-required-acks 1【截图信息】查看topic1 权限生产消息【日志信息】(可选,上传日志内容或者附件)
  • [二次开发] flink消费kafka数据输出到hbase中,报.ClassNotFoundException
    【功能模块】【操作步骤&问题现象】1、我在原有flink连接kafka的example中写了一个sinkHbase的代码(说明:运行原来flink连接kafka样例代码没有任何问题),之后进行测试,我将所需要的jar包都上传到flink/lib下面了,然后运行代码报:Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration,具体报错信息如下所示:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [维护宝典] Kafka CPU使用率高排查思路及解决方案
    一、          CPU使用率高的一般排查思路1) Top命令排查CPU使用率高的进程2) su - omm切换到omm用户,jstack -l 进程pid>>文件路径3) top -H -p 进程pid 查看CPU使用率高4) “printf "%x\n" 线程号”将上一步中CPU高的线程号转换成16进制5) 在jstack文件中搜索上一步中的16进制信息查看具体的线程二、          Kafka CPU使用率高的常见原因及解决方案1.     集群规划不合理a)    命令grep -c processor /proc/cpuinfo 查看CPU核数,df -h命令查看Kafka磁盘挂载情况,建议每台机器最大挂盘数量 <= processor / 2。挂载过多磁盘也可能导致CPU繁忙。b)    Kafka配置中查看num.io.threads、num.network.threads、num.replica.fetchers配置,num.io.threads和num.network.threads配置一般建议磁盘个数的倍数,但不能超过CPU核数。num.replica.fetchers配置一般建议不超过5,同步线程数过多也会造成CPU繁忙。c)    分区设置不合理:分区一般设置为节点个数的倍数,如果分区设置不合理,所有的生产和消费都集中在某个节点上,也可能导致CPU高。2.     操作系统句柄数过高a)    查看操作系统句柄数:cat /var/log/osinfo/statistics/file-nr.txtb)    统计各进程打开句柄数:lsof -n|awk '{print $2}'|sort|uniq -c|sort -nrc)    查看进程使用句柄数:lsof -p 进程pid | wc -l如果句柄数高,排查使用句柄数高的进程是否有句柄泄漏;排查操作系统日志,查看是否存在TCP内存溢出等。3.     根据CPU使用率高的线程具体排查a)    使用C++客户端,版本不兼容,kafka-request.log日志中大量获取全部topic元数据的信息:Jstack信息中查找CPU高的线程,线程都在获取元数据信息:Request日志开debug后,大量获取全部Topic元数据信息,且耗时很长。CPU长时间频繁处理元数据请求导致CPU升高。b)    使用Logstash版本过低,与Kafka高版本服务端不兼容:      Kafka服务端对部分topic加权限后,21005端口访问未加权限的topic,kafka-authorizer.log日志中大量请求Deny的日志           CPU频繁处理deny请求导致CPU异常升高。c)    Sssd异常使用id -Gn名称返回结果异常为sssd服务有问题kafka-authorizer.log日志为大量打印某用户不属于kafka组或者kafkaadmin组服务器的kafka无法正常鉴权,导致用户频繁访问此服务器,cpu异常升高
  • [维护宝典] Kafka过期数据未老化原因及解决方案
    一、          针对单个Topic过期时间的配置未设置成功查看Kafka中某个Topic过期时间配置是否成功的方式如下:1) Kafka客户端执行kafka-topics.sh --describe --zookeeper ZK业务IP:24002/kafka --topic TopicName  如果topic的describe信息中有“retention.ms”信息,则设置成功。2)651X版本还可以通过FusionInsight Manager页面“集群->Kafka->KafkaTopic监控”中Topic配置查看二、          客户端版本低导致设置未生效确认执行配置命令的客户端是否与Kafka服务端版本一致,如果不一致,下载最新的客户端重新执行修改配置命令。三、          节点异常或节点磁盘下线或数据目录异常1.     节点异常FusionInsight Manager页面“集群->Kafka->实例”查看各个实例状态是否是良好。2.     磁盘下线FusionInsight Manager有没有“数据目录状态异常”的告警。Topic副本所在节点server.log日志中搜“offline”关键字和“checkpoint file”关键字查看磁盘是否下线或因checkpoint文件问题没有上线。3.     数据目录权限异常节点上Kafka数据目录(一般是“/srv/BigData/kafka/dataX/kafka-logs”)目录权限是否正常。四、          BufferOverflowExceptionKafka server.log日志中删除时出现“Uncaught exception in scheduled task kafka-log-retention”和“java.nio.BufferOverflowException”关键字。解决办法是升级到6518及之后版本。五、          Kafka数据Timestamp异常1) 把未过期删除的分区中最早的segment的.log、.timeindex和.index文件拷贝到Kafka客户端所在节点,例如“/opt/client/test”路径2) 执行kafka-run-class.sh kafka.tools.DumpLogSegments --files .timeindex文件路径 --print-data-log如果最后一条timestamp时间戳值与当前值的差值小于设置的retention.ms的值,Kafka不会删除此segment及之后所有segment的数据。例如,执行结果如下所示:最后一条timestamp时间远超过当前时间,此分区此segment及之后的数据Kafka不会基于时间自动删除。如果想要删除可以手动停节点删除,也可以通过修改基于日志大小来删除。Kafka基于时间删除时,查找过期的日志分段文件不是根据最近修改时间(因为它可以被有意或者无意修改),而是根据日志分段中的最大时间戳largestTimestamp来计算的。要获取日志分段中的最大时间戳,首先查询该日志分段对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳值大于0,则取其值,否则设置为最近修改时间。删除时先从跳表中移除待删除的日志分段,然后加上delete后缀,最后交由delete-file延时任务来删除这些文件,延时任务通过file.delete.delay.ms配置,默认60000(1分钟)。日志删除任务周测检配置是Log.retention.check.interval.ms,默认300000(5分钟)。
  • [赋能学习] FusionInsight Kafka节点退服后未完全同步分区迁移方案
    一、          收集Kafka实例和分区信息1.       执行kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka 命令获取Kafka实例信息2.       执行kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --unavailable-partitions 命令获取Kafka未同步分区3.       对比Replicas和ISR列表中的信息,确定哪些节点的分区未同步,确定未同步的节点正常与否。例如,如下案例中,对比Replicas和ISR列表中的信息发现3节点没有同步,而正常的kafka实例中没有3。   二、          准备迁移方案根据步骤一中的未完全同步的分区即需要迁移的分区,把需要迁移的所有分区写入一个新的json文件,文件中replicas中的信息需要根据步骤一中第2步查询结果中的Replicas把退服节点的brokerid改为正常节点的brokerid(即步骤一中第1步查询到的brokerid),其他不变,格式如下所示:{"version":1,"partitions":[{"topic":"test1","partition":0"replicas":[1,2]},{"topic":"test1","partition":1"replicas":[1,2]},{"topic":"test1","partition":5,"replicas":[2,1]},{"topic":"test1","partition":3,"replicas":[2,1]}]}如果涉及的分区比较多,也可按如下方式生成迁移的json文件。1.       根据步骤一中未完全同步的分区中的topic信息编写迁移topic的json文件,例如json文件名为generate.json,格式如下:{"topics":[{"topic": "test1"},{"topic": "test2"}], "version":1}2.       执行命令kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka --topics-to-move-json-file generate.json文件路径  --broker-list 1,2 --generate 生成迁移方案,其中,broker-list中对应参数是所有正常的broker实例的id,即步骤一中1获取的所有brokerid,不同brokerid间用逗号隔开。3.       把上一步得到的“Proposed partition reassignment configuration”的json文件写入迁移的json文件中,例如move.json,此json文件中只保留需要迁移的topic和partition的信息。如下图所示,红色方框内即迁移的json文件信息。三、          执行迁移执行kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka  --reassignment-json-file move.json文件路径 --execute 执行迁移四、          查看迁移进度执行kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka  --reassignment-json-file move.json文件路径 --verify 查看迁移进度,当所有partition都提示completed successfully时说明迁移完成。五、          查看是否所有分区都同步执行kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --unavailable-partitions 命令获取Kafka未同步分区,如果结果为空,说明所有分区都同步,操作完成;如果有leader为-1,按步骤六执行。六、          Leader为-1的解决办法1.      Kafka配置中查看unclean.leader.election.enable配置,如果配置为true,只执行下面步骤2即可;如果配置为false,执行下面步骤3、4、5。2.      切controller:进zookeeper客户端,执行 zkCli.sh -server zk业务IP:24002/kafka进入zk,执行deleteall /controller 切controller。3.      Kafka客户端修改参数配置:kafka-topics.sh --alter --topic topicName --zookeeper zk业务IP:24002/kafka --config unclean.leader.election.enable=true,其中,topicName是leader为-1的topic。4.      按步骤2切controller。5.      查看leader为-1的分区leader是否正常,正常后在kafka客户端执行kafka-topics.sh --alter --topic topicName --zookeeper 100.112.22.233:24002/kafka --delete-config unclean.leader.election.enable。
  • [二次开发] 使用flink连接kafka生产消费数据,运行FlinkKafkaJavaExample.jar代码,消费不了数据,报错
    【功能模块】【操作步骤&问题现象】1、运行样例代码FlinkKafkaJavaExample.jar代码,生产者可以生产数据,但是消费者不能消费数据,运行消费者命令会报错,如附件所示:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [运维管理] HD 6.5.1.3 版本集群kafka 那个参数控制安全模式和非安全模式的?
    【操作步骤&问题现象】HD 6.5.1.3 版本集群kafka 那个参数控制安全模式和非安全模式的?需要临时调整kafka调试。是否有临时设置方法或整体设置方法?
  • [行业动态] 基于大数据构建实时数仓的实践
    前言:数据处理现状,当前基于Hive的离线数据仓库已经非常成熟,数据平台体系也基本上是围绕离线数仓进行建设。但是随着实时计算引擎的不断发展以及业务对于实时报表的产出需求不断膨胀,业界最近几年就一直聚焦并探索于两个相关的热点问题:实时数仓建设和大数据架构的批流一体建设。实时数仓建设:实时数仓1.0        传统意义上我们通常将数据处理分为离线数据处理和实时数据处理。对于实时处理场景,我们一般又可以分为两类,一类诸如监控报警类、大屏展示类场景要求秒级甚至毫秒级;另一类诸如大部分实时报表的需求通常没有非常高的时效性要求,一般分钟级别,比如10分钟甚至30分钟以内都可以接受。        对于第一类实时数据场景来说,业界通常的做法比较简单粗暴,一般也不需要非常仔细地进行数据分层,数据直接通过Flink计算或者聚合之后将结果写入MySQL/ES/HBASE/Druid/Kudu等,直接提供应用查询或者多维分析。如下所示:        而对于后者来说,通常做法会按照数仓结构进行设计,我们称后者这种应用场景为实时数仓,将作为本篇文章讨论的重点。从业界情况来看,当前主流的实时数仓架构基本都是基于Kafka+Flink的架构(为了行文方便,就称为实时数仓1.0)。下图是基于业界各大公司分享的实时数仓架构抽象的一个方案:        这套架构总体依然遵循标准的数仓分层结构,各种数据首先汇聚于ODS数据接入层。再接着经过这些来源明细数据的数据清洗、过滤等操作,完成多来源同类明细数据的融合,形成面向业务主题的DWD数据明细层。在此基础上进行轻度的汇总操作,形成一定程度上方便查询的DWS轻度汇总层(注:这里没有画出DIM维度层,一般选型为Redis/HBase,下文架构图中同样没有画出DIM维度层,在此说明)。最后再面向业务需求,在DWS层基础上进一步对数据进行组织进入ADS数据应用层,业务在数据应用层的基础上支持用户画像、用户报表等业务场景。        基于Kafka+Flink的这套架构方案很好的解决了实时数仓对于时效性的业务诉求,通常延迟可以做到秒级甚至更短。基于上图所示实时数仓架构方案,笔者整理了一个目前业界比较主流的整体数仓架构方案:        上图中上层链路是离线数仓数据流转链路,下层链路是实时数仓数据流转链路,当然实际情况可能是很多公司在实时数仓建设中并没有严格按照数仓分层结构进行分层,与上图稍有不同。        然而基于Kafka+Flink的实时数仓方案有几个非常明显的缺陷:Kafka无法支持海量数据存储。对于海量数据量的业务线来说,Kafka一般只能存储非常短时间的数据,比如最近一周,甚至最近一天;Kafka无法支持高效的OLAP查询。大多数业务都希望能在DWD\DWS层支持即席查询的,但是Kafka无法非常友好地支持这样的需求;无法复用目前已经非常成熟的基于离线数仓的数据血缘、数据质量管理体系。需要重新实现一套数据血缘、数据质量管理体系;Lambad架构维护成本很高。很显然,这种架构下数据存在两份、schema不统一、 数据处理逻辑不统一,整个数仓系统维护成本很高;Kafka不支持update/upsert。目前Kafka仅支持append。实际场景中在DWS轻度汇聚层很多时候是需要更新的,DWD明细层到DWS轻度汇聚层一般会根据时间粒度以及维度进行一定的聚合,用于减少数据量,提升查询性能。假如原始数据是秒级数据,聚合窗口是1分钟,那就有可能产生某些延迟的数据经过时间窗口聚合之后需要更新之前数据的需求。这部分更新需求无法使用Kafka实现。        所以实时数仓发展到现在的架构,一定程度上解决了数据报表时效性问题,但是这样的架构依然存在不少问题,随着技术的发展,相信基于Kafka+Flink的实时数仓架构也会进一步往前发展。那会往哪里发展呢?        大数据架构的批流一体建设。        带着上面的问题我们再来接着聊一聊最近一两年和实时数仓一样很火的另一个概念:批流一体。对于批流一体的理解,笔者发现有很多种解读,比如有些业界前辈认为批和流在开发层面上都统一到相同的SQL上是批流一体,又有些前辈认为在计算引擎层面上批和流可以集成在同一个计算引擎是批流一体,比如Spark/Spark Structured Streaming就算一个在计算引擎层面实现了批流一体的计算框架,与此同时另一个计算引擎Flink,目前在流处理方面已经做了很多的工作而且在业界得到了普遍的认可,但在批处理方面还有一定的路要走。实时数仓2.0        笔者认为无论是业务SQL使用上的统一还是计算引擎上的统一,都是批流一体的一个方面。除此之外,批流一体还有一个最核心的方面,那就是存储层面上的统一。在这个方面业界也有一些走在前面的技术,比如最近一段时间开始流行起来的数据湖三剑客-- delta/hudi/iceberg,就在往这个方向走。存储一旦能够做到统一,上述数据仓库架构就会变成如下模样(以Iceberg数据湖作为统一存储为例),称为实时数仓2.0:         这套架构中无论是流处理还是批处理,数据存储都统一到数据湖Iceberg上。那这么一套架构将存储统一后有什么好处呢?很明显,可以解决Kafka+Flink架构实时数仓存在的前面4个问题:可以解决Kafka存储数据量少的问题。目前所有数据湖基本思路都是基于HDFS之上实现的一个文件管理系统,所以数据体量可以很大。DW层数据依然可以支持OLAP查询。同样数据湖基于HDFS之上实现,只需要当前的OLAP查询引擎做一些适配就可以进行OLAP查询。批流存储都基于Iceberg/HDFS存储之后,就完全可以复用一套相同的数据血缘、数据质量管理体系。Kappa架构相比Lambad架构来说,schema统一,数据处理逻辑统一,用户不再需要维护两份数据。        有的同学说了,这不,你直接解决了前4个问题嘛,还有第5个问题呢?对,第5个问题下文会讲到。        又有的同学会说了,上述架构确实解决了Lambad架构的诸多问题,但是这套架构看起来就像是一条离线处理链路,它是怎么做到报表实时产出呢?确实,上述架构图主要将离线处理链路上的HDFS换成了数据湖Iceberg,就号称可以实现实时数仓,听起来容易让人迷糊。这里的关键就是数据湖Iceberg,它到底有什么魔力?        为了回答这个问题,笔者就上述架构以及数据湖技术本身做一个简单的介绍(接下来也会基于Iceberg出一个专题深入介绍数据湖技术)。上述架构图中有两条数据处理链路,一条是基于Flink的实时数据链路,一条是基于Spark的离线数据链路。通常数据都是直接走实时链路处理,而离线链路则更多的应用于数据修正等非常规场景。这样的架构要成为一个可以落地的实时数仓方案,数据湖Iceberg是需要满足如下几个要求的:支持流式写入-增量拉取。流式写入其实现在基于Flink就可以实现,无非是将checkpoint间隔设置的短一点,比如1分钟,就意味每分钟生成的文件就可以写入到HDFS,这就是流式写入。没错,但是这里有两个问题,第一个问题是小文件很多,但这不是最关键的,第二个问题才是最致命的,就是上游每分钟提交了很多文件到HDFS上,下游消费的Flink是不知道哪些文件是最新提交的,因此下游Flink就不知道应该去消费处理哪些文件。这个问题才是离线数仓做不到实时的最关键原因之一,离线数仓的玩法是说上游将数据全部导入完成了,告诉下游说这波数据全部导完了,你可以消费处理了,这样的话就做不到实时处理。        数据湖就解决了这个问题。实时数据链路处理的时候上游Flink写入的文件进来之后,下游就可以将数据文件一致性地读走。这里强调一致性地读,就是不能多读一个文件也不能少读一个文件。上游这段时间写了多少文件,下游就要读走多少文件。我们称这样的读取叫增量拉取。解决小文件多的问题。数据湖实现了相关合并小文件的接口,Spark/Flink上层引擎可以周期性地调用接口进行小文件合并。支持批量以及流式的Upsert(Delete)功能。批量Upsert/Delete功能主要用于离线数据修正。流式upsert场景上文介绍了,主要是流处理场景下经过窗口时间聚合之后有延迟数据到来的话会有更新的需求。这类需求是需要一个可以支持更新的存储系统的,而离线数仓做更新的话需要全量数据覆盖,这也是离线数仓做不到实时的关键原因之一,数据湖是需要解决掉这个问题的。支持比较完整的OLAP生态。比如支持Hive/Spark/Presto/Impala等OLAP查询引擎,提供高效的多维聚合查询性能。        这里需要备注一点,目前Iceberg部分功能还在开发中。具体技术层面Iceberg是怎么解决上述问题的,请持续关注本号,接下来一篇文章会详细讲解哦。实时数仓3.0        按照批流一体上面的探讨,如果计算引擎做到了批流一体的统一,就可以做到SQL统一、计算统一以及存储统一,这时就迈入实时数仓3.0时代。对于以Spark为核心技术栈的公司来说,实时数仓2.0的到来就意味着3.0的到来,因为在计算引擎层面Spark早已做到批流一体。基于Spark/数据湖的3.0架构如下图:        假如未来Flink在批处理领域成熟到一定程度,基于Flink/数据湖的3.0架构如下图:        上面所介绍的,是笔者认为接下来几年数据仓库发展的一个可能路径。对于业界目前实时数仓的一个发展预估,个人觉得目前业界大多公司都还往实时数仓1.0这个架构上靠;而在接下来1到2年时间随着数据湖技术的成熟,实时数仓2.0架构会成为越来越多公司的选择,其实到了2.0时代之后,业务同学最关心的报表实时性诉求和大数据平台同学最关心的数据存储一份诉求都可以解决;随着计算引擎的成熟,实时数仓3.0可能和实时数仓2.0一起或者略微滞后一些普及。        目前,华为云FusionInsight MRS云原生数据湖,助力客户一个架构可持续演进,构建离线、实时、逻辑三种数据湖。通过LakeHouse范式,缩短数据分析链路和数据冗余,数据分析不出湖,在离线数据湖内实现常见批处理、流处理和全文检索等;通过Hudi实现数据实时增量Upsert入湖,通过ClickHouse使PB级数据实时OLAP毫秒级分析,实现全链路实时数据湖。通过HetuEngine,实现数据虚拟化,提供统一SQL接口访问,可跨湖跨仓跨云协同计算,将传统天级的人工摆渡数据,转变为跨湖跨仓跨云协同计算分钟级,实现逻辑数据湖。基于云原生能力,采用OBS实现大数据存算分离,提供企业级EC,将传统3副本降至1.2,TCO降低60%;采用BMS裸金属更好支持大数据高密集型计算场景,独家SDI技术彻底打通大数据所需的“CPU、内存、磁盘、网络”等资源的最后一环,释放100%数据基础设施算力。转载自过往记忆大数据,作者:大数据平台  https://mp.weixin.qq.com/s/cK6VA7Mnn6F6zdpdiBODPg免责声明:转载文章版权归原作者所有。如涉及作品内容、版权等问题,请及时联系文章编辑!
  • [维护宝典] Kafka修改配置详解
    Kafka修改配置有两个命令:kafka-topics.sh和kafka-configs.sh。kafka-topics.sh主要是修改单个topic的配置,支持修改的配置参数见附录一;kafka-configs.sh支持修改topic、broker、user和client级别的配置,支持修改的配置参数见附录二。一、          kafka-topics.sh1.      kafka-topics.sh命令增加/修改配置命令如下:kafka-topics.sh --alter --topic <topicName> --zookeeper <ZK业务IP:24002/kafka> --config <name=value>2.      kafka-topics.sh命令查看配置命令如下:kafka-topics.sh --describe --topic <topicName> --zookeeper <ZK业务IP:24002/kafka>修改后的配置会出现在Configs中。3.      kafka-topics.sh命令删除配置命令如下:kafka-topics.sh --alter --topic <topicName> --zookeeper <ZK业务IP:24002/kafka> --delete-config <name>配置删除后,修改的配置也会从Configs中删除。二、          kafka-configs.sh1.     kafka-configs.sh命令增加/修改topic级别配置命令如下:kafka-configs.sh --alter --zookeeper <ZK业务IP:24002/kafka> --add-config <name=value> --entity-name <topicName > --entity-type topicsTopic级别配置的修改,查看是否成功,依然可以使用kafka-topics.sh --describe --topic <topicName> --zookeeper <ZK业务IP:24002/kafka>命令查看Configs中是否有修改的配置。2.     kafka-configs.sh命令删除topic级别配置命令如下:kafka-configs.sh --alter --zookeeper <ZK业务IP:24002/kafka> --delete-config <name=value> --entity-name <topicName > --entity-type topicsTopic级别配置的删除,查看是否成功,依然可以使用kafka-topics.sh --describe --topic <topicName> --zookeeper <ZK业务IP:24002/kafka>命令查看Configs中是否还存在删除的配置。3.     kafka-configs.sh命令修改broker级别配置命令如下:修改单个broker的配置命令如下:kafka-configs.sh --alter --bootstrap-server <kafka业务IP:port> --add-config <name=value> --entity-name <brokerID> --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。修改所有broker的配置命令如下:kafka-configs.sh --alter --bootstrap-server <kafka业务IP:port> --add-config <name=value> --entity-default --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。注意:当config中对应的name是broker端不存在的配置时命令也可以执行成功,但是查看配置时对应的配置为null。4.     kafka-configs.sh命令查看broker级别配置命令如下:查看单个broker的修改配置命令如下:kafka-configs.sh -- describe --bootstrap-server <kafka业务IP:port> --entity-name <brokerID> --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。查看所有broker的修改配置命令如下:kafka-configs.sh -- describe --bootstrap-server <kafka业务IP:port> --entity-default --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。5.     kafka-configs.sh命令删除broker级别配置命令如下:删除单个broker的配置命令如下:kafka-configs.sh --alter --bootstrap-server <kafka业务IP:port> --delete-config <name> --entity-name <brokerID> --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。删除所有broker的配置命令如下:kafka-configs.sh --alter --bootstrap-server <kafka业务IP:port> --delete-config <name> --entity-default --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。附录一:kafka-topics.sh支持的topic级别的配置修改如下:cleanup.policycompression.typedelete.retention.msfile.delete.delay.msflush.messagesflush.msfollower.replication.throttled.replicasindex.interval.bytesleader.replication.throttled.replicaslog.partition.strategymax.message.bytesmessage.format.versionmessage.timestamp.difference.max.msmessage.timestamp.typemin.cleanable.dirty.ratiomin.compaction.lag.msmin.insync.replicaspreallocateretention.bytesretention.mssegment.bytessegment.index.bytessegment.jitter.mssegment.msunclean.leader.election.enable附录二:kafka-configs.sh支持的topic级别的配置修改如下:cleanup.policycompression.typedelete.retention.msfile.delete.delay.msflush.messagesflush.msfollower.replication.throttled.replicasindex.interval.bytesleader.replication.throttled.replicaslog.partition.strategymax.message.bytesmessage.format.versionmessage.timestamp.difference.max.msmessage.timestamp.typemin.cleanable.dirty.ratiomin.compaction.lag.msmin.insync.replicaspreallocateretention.bytesretention.mssegment.bytessegment.index.bytessegment.jitter.mssegment.msunclean.leader.election.enablekafka-configs.sh支持的broker级别的配置修改如下:log.message.timestamp.typessl.client.authlog.retention.mssasl.kerberos.ticket.renew.window.factorlog.preallocatelog.index.size.max.bytesssl.truststore.typessl.keymanager.algorithmlog.cleaner.io.buffer.load.factorssl.key.passwordbackground.threadslog.retention.bytesssl.trustmanager.algorithmlog.segment.byteslog.cleaner.delete.retention.mslog.segment.delete.delay.msmin.insync.replicasssl.keystore.locationssl.cipher.suiteslog.roll.jitter.mslog.cleaner.backoff.mssasl.jaas.configprincipal.builder.classlog.flush.interval.mslog.cleaner.dedupe.buffer.sizelog.flush.interval.messagesadvertised.listenersnum.io.threadslistener.security.protocol.mapsasl.enabled.mechanismsssl.truststore.passwordlistenersmetric.reportersssl.protocolsasl.kerberos.ticket.renew.jitterssl.keystore.passwordsasl.mechanism.inter.broker.protocollog.cleanup.policysasl.kerberos.principal.to.local.rulessasl.kerberos.min.time.before.reloginnum.recovery.threads.per.data.dirlog.cleaner.io.max.bytes.per.secondlog.roll.msssl.endpoint.identification.algorithmunclean.leader.election.enablemessage.max.byteslog.cleaner.threadslog.cleaner.io.buffer.sizesasl.kerberos.service.namessl.providerfollower.replication.throttled.ratelog.index.interval.byteslog.cleaner.min.compaction.lag.mslog.message.timestamp.difference.max.msssl.enabled.protocolslog.cleaner.min.cleanable.ratioreplica.alter.log.dirs.io.max.bytes.per.secondssl.keystore.typessl.secure.random.implementationssl.truststore.locationsasl.kerberos.kinit.cmdleader.replication.throttled.ratenum.network.threadscompression.typenum.replica.fetcherskafka-configs.sh支持的user级别的配置修改如下:request_percentageproducer_byte_rateSCRAM-SHA-256SCRAM-SHA-512consumer_byte_ratekafka-configs.sh支持的client级别的配置修改如下:request_percentageproducer_byte_rateconsumer_byte_ratekafka-configs.sh支持的topics_limit级别的配置修改如下:producer_byte_rateconsumer_byte_rate说明1:附录一和附录二中的配置是651X版本支持的参数,651X之前版本可以具体执行kafka-topics.sh和kafka-config.sh命令查看--config和--add-config中的参数说明。说明2:使用kafka-config.sh修改num.io.threads、num.network.threads、num.replica.fetchers时,只能在原设置的基础上2倍以内扩大或者1/2倍以内缩小,否则会执行报错。例如,FI中num.io.threads默认为8,直接修改为24时报错如下:若把num.io.threads改为24,执行命令两次,先把值改为16,再改为24。
  • [其他问题] 在flink开发指南中,flinkkafkaexaple打包在linux上运行报错,报的错是:提取元数据时,kafka超时已过期
    【功能模块】【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)报错信息如下: The program finished with the following exception:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9e3c78adac04823062e5328231935728)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:700)        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:219)        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:932)        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1005)        at java.security.AccessController.doPrivileged(Native Method)        at javax.security.auth.Subject.doAs(Subject.java:422)        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1737)        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1005)Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9e3c78adac04823062e5328231935728)        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1651)        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1633)        at com.huawei.bigdata.flink.examples.ReadFromKafka.main(ReadFromKafka.java:59)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:498)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)        ... 11 moreCaused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9e3c78adac04823062e5328231935728)        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)        at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)        ... 19 moreCaused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:498)        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)        at akka.actor.ActorCell.invoke(ActorCell.scala:561)        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)        at akka.dispatch.Mailbox.run(Mailbox.scala:225)        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
  • [其他] 生产集群5台kafka组件Broker实例节点,是否可以添加yarn组件的nodemanager实例?
    【操作步骤&问题现象】生产集群5台kafka组件Broker实例节点,是否可以添加yarn组件的nodemanager实例?建议和不建议的理由是什么?现网网络节点吞吐200M/s 上行万M;
  • [维护宝典] Kafka分区未同步
    一、          节点故障或磁盘下线1.      Kafka节点服务异常通过FusionInsight Manager页面,选择“集群->Kafka”,查看当前Kafka集群当前状态,状态是否是良好;如果状态不是良好,说明Kafka服务异常。2.      磁盘下线通过FusionInsight Manager页面查看告警信息,是否有“Kafka数据目录状态异常”的告警,如果有,server.log日志分析告警产生原因,重启数据目录异常节点。二、          网络异常检查网络是否正常方法如下:使用ping -s 15000 IP命令查看是否有网络丢包,如果网络延迟大于5ms,说明网络延迟高。使用scp从客户端传输一个大文件到kafka服务侧节点,查看传输速率。一般,未完全同步的分区总数监控曲线一直在变化或者通过kafka-topics.sh  --zookeeper zk业务IP:24002/kafka --describe --under-replicated-partitions命令查询处理的未完全同步的分区一直在变化,网络原因的可能性比较大。三、          副本线程下线1.     查看未完全同步分区所在节点执行kafka-topics.sh  --zookeeper zk业务IP:24002/kafka --describe --under-replicated-partitions命令查看所有未完全同步的分区,对比Replicas和ISR列表中的节点,判断未同步的分区是否集中在一个或者几个节点上。例如,下图中ISR列表中都是3未同步。    2.     查看节点信息执行kafka-broker-info.sh  --zookeeper zk业务IP:24002/kafka查看未同步分区所在节点。3.     Kafka配置页面查看“num.replica.ers”的配置大小。4.     切omm用户打jstack信息根据第二步获取的节点IP,连上此节点,su - omm切omm用户,jps |grep Kafka获取Kafka进程的pid,执行“jstack -l kafkapid >>文件名”,然后cat jstack文件|grep ReplicaFetcherThread-查看同步线程。注意:一个副本同步线程的格式如:ReplicaFetcherThread-tid-bid,tid代表一个线程编号,如果配置多个(由服务端配置参数num.replica.ers决定),这个线程号会从0开始依次增加;bid表示leader所在节点的broker-id,如果两个broker之间存在主副本关系,比如:leader在broker-5上,follower在broker-6上,那么broker-6至少会有一个线程名为ReplicaFetcherThread-0-5;如果num.replica.ers配置为2,broker-6 可能会存在两个副本同步线程ReplicaFetcherThread-0-5,ReplicaFetcherThread-1-5。一个节点理论上能够出现的副本同步线程个数为num.replica.ers *(集群broker实例个数 - 1)。但并不是num.replica.ers配置多少就一定会出现这么多个副本同步线程,线程的数量会根据当前broker节点的分区总数来决定,如果节点上面的副本数很少,kafka会判定分区少而没有必要启动这么多副本同步线程,则副本同步线程个数可能少于num.replica.ers *(broker实例个数 - 1)。通常建议num.replica.ers值配置为3即可,太大会影响CPU的使用率)。5.     查看分区同步线程是否下线。如果未查询到同步线程,或者num.replica.ers配置为1且同步线程个数小于(broker实例个数 - 1)的值,或者没有查找此节点到未同步的leader节点的线程,则有同步线程下线。具体可根据未完全同步分区开始上升的时间去server.log日志中排查原因,可搜索“Stopped”、“Shutdown”等关键字查找“ReplicaFetcherThread-”开头的线程是否异常下线。6.     副本线程下线相关BUG1) java.nio.BufferOverflowException2) unexpected offset in append to3) Attempt to append to a full index4) Trying to roll a new log segment for partition以上BUG均在6518及之后版本解决。四、          节点异常退服节点退服前未将退服节点上的分区迁移至未退服节点,具体查看方式是执行kafka-topics.sh  --zookeeper zk业务IP:24002/kafka --describe --under-replicated-partitions命令查看所有未完全同步的分区,对比Replicas和ISR列表中的节点,判断未同步的分区是否集中在一个或者几个节点上,如果是,执行kafka-broker-info.sh  --zookeeper zk业务IP:24002/kafka查看节点是否在此列表中,如果没有,确认之前是否退服过节点,如果是,可以确定是退服节点导致。解决办法是迁移退服节点上的分区到正常节点。五、          Kafka节点压力大iostat -d -x 1查看CPU参数idle和IO参数util,idle值越高,CPU越空闲,util值越高,IO使用率越高。如果idle值小于20%,top -Hp kafkaPid查看CPU使用率高的线程,打jstack日志分析具体的原因;如果util值大于80%,查看磁盘对应的读写速率、await和svctm的大小判断对IO影响大的原因,如果读写速率比较大,排查kafka读写请求和延时,如果awati远大于svctm,IO队列太长,应用响应时间很慢。Kafka的数据流量监控中在未同步分区出现时有没有陡增,字节输入输出流量是否达到网络阈值。
  • [维护宝典] Kafka生产发送数据失败
    我们使用kafka时,有时候会遇到发送数据失败的情况,其原因及解决方案如下:1.     Kafka topic leader为-1Kafka客户端执行如下命令查看topic的leader信息:kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka如果leader为-1,需查看Replicas中的副本节点是否正常,查看命令如下:kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka命令中可以查询到brokerid,说明节点kafka服务正常如果leader为-1的分区对应的Replicas中节点都不正常,需要先恢复异常节点kafka服务。如果都正常但ISR列表中无节点信息,或者ISR列表中的节点不正常,需要查看Kafka服务端的unclean.leader.election.enable参数是否为true或者topic端是否配置此参数为true。如果都不为true,需要对此topic修改配置为true。Kafka服务端的unclean.leader.election.enable参数配置查看方式如下:                                             Kafka topic端unclean.leader.election.enable参数配置查看方式如下:kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --topic topicName如果Config中无unclean.leader.election.enable信息,则与服务端配置一致。如果有,则此配置优先级高于服务端配置。修改topic端此配置的方式如下:kafka-topics.sh --alter --topic topicName --zookeeper zk业务IP:24002/kafka --config unclean.leader.election.enable=true2.     DNS配置导致执行 vi /etc/resolv.conf,如果有“nameserver X.X.X.X”,把此内容注释掉3.     网络异常生产端节点ping服务端IP,ping -s 15000 IP,如果延迟高于5ms,说明网络延迟过高,也可通过长ping来判断是否有网络丢包。4.     CPU或者IO过高也可能导致连接失败iostat -d -x 1查看CPU参数idle和IO参数util,idle值越高,CPU越空闲,util值越高,IO使用率越高。如果idle值小于20%,top -Hp kafkaPid查看CPU使用率高的线程,打jstack日志分析具体的原因;如果util值大于80%,查看磁盘对应的读写速率、await和svctm的大小判断对IO影响大的原因,如果读写速率比较大,排查kafka读写请求和延时,如果awati远大于svctm,IO队列太长,应用响应时间很慢。5.     磁盘坏道或者其他原因也可能导致连接失败如果server.log日志中有大量“java.io.IOException: Connection to IP:21007 (id: 2 rack: null) failed”日志,查看IP节点操作系统日志中是否有“Sense Key : Medium Error”信息,如果有,说明出现磁盘坏道,需修复或更换磁盘。另外,还需要打此节点的jstack信息,排查是否有阻塞或者死锁。如果是C80版本,且jstack信息中有如下信息,需打死锁补丁:6.     如果报“TimeoutException”或偶尔发送失败,可先调大request.timeout.ms,并查看服务端num.io.threads和num.network.threads是否可以优化(这两个参数一般调整为节点磁盘个数的倍数)。根据发送的数据量的大小也可适当调整batch.size、buffer.memory、linger.ms的大小。如果发送的数据量很大且可容忍一定时延,也可以考虑开启压缩,compression.type指定压缩方式,可配置为“gzip”、“snappy”或“lz4”。7.     ssh卡住ssh -v -p 端口号 异常节点ip如果如上图所示卡住,解决方式是将GSSAPIAuthentication修改为no8.     如果是集群外客户端生产发送失败,还可以通过集群内客户端测试下生产是否成功,进一步减小排查方向。
总条数:158 到第
上滑加载中