-
一、操作系统和软件版本介绍1.操作系统为openEuler 20.03 (LTS-SP1) 可用如下命令查询: cat /etc/os-release 2.软件版本 Kafka版本为:Kafka 2.2.0 3.JDK依赖 Kafka 是用Scala 语言开发的,运行在JVM上,因此在安装Kafka 之前需要先安装JDK。openEuler 20.03 (LTS-SP1) 默认没有安装JDK环境,需要自己自行安装。 这里安装JDK环境做简单说明,请开发者自行安装,我这里安装的是 openjdk version "1.8.0_242"。 yum install java 输入 y 查看JDK版本 java -version二、详细安装步骤1.在根目录创建data目录 mkdir data2.进入data目录并下载Kafka 2.2.0 cd data wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz3.解压并进入到解压后的目录tar -zvxf kafka_2.12-2.2.0.tgz cd kafka_2.12-2.2.04.启动zookeeper服务和kafka服务,可以在命令的结尾加个&符号,这样服务就可以在后台运行 ./bin/zookeeper-server-start.sh ./config/zookeeper.properties & ./bin/kafka-server-start.sh ./config/server.properties & 三、验证1.创建一个叫"euler"的topic,它只有一个分区,一个副本: ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic euler2.发送消息,运行producer并在控制台中输一些消息,这些消息将被发送到服务端 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic euler3.在另一个终端开启consumer,可以读取到刚才发出的消息并输出 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic euler 至此所有操作已完成,请指正。
-
问题背景描述:某局点某Kafka节点文件句柄数很高,导致此节点上Kafka的请求失败,lsof -p kafkapid发现Kafka进程有大量socket泄漏,如下图所示:问题分析步骤:1. 执行ls -l /proc/kafkapid/fd >> fd.txt命令查看Kafka进程的文件描述符2. 执行trace -T -tt -f -F output.txt -p kafkapid命令打出kafka进程的调用信息3. 几分钟后再次执行执行ls -l /proc/kafkapid/fd >> fd1.txt命令查看Kafka进程的文件描述符4. 对比1和3步骤中的文件描述符,查找增加的socket连接,多查找几个增加socket连接,根据socket对应的id号从2步骤中获取的信息中查看对应的id连接调用信息5. 如果查找socket连接对应的客户端,可以通过accept查找,如下图所示此查询查询到对应的socketfd对应的所有的客户端连接,多次查找后可对比客户端连接来进一步确认客户端的范围。6. 通过accept查找的原因是socket通信中accept函数获取客户端的连接:int accept(int sockfd,struct sockaddr * addr,socklen_t * addrlen);sockfd的参数为listen()函数返回的监听套接字;addr是一个传出参数,表示客户端的地址,该参数设置为NULL时,表示不关心客户端的地址。addrlen为一个传入传出参数,传入时为函数调用时提供参数addr的长度,传出时为客户端地址结构体的实际长度;accept的返回值也是一个套接字,该套接字用于与本次通信的客户端进行数据交互。
-
问题描述:FusionInsight C80版本,通过Flink提交消费Kafka的任务,消费Kafka的消费者组信息通过kafka-consumer-groups.sh --list命令查询不到,但是通过kafka-consumer-groups.sh --describe可以查询到信息。问题分析:1、C80版本kafka-consumer-groups.sh命令使用list查询groupid源码分析如下:a. kafka-consumer-groups.sh命令的入口是执行ConsumerGroupCommand中的main函数b. ConsumerGroupCommand中的main函数执行时根据使用的是--zookeeper还是--bootstrap-server判断consumerGroupService是ZkConsumerGroupService还是KafkaConsumerGroupService。如果命令中有--list,就执行listGroups().foreach(println(_))c. KafkaConsumerGroupService中listGroups()的实现是通过adminClient获取所有的consumerGroup,源码如下:d. AdminClient中获取listAllConsumerGroupsFlattened的实现是获取所有的group组并过滤出所有协议是consumer的group组,源码如下:2. 提交一个flink作业,groupid设置为“testgroup”,使用Kafka中的AdminClient来获取listAllGroupsFlattened的信息,执行结果如下:如上图所示:执行结果中GroupOverview中testgroup对应的协议为空,而不是consumer,所以,使用list查询不到结果。3. Flink作业消费kafka时,GroupOverview中groupid对应的协议为空的原因是:flink不是直接调用的kafka consumer client端的消费接口,而是通过自己的逻辑去消费、只用kafka中的__consumer_offsets保存数据。4. 此问题在651版本已经解决,651版本中增加的协议为空的判断逻辑,源码如下所示:
-
一、 无权限创建Topic会报错“ERROR org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/topics/topicName。解决办法是使用具体kafkaadmin权限的用户。二、 Quota限制创建Topic会报错“Nodes number exceed quota limit for /config/topics/topicName”。解决办法是:检查Kafka全部配置中Quota配置quota.number是否是250000,如果不是改为此值。同步zk配置。FusionInsight Manager页面“集群->Zookeeper->更多->同步配置”,同步配置操作每5分钟执行一次,最多等5分钟,zk客户端执行listquota /kafka查看quota的count值是否为250000。zk客户端执行操作方法如下:进入FI客户端(例如/opt/hadoopclient)执行命令source bigdata_env执行命令kinit 用户名(普通模式跳过)执行sh -server zk业务IP:24002三、 副本个数大于正常节点数创建Topic会报错“ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: X larger than available brokers: X”解决办法是:解决异常节点问题或减少副本数。四、 客户端版本低客户端是低版本而服务端是高版本时,创建Topic可能不会报错但是查看topic的describe信息时leader为none。原因是zk上报没有权限访问“/brokers/topics/topicName”目录,此目录下无法创建partitions目录。解决办法是:使用与服务端对应的客户端版本。
-
华为云DMS Kafka能否在能力上匹配下Confluent
-
【功能模块】FI集群的flink 是开启的Kerberos认证 再flink-conf.yml中配置过的。现在需要用flink消费另外一个集群中开启Kerberos认证的kafka 拿到了kafka集群的的认证文件 krb5.conf jaas.conf user.keytab。【操作步骤&问题现象】1、整个flink应用是需要自己集群的认证 ,怎么在一个flink程序中认证另外一个kafka ,将其当作该Flink应用的source源,然后处理呢?拜托各位大佬提供一下处理思路!
-
再使用kafka的过程中,用到filebeat采集日志到kfka,非安全模式的kafka开源直接使用,但是安全模式的kafka,尝试使用类似logstash对接安全模式kafka,未能连通,请问filebeat对接安全的kafka需要如何配置?
-
除了kafka,有没有什么服务能无缝迁移rocketmq业务
-
一、无权限创建Topic会报错“ERROR kafka.admin.AdminOperationException: Error while deleting topic topicName解决办法是使用具有kafkaadmin权限的用户。二、“topic.enable”配置为falseFusionInsight Manager页面“集群->Kafka->配置->全部配置”查看Kafka服务端delete.topic.enable参数配置。如果参数设置为false,改为true后保存并重启kafka服务。三、节点异常或节点磁盘下线或数据目录异常1. 节点异常FusionInsight Manager页面“集群->Kafka->实例”查看各个实例状态是否是良好。2. 磁盘下线FusionInsight Manager有没有“数据目录状态异常”的告警。Topic副本所在节点server.log日志中搜“offline”关键字和“checkpoint file”关键字查看磁盘是否下线或因checkpoint文件问题没有上线。搜“cannot allocate memory”关键字查看是否内存不足导致。搜“No space left”关键字查看是否磁盘写满。注意:出现“数据目录状态异常”的告警后,只有重启告警节点才可以使磁盘重新上线。3. 数据目录权限异常节点上Kafka数据目录(一般是“/srv/BigData/kafka/dataX/kafka-logs”)目录权限是否正常。四、删除后又自动创建TopicFusionInsight Manager页面“集群->Kafka”查看controller所在主机节点,此节点上Kafka日志目录中查看controller.log,如果日志中有“Deletion of topic topicName successfully completed”和“New topics: [Set(topicName)]”说明topic又被自动创建。五、Controller未执行删除如果controller.log日志中只有“Starting topic deletion”日志而未打印具体的执行成功或失败的日志,可尝试切controller,步骤如下:1. 进FI客户端(例如/opt/hadoopclient)2. 执行zkCli.sh -server zk业务IP:24002/kafka3. 执行get /controller4. 若获取到controller信息,执行deleteall /controller和get /controller。
-
【功能模块】【操作步骤&问题现象】1报错信息如下:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
-
【功能模块】【操作步骤&问题现象】1、具体报错信息如下:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
-
登录ECS控制台登录ECS控制台,找到已购买的云服务器并登录点击远程登录,(用户名:root)输入密码(密码不显示,但可正常输入),即可登录ECS下载Kafka,并上传至ECS执行命令下载Kafka:wget https://repo.huaweicloud.com/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz解压:tar zxvf kafka_2.12-1.1.0.tgzroot@ecs-kc1-large-2-linux-20200906181836:~# wget https://repo.huaweicloud.com/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz--2020-09-15 10:18:47-- https://repo.huaweicloud.com/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgzResolving repo.huaweicloud.com (repo.huaweicloud.com)... 61.147.106.222, 59.56.28.88, 125.77.150.15, ...Connecting to repo.huaweicloud.com (repo.huaweicloud.com)|61.147.106.222|:443... connected.HTTP request sent, awaiting response... 200 OKLength: 50326212 (48M) [application/octet-stream]Saving to: ‘kafka_2.12-1.1.0.tgz’kafka_2.12-1.1.0.tgz 100%[====================================================================================================>] 47.99M 2.80MB/s in 17s 2020-09-15 10:19:04 (2.81 MB/s) - ‘kafka_2.12-1.1.0.tgz’ saved [50326212/50326212]root@ecs-kc1-large-2-linux-20200906181836:~# tar zxvf kafka_2.12-1.1.0.tgzkafka_2.12-1.1.0/...kafka_2.12-1.1.0/libs/kafka-streams-examples-1.1.0.jarroot@ecs-kc1-large-2-linux-20200906181836:~#在Kafka实例详情页,获取Kafka broker地址创建topic登录ECS执行生产脚本脚本目录是:/pathtokafka/kafka_2.12-1.1.0/bin/kafka-console-producer.sh执行命令:bin/kafka-console-producer.sh --broker-list 192.168.0.186:9092 --topic topic-1628361972root@ecs-kc1-large-2-linux-20200906181836:~# lltotal 49196drwx------ 7 root root 4096 Sep 15 10:22 ./drwxr-xr-x 25 root root 4096 Sep 15 10:15 ../-rw------- 1 root root 0 Dec 2 2019 .bash_history-rw-r--r-- 1 root root 3106 Apr 9 2018 .bashrcdrwx------ 2 root root 4096 Nov 26 2019 .cache/drwx------ 3 root root 4096 Nov 26 2019 .gnupg/drwxr-xr-x 6 root root 4096 Mar 24 2018 kafka_2.12-1.1.0/-rw-r--r-- 1 root root 50326212 Mar 28 2018 kafka_2.12-1.1.0.tgzdrwxr-xr-x 2 root root 4096 Dec 2 2019 .oracle_jre_usage/-rw-r--r-- 1 root root 148 Aug 17 2015 .profiledrwx------ 2 root root 4096 Sep 15 10:15 .ssh/-rw------- 1 root root 4864 Dec 2 2019 .viminfo-rw------- 1 root root 82 Sep 15 10:16 .Xauthorityroot@ecs-kc1-large-2-linux-20200906181836:~# cd kafka_2.12-1.1.0/root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# bin/kafka-console-producer.sh --broker-list 192.168.0.186:9092 --topic topic-1628361972>>hengji>2020.9.15>10.37>shanghai ^H^H>exit>–broker-list:kafka broker地址,多个已逗号隔开–topic:指定生产消息的topic(在3中创建输入任意字符后回车,即生产消息登录ECS执行消费脚本脚本目录是:/pathtokafka/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh执行命令: bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.186:9092 --topic topic-1628361972 --group test --consumer-property enable.auto.commit=true --from-beginningroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.186:9092 --topic topic-1628361972 --group test --consumer-property enable.auto.commit=true --from-beginning hengjishanghai 2020.9.15exit10.37–bootstrap-server:kafka broker地址,多个已逗号隔开–consumer-property enable.auto.commit=true,设置消费自动提交消费进度–topic:指定消费消息的topic(在3中创建)–group:指定消费组名称,可自定义在生产窗口输入消息,在消费窗口会显示消息生产代码demo为了方便操作,包已经打包好了:地址:链接: https://download.csdn.net/download/weixin_43572042/12845314.百度网盘: https://crazyhengji.blog.csdn.net/article/details/108636026.package dms.kafka.demo;import java.util.Properties;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaProducerDemo{ public static void main(String[] args) throws InterruptedException, ExecutionException { if (args.length != 2) { throw new IllegalArgumentException("usage: dms.kafka.demo.KafkaProducerDemo bootstrap-servers topic-name."); } Properties props = new Properties(); props.put("bootstrap.servers", args[0]); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { Future<RecordMetadata> result = producer.send(new ProducerRecord<String, String>(args[1], Integer.toString(i), Integer.toString(i))); RecordMetadata rm = result.get(); System.out.println("topic: " + rm.topic() + ", partition: " + rm.partition() + ", offset: " + rm.offset()); } producer.close(); }}将生产代码打包,上传至Kafka libs目录root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# cd libs/root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0/libs# pwd/root/kafka_2.12-1.1.0/libsroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0/libs# lltotal 48976drwxr-xr-x 2 root root 4096 Sep 15 10:48 ./drwxr-xr-x 6 root root 4096 Mar 24 2018 ../-rw-r--r-- 1 root root 14768 Aug 14 2017 aopalliance-repackaged-2.5.0-b32.jar-rw-r--r-- 1 root root 90347 Aug 14 2017 argparse4j-0.7.0.jar-rw-r--r-- 1 root root 479881 Aug 14 2017 commons-lang3-3.5.jar-rw-r--r-- 1 root root 89496 Mar 24 2018 connect-api-1.1.0.jar-rw-r--r-- 1 root root 19542 Mar 24 2018 connect-file-1.1.0.jar-rw-r--r-- 1 root root 44928 Mar 24 2018 connect-json-1.1.0.jar-rw-r--r-- 1 root root 408225 Mar 24 2018 connect-runtime-1.1.0.jar-rw-r--r-- 1 root root 88980 Mar 24 2018 connect-transforms-1.1.0.jar-rw-r--r-- 1 root root 3225 Sep 15 10:48 dms.kafka.demo.jar......执行生产命令:java -cp .:./libs/* dms.kafka.demo.KafkaProducerDemo 192.168.0.186:9092 topic-1628361972root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# java -cp .:./libs/* dms.kafka.demo.KafkaProducerDemo 192.168.0.186:9092 topic-1628361972topic: topic-1628361972, partition: 2, offset: 2topic: topic-1628361972, partition: 0, offset: 2...topic: topic-1628361972, partition: 0, offset: 45topic: topic-1628361972, partition: 0, offset: 46消费代码demo为了方便操作,包已经打包好了:地址:链接: https://download.csdn.net/download/weixin_43572042/12845314.百度网盘: https://crazyhengji.blog.csdn.net/article/details/108636026.package dms.kafka.demo;import java.util.Arrays; import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;public class KafkaConsumerDemo { public static void main(String[] args) { if (args.length != 3) { throw new IllegalArgumentException("usage: dms.kafka.demo.KafkaProducerDemo bootstrap-servers topic-name groupname."); } Properties props = new Properties(); props.put("bootstrap.servers", args[0]); props.put("group.id", args[2]); props.put("enable.auto.commit", "true"); props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(args[1])); while (true) { ConsumerRecords<String, String> records = consumer.poll(200); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }}将消费代码打包,上传至Kafka libs目录root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# cd libs/root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0/libs# pwd/root/kafka_2.12-1.1.0/libsroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0/libs# lltotal 48976drwxr-xr-x 2 root root 4096 Sep 15 10:48 ./drwxr-xr-x 6 root root 4096 Mar 24 2018 ../-rw-r--r-- 1 root root 14768 Aug 14 2017 aopalliance-repackaged-2.5.0-b32.jar-rw-r--r-- 1 root root 90347 Aug 14 2017 argparse4j-0.7.0.jar-rw-r--r-- 1 root root 479881 Aug 14 2017 commons-lang3-3.5.jar-rw-r--r-- 1 root root 89496 Mar 24 2018 connect-api-1.1.0.jar-rw-r--r-- 1 root root 19542 Mar 24 2018 connect-file-1.1.0.jar-rw-r--r-- 1 root root 44928 Mar 24 2018 connect-json-1.1.0.jar-rw-r--r-- 1 root root 408225 Mar 24 2018 connect-runtime-1.1.0.jar-rw-r--r-- 1 root root 88980 Mar 24 2018 connect-transforms-1.1.0.jar-rw-r--r-- 1 root root 3225 Sep 15 10:48 dms.kafka.demo.jar......执行消费命令:java -cp .:./libs/* dms.kafka.demo.KafkaConsumerDemo 192.168.0.186:9092 topic-1628361972 testroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# java -cp .:./libs/* dms.kafka.demo.KafkaConsumerDemo 192.168.0.186:9092 topic-1628361972 testoffset = 2, key = 1, value = 1offset = 3, key = 5, value = 5...offset = 33, key = 94, value = 94offset = 34, key = 95, value = 95登录ECS执行消费组信息查询脚本目录是:/pathtokafka/kafka_2.12-1.1.0/bin/ kafka-consumer-groups.sh(1)查询消费组列表bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.186:9092 -listroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.186:9092 -listNote: This will not show information about old Zookeeper-based consumers.test(2)查询消费组详情bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.186:9092 --describe --group testroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.186:9092 --describe --group testNote: This will not show information about old Zookeeper-based consumers.Consumer group 'test' has no active members.TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDtopic-1628361972 0 47 47 0 - - -topic-1628361972 2 24 24 0 - - -topic-1628361972 1 35 35 0 - - -TOPIC: 当前消费组消费的topicPARTITION:消费的分区CURRENT-OFFSET:分区的消费进度LOG-END-OFFSET:最新的消息offsetLAG:可消费消息数,为0代表该分区的消息都已消费完
上滑加载中
推荐直播
-
HDC深度解读系列 - Serverless与MCP融合创新,构建AI应用全新智能中枢2025/08/20 周三 16:30-18:00
张昆鹏 HCDG北京核心组代表
HDC2025期间,华为云展示了Serverless与MCP融合创新的解决方案,本期访谈直播,由华为云开发者专家(HCDE)兼华为云开发者社区组织HCDG北京核心组代表张鹏先生主持,华为云PaaS服务产品部 Serverless总监Ewen为大家深度解读华为云Serverless与MCP如何融合构建AI应用全新智能中枢
回顾中 -
关于RISC-V生态发展的思考2025/09/02 周二 17:00-18:00
中国科学院计算技术研究所副所长包云岗教授
中科院包云岗老师将在本次直播中,探讨处理器生态的关键要素及其联系,分享过去几年推动RISC-V生态建设实践过程中的经验与教训。
回顾中 -
一键搞定华为云万级资源,3步轻松管理企业成本2025/09/09 周二 15:00-16:00
阿言 华为云交易产品经理
本直播重点介绍如何一键续费万级资源,3步轻松管理成本,帮助提升日常管理效率!
回顾中
热门标签