-
As the title says, the error below is reported. Following the fix described at https://blog.csdn.net/li1987by/article/details/82856873, I replaced kafka-clients-2.6.0.jar with kafka-clients-1.1.0.jar and the error disappeared. But where can I download Huawei's kafka-clients-2.6.0.jar? Our later Spark jobs will need kafka-clients 2.6.
-
As stated in the title, kafka-console-producer.sh and kafka-console-consumer.sh work fine, but the code below fails. With Kerberos debug mode enabled, the logs show that authentication succeeds. The referenced jars are the Huawei Kafka client libs, and the Kafka version is 2.11-1.1.0. Could someone point me in the right direction? Many thanks.
----------------------------
String krb5 = args[0];
String jaasPath = args[1];
String broker = args[2];

// Kerberos / JAAS configuration
System.setProperty("java.security.krb5.conf", krb5);
System.setProperty("java.security.auth.login.config", jaasPath);
System.setProperty("zookeeper.server.principal", "zookeeper/hadoop.hadoop.com");

Properties props = new Properties();
props.put("bootstrap.servers", broker);
props.put("group.id", "g1");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteBufferDeserializer.class.getName());
// security settings
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "kafka");

// AdminClient: list topics
AdminClient client = AdminClient.create(props);
ListTopicsResult listTopics = client.listTopics();
Set<String> strings = listTopics.names().get();
System.out.println(strings);
client.close();

// KafkaConsumer: list topics
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
System.out.println(stringListMap.size());
consumer.close();
-
[Steps & symptoms] On HD 6.5.1.7, after one Kafka broker instance failed, deleting a topic only marked it for deletion. The business side then deleted the corresponding topic information in ZooKeeper themselves; afterwards, when the topic was recreated and used, an error was reported that its metadata does not exist. With one node still faulty, is it safe to recover by doing the following?
1. Clean up the ZooKeeper metadata:
addauth krbgroup
deleteall /kafka/brokers/topics/topicname
deleteall /kafka/config/topics/topicname
2. Manually clean up the topic's data files on all Kafka nodes.
3. Restart the Kafka instances.
-
[Function module] Kafka
[Steps & symptoms]
1. Wrote a program whose rough logic is: Spark Streaming reads data from Kafka and writes it into HBase.
2. This demo is the sample demo from Huawei Cloud; video: https://bbs.huaweicloud.com/forum/thread-90888-1-1.html. It is submitted in yarn-client mode.
[Screenshots]
[Log information] (optional; log content or attachment)
It keeps repeatedly reporting that the address cannot be connected to:
[Consumer clientId=consumer-testGroup-1, groupId=testGroup] Bootstrap broker 172.31.8.38:21007 (id: -2 rack: null) disconnected | org.apache.kafka.clients.NetworkClient.handleServerDisconnect
The detailed log is attached.
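On a FusionInsight/MRS security cluster, 21007 is the Kafka SASL_PLAINTEXT port, so a repeated "Bootstrap broker ... disconnected" usually means the consumer is connecting without SASL/Kerberos settings (or without a valid ticket). A minimal sketch of the Kafka parameters such a Spark Streaming consumer would need, assuming GSSAPI with service name "kafka"; the krb5/jaas paths and broker address below are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;

public class SecureKafkaParams {
    // Placeholder paths and addresses for this sketch; replace with the real
    // krb5.conf, jaas.conf and broker list from the MRS client.
    static Map<String, Object> build() {
        System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/opt/conf/jaas.conf");

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "172.31.8.38:21007");
        kafkaParams.put("group.id", "testGroup");
        kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
        kafkaParams.put("value.deserializer", StringDeserializer.class.getName());
        // Required by the 21007 SASL_PLAINTEXT listener; without these the
        // broker drops the connection and the client logs "disconnected".
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.mechanism", "GSSAPI");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        return kafkaParams;
    }
}
```

If these settings are already in place, the next things to check are that the JAAS/krb5 configuration is also visible to the executors and that the driver and executor nodes can actually reach 172.31.8.38:21007 over the network.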
-
[Function module] Planning the number of Kafka broker nodes to deploy.
[Steps & symptoms]
1. At one site, 4 Kafka broker nodes were deployed. In a meeting, the customer's lead said that the number of Kafka nodes must be kept odd, otherwise one broker is wasted and leader election will also have problems.
2. In practice all 4 broker nodes store data and are in use, and none of the problems the customer described have appeared. The product documentation also only states that at least three broker nodes are required; it never says the count must be odd. Only ZooKeeper needs to be kept at an odd number of nodes. The customer wants an explanation of why the open-source version would need an odd number while FusionInsight HD has no such requirement. Could someone help explain this?
[Screenshots]
[Log information] (optional; log content or attachment)
-
The page says "You are not within the scope of this activity." Why is that? I don't understand, and I'm a bit rattled!
-
Note: this post draws on https://bbs.huaweicloud.com/forum/thread-147823-1-1.html

I. Operating system and software versions
1. Operating system: openEuler 20.03 (LTS-SP1), which can be checked with:
[root@localhost ~]# cat /etc/os-release
[root@localhost ~]# uname -i
2. Software version: Kafka 2.7.0
3. JDK dependency: Kafka is written in Scala and runs on the JVM, so a JDK has to be installed before Kafka. openEuler 20.03 (LTS-SP1) does not ship with a JDK, so install one yourself; the JDK installation is only outlined briefly here.
[root@localhost ~]# yum install java
Enter Y to confirm.
Configure the environment variables:
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-1.h5.oe1.x86_64
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
export PATH=$PATH:$JAVA_HOME/bin
Make the environment variables take effect:
source /etc/profile
Check the JDK version:
[root@localhost ~]# java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

II. Installation steps
1. Create a data directory under /root:
cd /root
mkdir data
2. Enter the data directory and download Kafka 2.7.0:
[root@localhost ~]# cd data/
[root@localhost data]# wget http://archive.apache.org/dist/kafka/2.7.0/kafka_2.12-2.7.0.tgz
3. Unpack the archive and enter the extracted directory:
[root@localhost data]# tar -zvxf kafka_2.12-2.7.0.tgz
[root@localhost data]# cd kafka_2.12-2.7.0
4. Start the ZooKeeper service and the Kafka service; appending & to each command keeps the service running in the background.
Note: a single-node Kafka needs no configuration changes; following the quick start on the Kafka website is enough.
[root@localhost kafka_2.12-2.7.0]# /root/data/kafka_2.12-2.7.0/bin/zookeeper-server-start.sh /root/data/kafka_2.12-2.7.0/config/zookeeper.properties > zookeeper.log 2>&1 &
[root@localhost kafka_2.12-2.7.0]# /root/data/kafka_2.12-2.7.0/bin/kafka-server-start.sh /root/data/kafka_2.12-2.7.0/config/server.properties > kafkastart.log 2>&1 &

III. Verification
How Kafka works: a Kafka cluster stores streams of records in categories called topics; each record consists of a key, a value and a timestamp. Messages in Kafka are classified by topic: producers produce messages and consumers consume them, both against the same topic. A topic is a logical concept, while a partition is a physical one; each partition corresponds to a log file, and that log file stores the data produced by producers. Produced data is continually appended to the end of the log file, and every record has its own offset. Each consumer in a consumer group keeps track of the offset it has consumed, so that after a failure it can resume from where it left off.
1. Create a topic named "itren" with a single partition and a single replica:
[root@localhost kafka_2.12-2.7.0]# /root/data/kafka_2.12-2.7.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic itren
2. Send messages: run the console producer and type a few messages, which are sent to the server.
[root@localhost kafka_2.12-2.7.0]# /root/data/kafka_2.12-2.7.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic itren
>it520
>itren666
>byebye
>
3. In another terminal, start the console consumer; it reads back the messages just sent and prints them:
[root@localhost ~]# /root/data/kafka_2.12-2.7.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic itren
it520
itren666
byebye
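The console tools above use the same producer API that applications call directly. As a minimal Java sketch, the following publishes the same three messages to the "itren" topic, assuming the kafka-clients library is on the classpath and the broker is listening on localhost:9092:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ItrenProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer and flushes pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String msg : new String[]{"it520", "itren666", "byebye"}) {
                producer.send(new ProducerRecord<>("itren", msg));
            }
        }
    }
}
```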
-
I. Operating system and software versions
1. Operating system: openEuler 20.03 (LTS-SP1), which can be checked with:
[root@ecs-2e3d ~]# cat /etc/os-release
[root@ecs-2e3d ~]# uname -i
2. Software version: Kafka 2.5.0
3. JDK dependency: Kafka is written in Scala and runs on the JVM, so a JDK has to be installed before Kafka. openEuler 20.03 (LTS-SP1) does not ship with a JDK, so install one yourself; the JDK installation is only outlined briefly here.
[root@ecs-2e3d ~]# yum install java
Enter y to confirm.
Configure the environment variables. The JDK is installed under /usr/lib/jvm by default; append the JDK path to the end of /etc/profile:
vi /etc/profile
# set java environment
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-1.h5.oe1.x86_64
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME CLASSPATH PATH
Save and close the profile file, then make it take effect:
source /etc/profile
Check the JDK version:
[root@ecs-2e3d ~]# java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

II. Installation steps
1. Create a data directory under /root:
cd /root
mkdir data
2. Enter the data directory and download Kafka 2.5.0:
[root@ecs-2e3d ~]# cd data
[root@ecs-2e3d data]# wget http://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
3. Unpack the archive and enter the extracted directory:
[root@ecs-2e3d data]# tar -zvxf kafka_2.12-2.5.0.tgz
[root@ecs-2e3d data]# cd kafka_2.12-2.5.0
4. Start the ZooKeeper service and the Kafka service; appending & to each command keeps the service running in the background.
Note: a single-node Kafka needs no configuration changes; following the quick start on the Kafka website is enough.
[root@ecs-2e3d kafka_2.12-2.5.0]# /root/data/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /root/data/kafka_2.12-2.5.0/config/zookeeper.properties > zookeeper.log 2>&1 &
[1] 7742
[root@ecs-2e3d kafka_2.12-2.5.0]# /root/data/kafka_2.12-2.5.0/bin/kafka-server-start.sh /root/data/kafka_2.12-2.5.0/config/server.properties > kafkastart.log 2>&1 &
[2] 8115
[root@ecs-2e3d kafka_2.12-2.5.0]#

III. Verification
Kafka workflow: Kafka classifies messages by topic, and each message has three attributes:
offset: the message's offset within its partition, a logical value that uniquely identifies a message in the partition; it can loosely be thought of as an id;
MessageSize: the size of the message payload (data);
data: the actual content of the message.
Across the Kafka architecture, producers and consumers follow a publish/subscribe model: producers produce messages, consumers consume them, each doing its own job, and both work against topics. (Note that a topic is a logical concept while a partition is a physical one; each partition corresponds to a log file, and that log file stores the data produced by producers.) Produced data is continually appended to the end of the log file, and every record has its own offset. Each consumer in a consumer group keeps track of the offset it has consumed, so after a failure and recovery it can continue from that offset, neither missing data nor consuming it twice.
1. Create a topic named "demo" with a single partition and a single replica:
[root@ecs-2e3d kafka_2.12-2.5.0]# /root/data/kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
Created topic demo.
2. Send messages: run the console producer and type a few messages, which are sent to the server.
[root@ecs-2e3d kafka_2.12-2.5.0]# /root/data/kafka_2.12-2.5.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
>morning
>nihao
>goodday!
>
3. In another terminal, start the console consumer; it reads back the messages just sent and prints them:
[root@ecs-2e3d kafka_2.12-2.5.0]# /root/data/kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo
morning
nihao
goodday!
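The same verification can also be done programmatically on the consumer side. A minimal Java sketch that polls the "demo" topic once and prints each record with its offset, assuming kafka-clients on the classpath and the broker on localhost:9092:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DemoConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // read from the beginning of the topic when this group has no committed offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // each record carries the partition offset described above
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}
```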
-
I. Operating system and software versions
1. Operating system: openEuler 20.03 (LTS-SP1), which can be checked with:
[root@ecs-1d84 ~]# cat /etc/os-release
[root@ecs-1d84 ~]# uname -i
2. Software version: Kafka 2.4.0
3. JDK dependency: Kafka is written in Scala and runs on the JVM, so a JDK has to be installed before Kafka. openEuler 20.03 (LTS-SP1) does not ship with a JDK, so install one yourself; the JDK installation is only outlined briefly here.
[root@ecs-1d84 ~]# yum install java
Enter y to confirm.
Configure the environment variables. The JDK is installed under /usr/lib/jvm by default; append the JDK path to the end of /etc/profile:
vi /etc/profile
# set java environment
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-1.h5.oe1.x86_64
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME CLASSPATH PATH
Save and close the profile file, then make it take effect:
source /etc/profile
Check the JDK version:
[root@ecs-1d84 ~]# java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

II. Installation steps
1. Create a data directory under /root:
cd /root
mkdir data
2. Enter the data directory and download Kafka 2.4.0:
[root@ecs-1d84 ~]# cd data
[root@ecs-1d84 data]# wget http://archive.apache.org/dist/kafka/2.4.0/kafka_2.11-2.4.0.tgz
3. Unpack the archive and enter the extracted directory:
[root@ecs-1d84 data]# tar -zvxf kafka_2.11-2.4.0.tgz
[root@ecs-1d84 data]# cd kafka_2.11-2.4.0
4. Start the ZooKeeper service and the Kafka service; appending & to each command keeps the service running in the background.
Note: a single-node Kafka needs no configuration changes; following the quick start on the Kafka website is enough.
[root@ecs-1d84 kafka_2.11-2.4.0]# /root/data/kafka_2.11-2.4.0/bin/zookeeper-server-start.sh /root/data/kafka_2.11-2.4.0/config/zookeeper.properties > zookeeper.log 2>&1 &
[root@ecs-1d84 kafka_2.11-2.4.0]# /root/data/kafka_2.11-2.4.0/bin/kafka-server-start.sh /root/data/kafka_2.11-2.4.0/config/server.properties > kafkastart.log 2>&1 &

III. Verification
Kafka workflow: Kafka classifies messages by topic, and each message has three attributes:
offset: the message's offset within its partition, a logical value that uniquely identifies a message in the partition; it can loosely be thought of as an id;
MessageSize: the size of the message payload (data);
data: the actual content of the message.
Across the Kafka architecture, producers and consumers follow a publish/subscribe model: producers produce messages, consumers consume them, each doing its own job, and both work against topics. (Note that a topic is a logical concept while a partition is a physical one; each partition corresponds to a log file, and that log file stores the data produced by producers.) Produced data is continually appended to the end of the log file, and every record has its own offset. Each consumer in a consumer group keeps track of the offset it has consumed, so after a failure and recovery it can continue from that offset, neither missing data nor consuming it twice.
1. Create a topic named "test" with a single partition and a single replica:
[root@ecs-1d84 kafka_2.11-2.4.0]# /root/data/kafka_2.11-2.4.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
2. Send messages: run the console producer and type a few messages, which are sent to the server.
[root@ecs-1d84 kafka_2.11-2.4.0]# /root/data/kafka_2.11-2.4.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>goodnight
>20210818
>
3. In another terminal, start the console consumer; it reads back the messages just sent and prints them:
[root@ecs-1d84 kafka_2.11-2.4.0]# /root/data/kafka_2.11-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
goodnight
20210818
That completes everything; corrections are welcome.
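Topic creation does not have to go through kafka-topics.sh; the AdminClient API can create the same single-partition, single-replica topic. A minimal Java sketch, assuming kafka-clients on the classpath and the broker on localhost:9092:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTestTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // topic name, partition count, replication factor - same values as the CLI example
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```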
-
I. Operating system and software versions
1. Operating system: openEuler 20.03 (LTS-SP1), which can be checked with:
[root@localhost ~]# cat /etc/os-release
[root@localhost ~]# uname -i
2. Software version: Kafka 2.1.0
3. JDK dependency: Kafka is written in Scala and runs on the JVM, so a JDK has to be installed before Kafka. openEuler 20.03 (LTS-SP1) does not ship with a JDK, so install one yourself; the one installed here is openjdk version "1.8.0_242".
yum install java
Enter y to confirm.
Check the JDK version:
[root@localhost ~]# java -version

II. Installation steps
1. Create a data directory in the root directory:
mkdir data
2. Enter the data directory and download Kafka 2.1.0:
[root@localhost /]# cd data
[root@localhost data]# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
3. Unpack the archive and enter the extracted directory:
[root@localhost data]# tar -zvxf kafka_2.11-2.1.0.tgz
[root@localhost data]# cd kafka_2.11-2.1.0
4. Start the ZooKeeper service and the Kafka service; appending & to each command keeps the service running in the background.
Note: a single-node Kafka needs no configuration changes; following the quick start on the Kafka website is enough.
[root@localhost kafka_2.11-2.1.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-2.1.0]# ./bin/kafka-server-start.sh ./config/server.properties &

III. Verification
1. Create a topic named "kafkademo" with a single partition and a single replica:
[root@localhost kafka_2.11-2.1.0]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkademo
2. Send messages: run the console producer and type a few messages, which are sent to the server.
[root@localhost kafka_2.11-2.1.0]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkademo
3. In another terminal, start the console consumer; it reads back the messages just sent and prints them:
[root@localhost kafka_2.11-2.1.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkademo
All steps are now complete; corrections are welcome.
-
[Function module] Following the official sample code, an error occurs after submitting the Flink job that consumes from Kafka. The command used on the Flink client is:
bin/flink run -yt conf/ssl/ -ys 2 -m yarn-cluster -yjm 1024 -ytm 1024 -c org.mytest.stream.ReadFromKafka /opt/flink/flink.jar --topic topictest --bootstrap.servers $bs --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka
[Screenshots]
[Log information] (optional; log content or attachment)
[root@node3 flink]# bin/flink run -yt conf/ssl/ -ys 2 -m yarn-cluster -yjm 1024 -ytm 1024 -c org.mytest.stream.ReadFromKafka /opt/flink/flink.jar --topic topictest --bootstrap.servers $bs --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoopclient/Flink/flink/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoopclient/HDFS/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Cluster started: Yarn cluster with application id application_1625541283910_0063
Job has been submitted with JobID f50af8c85ed9c74e813f52c71231674f
java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f50af8c85ed9c74e813f52c71231674f) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1651) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1633) at org.mytest.stream.ReadFromKafka.main(ReadFromKafka.java:21) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:700) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:219) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:932) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1005) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1737) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1005)Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f50af8c85ed9c74e813f52c71231674f) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) ... 
19 moreCaused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.<init>(KafkaFetcher.java:109) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:237) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
[root@node3 flink]#
MRS version: 3.0.2
Flink version: 1.10.0
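The NoSuchMethodError on KafkaConsumerThread.<init> is thrown inside the Kafka connector itself, which usually points to a mismatch between the flink-connector-kafka compiled into /opt/flink/flink.jar and the Flink 1.10 libraries shipped with the MRS 3.0.2 client; building the job against the cluster's own connector jar (or letting the cluster's version be used at runtime) is the usual fix. For reference, a minimal sketch of what such a ReadFromKafka job looks like with the Flink 1.10 API; the class and parameter handling here are illustrative, not the sample's actual code:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromKafka {
    public static void main(String[] args) throws Exception {
        // --topic / --bootstrap.servers / --security.protocol / --sasl.kerberos.service.name
        // are passed on the command line exactly as in the submit command above.
        ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Forward all command-line options (including the security settings) to the consumer.
        Properties props = new Properties();
        props.putAll(params.getProperties());

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>(params.get("topic"), new SimpleStringSchema(), props);

        env.addSource(source).print();
        env.execute("Read from Kafka");
    }
}
```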
-
[Function module] Development environment; cluster version 6.5.1.
[Steps & symptoms]
1. Reference document: the "Spring-Kafka integration with Huawei FusionInsight Kafka" sample.
2. Followed the reference document, but the test fails with a "no such method" (NoSuchMethodError) error.
[Screenshots] pom file; run log.
[Log information] (optional; log content or attachment)
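A NoSuchMethodError in this scenario typically means the spring-kafka version declared in the pom expects a different kafka-clients API than the FusionInsight client jar provides, so the two versions need to be aligned. As a rough sketch (not the sample's actual code) of a minimal KafkaTemplate wired against a Kerberos-secured FusionInsight cluster, where the broker address and exact versions are assumptions:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class FiKafkaTemplateSketch {
    public static KafkaTemplate<String, String> build() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:21007"); // placeholder broker
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // security settings required by a Kerberos-enabled FusionInsight Kafka
        configs.put("security.protocol", "SASL_PLAINTEXT");
        configs.put("sasl.mechanism", "GSSAPI");
        configs.put("sasl.kerberos.service.name", "kafka");

        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
    }
}
```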
-
## I. A brief introduction to Kafka

Most people's first impression of Kafka is that it is a messaging system, but the official Kafka site puts it differently:

## Apache Kafka® is a distributed streaming platform

##### As a distributed streaming platform, Kafka has three key capabilities:
1. Publish and subscribe to streams of messages, similar to a message queue or an enterprise messaging system.
2. Store streams of messages durably, in a fault-tolerant way.
3. Process streams of messages as they are produced.

##### Kafka has the following characteristics:
1. A Kafka cluster runs on one or more servers, which can span multiple data centers.
2. The messages stored by the cluster are organized into categories called topics.
3. Each message (also called a record) consists of a key, a value and a timestamp.

##### Kafka exposes four core APIs:
1. Producer API: lets an application publish messages to one or more topics in the cluster.
2. Consumer API: lets an application subscribe to one or more topics and process their messages.
3. Streams API: lets an application act as a stream processor, consuming input streams from one or more topics and producing output streams to one or more output topics, effectively transforming input streams into output streams.
4. Connector API: lets you build and run reusable producers or consumers that connect topics to existing applications or data systems. For example, a connector for a relational database can capture every change to each table in that database.

## II. The flow of a producer sending data to Kafka

DRS synchronization into Kafka: DRS acts as a Kafka client and uses the Producer API to write the incremental data produced by the source database into topics on the target Kafka cluster.

##### Taking MySQL-to-Kafka as an example, the rough flow is:
1. The incremental data recorded in the source MySQL binlog is wrapped into a Record.
2. The record passes through interceptors, which can filter messages.
3. The serializers turn the record's key and value into bytes; custom serialization rules or custom serializers can of course be used.
4. The partitioner determines which partition of the target topic the record is sent to. If the record specifies a partition field, it is sent to that partition.
5. Records are then collected into batches in the RecordAccumulator. The accumulator acts as a buffer and is one of the reasons for Kafka's strong write performance.
6. A background Sender thread then sends the data, in order, to the brokers hosting the leader partitions (every server in a Kafka cluster is a broker).
7. While sending, the Kafka client can fetch the cluster metadata from any broker; the metadata records, for every partition of every topic, the leader, followers, ISR, replicas and so on.

The overall flow is shown in the figure below.

## III. Two important producer parameters

1. acks determines how the producer trades performance against data reliability. The official source describes it as follows:
```
public static final String ACKS_CONFIG = "acks";
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
    + " durability of records that are sent. The following settings are allowed: "
    + ""
    + "acks=0 If set to zero then the producer will not wait for any acknowledgment from the"
    + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
    + " made that the server has received the record in this case, and the retries configuration will not"
    + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
    + " always be set to -1."
    + "acks=1 This will mean the leader will write the record to its local log but will respond"
    + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
    + " acknowledging the record but before the followers have replicated it then the record will be lost."
    + "acks=all This means the leader will wait for the full set of in-sync replicas to"
    + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
    + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
    + "";
```
In Kafka, message logs are stored by topic, and a topic has a number of partitions and a replication factor. A topic has multiple partitions, and each partition can have multiple replicas. Among the replicas of a partition, only one is the leader partition; all the others are follower partitions. Only the leader serves client requests; the followers exist for redundancy. Replicas are placed on different brokers, so when creating a topic the replication factor cannot exceed the number of broker nodes. The acks parameter is all about these replicas:
```
acks=0: the producer does not wait for any confirmation from the broker and immediately sends the next record; best performance.
acks=1: the producer waits until the leader replica has acknowledged the write before sending the next record; second-best performance.
acks=-1/all: the producer waits until the record written to the leader partition has been fetched by the followers before sending the next record; worst performance but strongest reliability.
```
DRS puts reliability first, so it sets acks to all, making sure a message has been written to all available replicas before the next one is written.

2. max.in.flight.requests.per.connection. The official source describes it as follows:
```
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
    + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
    + " message re-ordering due to retries (i.e., if retries are enabled).";
```
```
// In InFlightRequests.java
/**
 * Can we send more requests to this node?
 *
 * @param node Node in question
 * @return true iff we have no requests still being sent to the given node
 */
public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
        (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
```
```
// In Sender.java
if (guaranteeMessageOrder) {
    // Mute all the partitions drained
    for (List<ProducerBatch> batchList : batches.values()) {
        for (ProducerBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}
```
max.in.flight.requests.per.connection is the maximum number of requests that may be in flight on a single connection, i.e. sent but not yet acknowledged, before the producer blocks. The default in the Kafka source is 5: up to five requests can have been sent on one connection without the producer having received any response from the broker. If this parameter is greater than 1 and retries are enabled, messages can be reordered.

As shown in the figure, within one network connection batches are taken from the batch queue and wrapped, in order, into separate requests (request 1 ... request 5). If the broker fails to write the messages of request 2 to the partition (for example because the broker node is temporarily unavailable) while the other requests are written normally, the retry mechanism makes the producer resend request 2, and the messages end up written to the leader partition out of order.

To guarantee that data is written to Kafka in order, DRS sets max.in.flight.requests.per.connection to 1, at the cost of reduced Kafka throughput.
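To make the two settings concrete, a reliability-first producer configuration in the spirit of what is described for DRS could look like the following sketch (an illustration, not DRS's actual code; the broker address and topic name are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReliabilityFirstProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Wait until all in-sync replicas have acknowledged each record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Allow only one unacknowledged request per connection so retries
        // cannot reorder records (trades throughput for ordering).
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // Retry transient failures instead of dropping records.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```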
-
[Function module] MRS Flink
[Steps & symptoms]
1. Uploaded the sample code to the cluster and ran it in per-job mode, executing the FemaleInfoCollectionFromKafka sample.
2. The Flink job is created successfully, but it fails at runtime, reporting that a KafkaConsumer method does not exist.
[Screenshots] Error message below. The relevant jars have already been added to the client lib directory; which one is still missing?
[Log information] (optional; log content or attachment)
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
 at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
 at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
 at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
 at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
 at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
 at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
 at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
 at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
 at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.<init>(KafkaFetcher.java:109)
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:237)
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)